| | |
| | | import com.dji.sample.manage.model.enums.UserTypeEnum; |
| | | import com.dji.sample.manage.service.IDeviceRedisService; |
| | | import com.dji.sample.media.model.MediaFileCountDTO; |
| | | import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; |
| | | import com.dji.sample.wayline.model.dto.WaylineJobDTO; |
| | | import com.dji.sample.wayline.model.dto.WaylineJobKey; |
| | | import com.dji.sample.wayline.model.dto.WaylineTaskProgressExtBreakPoint; |
| | | import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; |
| | | import com.dji.sample.wayline.model.entity.WaylineJobBreakPointEntity; |
| | | import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; |
| | | import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; |
| | | import com.dji.sample.wayline.service.IFlightTaskService; |
| | | import com.dji.sample.wayline.service.IWaylineJobBreakPointService; |
| | | import com.dji.sample.wayline.service.IWaylineJobService; |
| | | import com.dji.sample.wayline.service.IWaylineRedisService; |
| | | import com.fasterxml.jackson.core.type.TypeReference; |
| | |
| | | import org.apache.http.HttpStatus; |
| | | import org.springframework.beans.factory.annotation.Autowired; |
| | | import org.springframework.integration.annotation.ServiceActivator; |
| | | import org.springframework.integration.mqtt.support.MqttHeaders; |
| | | import org.springframework.messaging.MessageHeaders; |
| | | import org.springframework.scheduling.annotation.Scheduled; |
| | | import org.springframework.stereotype.Service; |
| | |
| | | @Autowired |
| | | private IWaylineRedisService waylineRedisService; |
| | | |
| | | @Autowired |
| | | private IWaylineJobBreakPointService waylineJobBreakPointService; |
| | | |
| | | /** |
| | | * Handle the progress messages of the flight tasks reported by the dock. |
| | | * 处理机场上报的飞行任务进度信息 |
| | | * @param receiver |
| | | * @param headers |
| | | */ |
| | | @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) |
| | | public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
| | | String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); |
| | | String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
| | | receivedTopic.indexOf(TopicConst.EVENTS_SUF)); |
| | | log.info("上报航线任务进度: {}", receiver.toString()); |
| | | EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), |
| | | new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){}); |
| | | eventsReceiver.setBid(receiver.getBid()); |
| | | eventsReceiver.setSn(receiver.getGateway()); |
| | | |
| | | //用户记录飞行任务id |
| | | RedisOpsUtils.set(RedisConst.FLIGHT_LOG + "job_id",receiver.getBid()); |
| | | |
| | | WaylineTaskProgressReceiver output = eventsReceiver.getOutput(); |
| | | |
| | | log.info("Task progress: {}", output.getProgress().toString()); |
| | | log.info("任务进度: {}", output.getProgress().toString()); |
| | | |
| | | if (null != output.getExt().getBreakPoint()) { |
| | | WaylineTaskProgressExtBreakPoint breakPoint = output.getExt().getBreakPoint(); |
| | | log.info("任务进度 ===> 断点信息:{}", breakPoint.toString()); |
| | | // 保存断点信息 |
| | | try { |
| | | Boolean isAddBp = waylineJobBreakPointService.addWaylineJobBreakPoint(WaylineJobBreakPointEntity.builder() |
| | | .jobId(receiver.getBid()) |
| | | .bpIndex(breakPoint.getIndex()) |
| | | .state(breakPoint.getState()) |
| | | .progress(breakPoint.getProgress()) |
| | | .waylineId(breakPoint.getWaylineId()) |
| | | .breakReason(breakPoint.getBreakReason()) |
| | | .latitude(breakPoint.getLatitude()) |
| | | .longitude(breakPoint.getLongitude()) |
| | | .height(breakPoint.getHeight()) |
| | | .attitudeHead(breakPoint.getAttitudeHead()) |
| | | .build()); |
| | | if (isAddBp) { |
| | | log.info("任务进度 ===> 断点信息 ===> 保存成功:{}", breakPoint.toString()); |
| | | } else { |
| | | log.info("任务进度 ===> 断点信息 ===> 保存失败:{}", breakPoint.toString()); |
| | | } |
| | | }catch (Exception e) { |
| | | log.info("任务进度 ===> 断点信息 ===> 保存失败:{},\n {}", breakPoint.toString(),e.getMessage()); |
| | | } |
| | | |
| | | } |
| | | |
| | | if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) { |
| | | log.error("Task progress ===> Error code: " + eventsReceiver.getResult()); |
| | | log.error("任务进度 ===> 错误编码: " + eventsReceiver.getResult()); |
| | | } |
| | | |
| | | EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); |
| | | waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver); |
| | | |
| | | if (statusEnum.getEnd()) { |
| | | WaylineJobDTO job = WaylineJobDTO.builder() |
| | | .jobId(receiver.getBid()) |
| | | .status(WaylineJobStatusEnum.SUCCESS.getVal()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .mediaCount(output.getExt().getMediaCount()) |
| | | .build(); |
| | | |
| | | // record the update of the media count. |
| | | if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { |
| | | RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), |
| | | MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); |
| | | } |
| | | |
| | | if (EventsResultStatusEnum.OK != statusEnum) { |
| | | job.setCode(eventsReceiver.getResult()); |
| | | job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); |
| | | } |
| | | |
| | | waylineJobService.updateJob(job); |
| | | waylineRedisService.delRunningWaylineJob(dockSn); |
| | | waylineRedisService.delPausedWaylineJob(receiver.getBid()); |
| | | } |
| | | waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver); |
| | | |
| | | Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); |
| | | if (deviceOpt.isEmpty()) { |
| | | return null; |
| | | } |
| | | |
| | | if (statusEnum.getEnd()) { |
| | | handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get()); |
| | | } |
| | | |
| | | websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), |
| | | BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); |
| | | |
| | | return receiver; |
| | | } |
| | | |
| | | private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) { |
| | | |
| | | WaylineJobDTO job = WaylineJobDTO.builder() |
| | | .jobId(receiver.getBid()) |
| | | .status(WaylineJobStatusEnum.SUCCESS.getVal()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .mediaCount(mediaCount) |
| | | .build(); |
| | | |
| | | //记录媒体计数的更新 |
| | | if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { |
| | | RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), |
| | | MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); |
| | | } |
| | | |
| | | if (EventsResultStatusEnum.OK != statusEnum) { |
| | | job.setCode(code); |
| | | job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); |
| | | } |
| | | |
| | | waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob -> |
| | | retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob)); |
| | | waylineJobService.updateJob(job); |
| | | waylineRedisService.delRunningWaylineJob(receiver.getGateway()); |
| | | waylineRedisService.delPausedWaylineJob(receiver.getBid()); |
| | | waylineRedisService.delBlockedWaylineJobId(receiver.getGateway()); |
| | | |
| | | } |
| | | |
| | | /** |
| | |
| | | */ |
| | | @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS) |
| | | public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) { |
| | | String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); |
| | | String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
| | | receivedTopic.indexOf(TopicConst.EVENTS_SUF)); |
| | | List<String> flightIds = mapper.convertValue(receiver.getData(), |
| | | new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); |
| | | String dockSn = receiver.getGateway(); |
| | | Set<String> flightIds = mapper.convertValue(receiver.getData(), |
| | | new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); |
| | | |
| | | log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) ); |
| | | log.info("ready task list:{}", Arrays.toString(flightIds.toArray())); |
| | | // Check conditional task blocking status. |
| | | String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn); |
| | | if (!StringUtils.hasText(blockedId)) { |
| | | if (StringUtils.hasText(blockedId)) { |
| | | log.info("The dock is in a state of wayline congestion, and the task will not be executed."); |
| | | return null; |
| | | } |
| | | |
| | | Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn); |
| | | if (deviceOpt.isEmpty()) { |
| | | log.info("The dock is offline."); |
| | | return null; |
| | | } |
| | | DeviceDTO device = deviceOpt.get(); |
| | | |
| | | try { |
| | | for (String jobId : flightIds) { |
| | | boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId); |
| | | if (!isExecute) { |
| | | return null; |
| | | } |
| | | Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId); |
| | | if (waylineJobOpt.isEmpty()) { |
| | | log.info("The conditional job has expired and will no longer be executed."); |
| | | return receiver; |
| | | } |
| | | WaylineJobDTO waylineJob = waylineJobOpt.get(); |
| | | this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob); |
| | | return receiver; |
| | | } |
| | | } catch (Exception e) { |
| | | log.error("Failed to execute conditional task."); |
| | | e.printStackTrace(); |
| | | Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING) |
| | | .stream().filter(job -> flightIds.contains(job.getJobId())) |
| | | .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal())) |
| | | .min(Comparator.comparing(WaylineJobDTO::getBeginTime)); |
| | | if (jobOpt.isEmpty()) { |
| | | return receiver; |
| | | } |
| | | executeReadyTask(jobOpt.get()); |
| | | |
| | | return receiver; |
| | | } |
| | | |
| | | @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
| | | private void checkScheduledJob() { |
| | | Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE); |
| | | if (Objects.isNull(jobIdValue)) { |
| | | return; |
| | | } |
| | | log.info("Check the timed tasks of the wayline. {}", jobIdValue); |
| | | // format: {workspace_id}:{dock_sn}:{job_id} |
| | | String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER); |
| | | double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | long now = System.currentTimeMillis(); |
| | | int offset = 30_000; |
| | | |
| | | // Expired tasks are deleted directly. |
| | | if (time < now - offset) { |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | waylineJobService.updateJob(WaylineJobDTO.builder() |
| | | .jobId(jobArr[2]) |
| | | .status(WaylineJobStatusEnum.FAILED.getVal()) |
| | | .executeTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .code(HttpStatus.SC_REQUEST_TIMEOUT).build()); |
| | | return; |
| | | } |
| | | |
| | | if (now <= time && time <= now + offset) { |
| | | try { |
| | | waylineJobService.executeFlightTask(jobArr[0], jobArr[2]); |
| | | } catch (Exception e) { |
| | | log.info("The scheduled task delivery failed."); |
| | | waylineJobService.updateJob(WaylineJobDTO.builder() |
| | | .jobId(jobArr[2]) |
| | | .status(WaylineJobStatusEnum.FAILED.getVal()) |
| | | .executeTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); |
| | | } finally { |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | private void executeReadyTask(WaylineJobDTO waylineJob) { |
| | | try { |
| | | boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId()); |
| | | if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { |
| | | return; |
| | | } |
| | | Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId()); |
| | | if (waylineJobOpt.isEmpty()) { |
| | | log.info("The conditional job has expired and will no longer be executed."); |
| | | return; |
| | | } |
| | | waylineJob = waylineJobOpt.get(); |
| | | this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); |
| | | } catch (Exception e) { |
| | | log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName()); |
| | | this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); |
| | | e.printStackTrace(); |
| | | } |
| | | } |
| | | |
| | | @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
| | | private void prepareConditionJob() { |
| | | Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob(); |
| | | private void prepareWaylineJob() { |
| | | Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob(); |
| | | if (jobKeyOpt.isEmpty()) { |
| | | return; |
| | | } |
| | | ConditionalWaylineJobKey jobKey = jobKeyOpt.get(); |
| | | log.info("Check the conditional tasks of the wayline. {}", jobKey.toString()); |
| | | // format: {workspace_id}:{dock_sn}:{job_id} |
| | | double time = waylineRedisService.getConditionalWaylineJobTime(jobKey); |
| | | long now = System.currentTimeMillis(); |
| | | // prepare the task one day in advance. |
| | | int offset = 86_400_000; |
| | | |
| | | if (now + offset < time) { |
| | | return; |
| | | } |
| | | WaylineJobKey jobKey = jobKeyOpt.get(); |
| | | log.info("Check the prepared tasks of the wayline. {}", jobKey.toString()); |
| | | |
| | | WaylineJobDTO job = WaylineJobDTO.builder() |
| | | .jobId(jobKey.getJobId()) |
| | |
| | | .executeTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); |
| | | Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job); |
| | | if (waylineJobOpt.isEmpty()) { |
| | | return; |
| | | } |
| | | |
| | | WaylineJobDTO waylineJob = waylineJobOpt.get(); |
| | | try { |
| | | Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); |
| | | if (waylineJobOpt.isEmpty()) { |
| | | job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); |
| | | waylineJobService.updateJob(job); |
| | | waylineRedisService.removePrepareConditionalWaylineJob(jobKey); |
| | | return; |
| | | } |
| | | WaylineJobDTO waylineJob = waylineJobOpt.get(); |
| | | |
| | | ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob); |
| | | waylineRedisService.removePrepareConditionalWaylineJob(jobKey); |
| | | |
| | | if (ResponseResult.CODE_SUCCESS == result.getCode()) { |
| | | return; |
| | | } |
| | | |
| | | // If the end time is exceeded, no more retries will be made. |
| | | waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); |
| | | if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) { |
| | | return; |
| | | } |
| | | |
| | | // Retry if the end time has not been exceeded. |
| | | this.retryPrepareJob(jobKey, waylineJob); |
| | | |
| | | } catch (Exception e) { |
| | | log.info("Failed to prepare the conditional task."); |
| | | log.info("Failed to prepare the task. {}", result.getMessage()); |
| | | job.setCode(result.getCode()); |
| | | waylineJobService.updateJob(job); |
| | | // Retry if the end time has not been exceeded. |
| | | this.retryPrepareConditionJob(jobKey, waylineJob); |
| | | } catch (Exception e) { |
| | | log.info("Failed to prepare the task. {}", e.getLocalizedMessage()); |
| | | waylineJobService.updateJob(job); |
| | | this.retryPrepareConditionJob(jobKey, waylineJob); |
| | | } |
| | | |
| | | } |
| | | |
| | | private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) { |
| | | private boolean checkTime(long time) { |
| | | // prepare the task one day in advance. |
| | | int offset = 86_400_000; |
| | | return System.currentTimeMillis() + offset >= time; |
| | | } |
| | | |
| | | private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) { |
| | | long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue(); |
| | | if (!checkTime(time)) { |
| | | return Optional.empty(); |
| | | } |
| | | |
| | | Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); |
| | | // Determine whether the conditional task or the scheduled task has expired. |
| | | if (waylineJobOpt.isEmpty()) { |
| | | waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId()); |
| | | if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) { |
| | | job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); |
| | | waylineJobService.updateJob(job); |
| | | return Optional.empty(); |
| | | } |
| | | } |
| | | waylineRedisService.removePreparedWaylineJob(jobKey); |
| | | return waylineJobOpt; |
| | | } |
| | | |
| | | private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) { |
| | | if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { |
| | | return; |
| | | } |
| | | // If the end time is exceeded, no more retries will be made. |
| | | waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); |
| | | if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) { |
| | | return; |
| | | } |
| | | |
| | | Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); |
| | | if (childJobOpt.isEmpty()) { |
| | | log.error("Failed to create wayline job."); |
| | |
| | | |
| | | WaylineJobDTO newJob = childJobOpt.get(); |
| | | newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); |
| | | boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob); |
| | | boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob); |
| | | if (!isAdd) { |
| | | log.error("Failed to create wayline job. {}", newJob.getJobId()); |
| | | return; |