package com.dji.sample.wayline.service.impl; import com.dji.sample.common.error.CommonErrorEnum; import com.dji.sample.common.model.ResponseResult; import com.dji.sample.component.mqtt.model.*; import com.dji.sample.component.mqtt.service.IMessageSenderService; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.component.websocket.model.BizCodeEnum; import com.dji.sample.component.websocket.service.ISendMessageService; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.model.enums.UserTypeEnum; import com.dji.sample.manage.service.IDeviceRedisService; import com.dji.sample.media.model.MediaFileCountDTO; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.dto.WaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.service.IFlightTaskService; import com.dji.sample.wayline.service.IWaylineJobService; import com.dji.sample.wayline.service.IWaylineRedisService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.http.HttpStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.messaging.MessageHeaders; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.*; import java.util.concurrent.TimeUnit; /** * @author sean * @version 1.1 * @date 2022/6/9 */ @Service @Slf4j public class FlightTaskServiceImpl implements IFlightTaskService { @Autowired private IMessageSenderService messageSender; @Autowired private ObjectMapper mapper; @Autowired private ISendMessageService websocketMessageService; @Autowired private IWaylineJobService waylineJobService; @Autowired private IDeviceRedisService deviceRedisService; @Autowired private IWaylineRedisService waylineRedisService; /** * Handle the progress messages of the flight tasks reported by the dock. * 处理机场上报的飞行任务进度信息 * @param receiver * @param headers */ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { log.info("上报航线任务进度: {}", receiver.toString()); EventsReceiver eventsReceiver = mapper.convertValue(receiver.getData(), new TypeReference>(){}); eventsReceiver.setBid(receiver.getBid()); eventsReceiver.setSn(receiver.getGateway()); //用户记录飞行任务id RedisOpsUtils.set(RedisConst.FLIGHT_LOG + "job_id",receiver.getBid()); WaylineTaskProgressReceiver output = eventsReceiver.getOutput(); log.info("任务进度: {}", output.getProgress().toString()); if (null != output.getExt().getBreakPoint()) { log.info("任务进度 ===> 断点信息:{}", output.getExt().getBreakPoint().toString()); } if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) { log.error("任务进度 ===> 错误编码: " + eventsReceiver.getResult()); } EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver); Optional deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); if (deviceOpt.isEmpty()) { return null; } if (statusEnum.getEnd()) { handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get()); } websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); return receiver; } private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) { WaylineJobDTO job = WaylineJobDTO.builder() .jobId(receiver.getBid()) .status(WaylineJobStatusEnum.SUCCESS.getVal()) .completedTime(LocalDateTime.now()) .mediaCount(mediaCount) .build(); //记录媒体计数的更新 if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); } if (EventsResultStatusEnum.OK != statusEnum) { job.setCode(code); job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); } waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob -> retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob)); waylineJobService.updateJob(job); waylineRedisService.delRunningWaylineJob(receiver.getGateway()); waylineRedisService.delPausedWaylineJob(receiver.getBid()); waylineRedisService.delBlockedWaylineJobId(receiver.getGateway()); } /** * Notifications will be received through this interface when tasks are ready on the device. * @param receiver * @param headers */ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS) public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) { String dockSn = receiver.getGateway(); Set flightIds = mapper.convertValue(receiver.getData(), new TypeReference>>(){}).get(MapKeyConst.FLIGHT_IDS); log.info("ready task list:{}", Arrays.toString(flightIds.toArray())); // Check conditional task blocking status. String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn); if (StringUtils.hasText(blockedId)) { log.info("The dock is in a state of wayline congestion, and the task will not be executed."); return null; } Optional deviceOpt = deviceRedisService.getDeviceOnline(dockSn); if (deviceOpt.isEmpty()) { log.info("The dock is offline."); return null; } DeviceDTO device = deviceOpt.get(); Optional jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING) .stream().filter(job -> flightIds.contains(job.getJobId())) .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal())) .min(Comparator.comparing(WaylineJobDTO::getBeginTime)); if (jobOpt.isEmpty()) { return receiver; } executeReadyTask(jobOpt.get()); return receiver; } private void executeReadyTask(WaylineJobDTO waylineJob) { try { boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId()); if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { return; } Optional waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId()); if (waylineJobOpt.isEmpty()) { log.info("The conditional job has expired and will no longer be executed."); return; } waylineJob = waylineJobOpt.get(); this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); } catch (Exception e) { log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName()); this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); e.printStackTrace(); } } @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) private void prepareWaylineJob() { Optional jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob(); if (jobKeyOpt.isEmpty()) { return; } // format: {workspace_id}:{dock_sn}:{job_id} WaylineJobKey jobKey = jobKeyOpt.get(); log.info("Check the prepared tasks of the wayline. {}", jobKey.toString()); WaylineJobDTO job = WaylineJobDTO.builder() .jobId(jobKey.getJobId()) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); Optional waylineJobOpt = getPreparedJob(jobKey, job); if (waylineJobOpt.isEmpty()) { return; } WaylineJobDTO waylineJob = waylineJobOpt.get(); try { ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob); if (ResponseResult.CODE_SUCCESS == result.getCode()) { return; } log.info("Failed to prepare the task. {}", result.getMessage()); job.setCode(result.getCode()); waylineJobService.updateJob(job); // Retry if the end time has not been exceeded. this.retryPrepareConditionJob(jobKey, waylineJob); } catch (Exception e) { log.info("Failed to prepare the task. {}", e.getLocalizedMessage()); waylineJobService.updateJob(job); this.retryPrepareConditionJob(jobKey, waylineJob); } } private boolean checkTime(long time) { // prepare the task one day in advance. int offset = 86_400_000; return System.currentTimeMillis() + offset >= time; } private Optional getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) { long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue(); if (!checkTime(time)) { return Optional.empty(); } Optional waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); // Determine whether the conditional task or the scheduled task has expired. if (waylineJobOpt.isEmpty()) { waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId()); if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) { job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); waylineJobService.updateJob(job); return Optional.empty(); } } waylineRedisService.removePreparedWaylineJob(jobKey); return waylineJobOpt; } private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) { if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { return; } // If the end time is exceeded, no more retries will be made. waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) { return; } Optional childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); if (childJobOpt.isEmpty()) { log.error("Failed to create wayline job."); return; } WaylineJobDTO newJob = childJobOpt.get(); newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob); if (!isAdd) { log.error("Failed to create wayline job. {}", newJob.getJobId()); return; } waylineJob.setJobId(newJob.getJobId()); waylineRedisService.setConditionalWaylineJob(waylineJob); } }