From 2db1aa88e8ab53096a936163d686b90d8e056a99 Mon Sep 17 00:00:00 2001
From: rain <167982779@qq.com>
Date: Wed, 21 Aug 2024 23:18:33 +0800
Subject: [PATCH] 国土对接返回信息加密

---
 src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java |  300 +++++++++++++++++++++++++++++++++---------------------------
 1 files changed, 165 insertions(+), 135 deletions(-)

diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 9418b94..ab4c9fd 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,11 +12,15 @@
 import com.dji.sample.manage.model.enums.UserTypeEnum;
 import com.dji.sample.manage.service.IDeviceRedisService;
 import com.dji.sample.media.model.MediaFileCountDTO;
-import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
 import com.dji.sample.wayline.model.dto.WaylineJobDTO;
+import com.dji.sample.wayline.model.dto.WaylineJobKey;
+import com.dji.sample.wayline.model.dto.WaylineTaskProgressExtBreakPoint;
 import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
+import com.dji.sample.wayline.model.entity.WaylineJobBreakPointEntity;
 import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
+import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
 import com.dji.sample.wayline.service.IFlightTaskService;
+import com.dji.sample.wayline.service.IWaylineJobBreakPointService;
 import com.dji.sample.wayline.service.IWaylineJobService;
 import com.dji.sample.wayline.service.IWaylineRedisService;
 import com.fasterxml.jackson.core.type.TypeReference;
@@ -25,7 +29,6 @@
 import org.apache.http.HttpStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
@@ -63,64 +66,107 @@
     @Autowired
     private IWaylineRedisService waylineRedisService;
 
+    @Autowired
+    private IWaylineJobBreakPointService waylineJobBreakPointService;
+
     /**
      * Handle the progress messages of the flight tasks reported by the dock.
+     * 处理机场上报的飞行任务进度信息
      * @param receiver
      * @param headers
      */
     @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
     public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
-        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
-        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
-                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
-        EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
+        log.info("上报航线任务进度: {}", receiver.toString());
+                        EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                 new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){});
         eventsReceiver.setBid(receiver.getBid());
         eventsReceiver.setSn(receiver.getGateway());
 
+        //用户记录飞行任务id
+        RedisOpsUtils.set(RedisConst.FLIGHT_LOG + "job_id",receiver.getBid());
+
         WaylineTaskProgressReceiver output = eventsReceiver.getOutput();
 
-        log.info("Task progress: {}", output.getProgress().toString());
+        log.info("任务进度: {}", output.getProgress().toString());
+
+        if (null != output.getExt().getBreakPoint()) {
+            WaylineTaskProgressExtBreakPoint breakPoint = output.getExt().getBreakPoint();
+            log.info("任务进度 ===> 断点信息:{}", breakPoint.toString());
+            // 保存断点信息
+            try {
+                Boolean isAddBp = waylineJobBreakPointService.addWaylineJobBreakPoint(WaylineJobBreakPointEntity.builder()
+                        .jobId(receiver.getBid())
+                        .bpIndex(breakPoint.getIndex())
+                        .state(breakPoint.getState())
+                        .progress(breakPoint.getProgress())
+                        .waylineId(breakPoint.getWaylineId())
+                        .breakReason(breakPoint.getBreakReason())
+                        .latitude(breakPoint.getLatitude())
+                        .longitude(breakPoint.getLongitude())
+                        .height(breakPoint.getHeight())
+                        .attitudeHead(breakPoint.getAttitudeHead())
+                        .build());
+                if (isAddBp) {
+                    log.info("任务进度 ===> 断点信息 ===> 保存成功:{}", breakPoint.toString());
+                } else {
+                    log.info("任务进度 ===> 断点信息 ===> 保存失败:{}", breakPoint.toString());
+                }
+            }catch (Exception e) {
+                log.info("任务进度 ===> 断点信息 ===> 保存失败:{},\n {}", breakPoint.toString(),e.getMessage());
+            }
+
+        }
 
         if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) {
-            log.error("Task progress ===> Error code: " + eventsReceiver.getResult());
+            log.error("任务进度 ===> 错误编码: " + eventsReceiver.getResult());
         }
 
         EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
-        waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver);
-
-        if (statusEnum.getEnd()) {
-            WaylineJobDTO job = WaylineJobDTO.builder()
-                    .jobId(receiver.getBid())
-                    .status(WaylineJobStatusEnum.SUCCESS.getVal())
-                    .completedTime(LocalDateTime.now())
-                    .mediaCount(output.getExt().getMediaCount())
-                    .build();
-
-            // record the update of the media count.
-            if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
-                RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
-                        MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
-            }
-
-            if (EventsResultStatusEnum.OK != statusEnum) {
-                job.setCode(eventsReceiver.getResult());
-                job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
-            }
-
-            waylineJobService.updateJob(job);
-            waylineRedisService.delRunningWaylineJob(dockSn);
-            waylineRedisService.delPausedWaylineJob(receiver.getBid());
-        }
+        waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver);
 
         Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway());
         if (deviceOpt.isEmpty()) {
             return null;
         }
+
+        if (statusEnum.getEnd()) {
+            handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get());
+        }
+
         websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
                         BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver);
 
         return receiver;
+    }
+
+    private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) {
+
+        WaylineJobDTO job = WaylineJobDTO.builder()
+                .jobId(receiver.getBid())
+                .status(WaylineJobStatusEnum.SUCCESS.getVal())
+                .completedTime(LocalDateTime.now())
+                .mediaCount(mediaCount)
+                .build();
+
+        //记录媒体计数的更新
+        if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
+            RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
+                    MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
+        }
+
+        if (EventsResultStatusEnum.OK != statusEnum) {
+            job.setCode(code);
+            job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
+        }
+
+        waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob ->
+                retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob));
+        waylineJobService.updateJob(job);
+        waylineRedisService.delRunningWaylineJob(receiver.getGateway());
+        waylineRedisService.delPausedWaylineJob(receiver.getBid());
+        waylineRedisService.delBlockedWaylineJobId(receiver.getGateway());
+
     }
 
     /**
@@ -130,106 +176,65 @@
      */
     @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS)
     public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) {
-        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
-        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
-                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
-        List<String> flightIds = mapper.convertValue(receiver.getData(),
-                new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
+        String dockSn  = receiver.getGateway();
+        Set<String> flightIds = mapper.convertValue(receiver.getData(),
+                new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
 
-        log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) );
+        log.info("ready task list:{}", Arrays.toString(flightIds.toArray()));
         // Check conditional task blocking status.
         String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn);
-        if (!StringUtils.hasText(blockedId)) {
+        if (StringUtils.hasText(blockedId)) {
+            log.info("The dock is in a state of wayline congestion, and the task will not be executed.");
             return null;
         }
 
         Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn);
         if (deviceOpt.isEmpty()) {
+            log.info("The dock is offline.");
             return null;
         }
         DeviceDTO device = deviceOpt.get();
-
-        try {
-            for (String jobId : flightIds) {
-                boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId);
-                if (!isExecute) {
-                    return null;
-                }
-                Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId);
-                if (waylineJobOpt.isEmpty()) {
-                    log.info("The conditional job has expired and will no longer be executed.");
-                    return receiver;
-                }
-                WaylineJobDTO waylineJob = waylineJobOpt.get();
-                this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob);
-                return receiver;
-            }
-        } catch (Exception e) {
-            log.error("Failed to execute conditional task.");
-            e.printStackTrace();
+        Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING)
+                .stream().filter(job -> flightIds.contains(job.getJobId()))
+                .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal()))
+                .min(Comparator.comparing(WaylineJobDTO::getBeginTime));
+        if (jobOpt.isEmpty()) {
+            return receiver;
         }
+        executeReadyTask(jobOpt.get());
+
         return receiver;
     }
 
-    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
-    private void checkScheduledJob() {
-        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
-        if (Objects.isNull(jobIdValue)) {
-            return;
-        }
-        log.info("Check the timed tasks of the wayline. {}", jobIdValue);
-        // format: {workspace_id}:{dock_sn}:{job_id}
-        String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
-        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
-        long now = System.currentTimeMillis();
-        int offset = 30_000;
-
-        // Expired tasks are deleted directly.
-        if (time < now - offset) {
-            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
-            waylineJobService.updateJob(WaylineJobDTO.builder()
-                    .jobId(jobArr[2])
-                    .status(WaylineJobStatusEnum.FAILED.getVal())
-                    .executeTime(LocalDateTime.now())
-                    .completedTime(LocalDateTime.now())
-                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
-            return;
-        }
-
-        if (now <= time && time <= now + offset) {
-            try {
-                waylineJobService.executeFlightTask(jobArr[0], jobArr[2]);
-            } catch (Exception e) {
-                log.info("The scheduled task delivery failed.");
-                waylineJobService.updateJob(WaylineJobDTO.builder()
-                        .jobId(jobArr[2])
-                        .status(WaylineJobStatusEnum.FAILED.getVal())
-                        .executeTime(LocalDateTime.now())
-                        .completedTime(LocalDateTime.now())
-                        .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
-            } finally {
-                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
+    private void executeReadyTask(WaylineJobDTO waylineJob) {
+        try {
+            boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId());
+            if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
+                return;
             }
+            Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId());
+            if (waylineJobOpt.isEmpty()) {
+                log.info("The conditional job has expired and will no longer be executed.");
+                return;
+            }
+            waylineJob = waylineJobOpt.get();
+            this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
+        } catch (Exception e) {
+            log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName());
+            this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
+            e.printStackTrace();
         }
     }
 
     @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
-    private void prepareConditionJob() {
-        Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob();
+    private void prepareWaylineJob() {
+        Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob();
         if (jobKeyOpt.isEmpty()) {
             return;
         }
-        ConditionalWaylineJobKey jobKey = jobKeyOpt.get();
-        log.info("Check the conditional tasks of the wayline. {}", jobKey.toString());
         // format: {workspace_id}:{dock_sn}:{job_id}
-        double time = waylineRedisService.getConditionalWaylineJobTime(jobKey);
-        long now = System.currentTimeMillis();
-        // prepare the task one day in advance.
-        int offset = 86_400_000;
-
-        if (now + offset < time) {
-            return;
-        }
+        WaylineJobKey jobKey = jobKeyOpt.get();
+        log.info("Check the prepared tasks of the wayline. {}", jobKey.toString());
 
         WaylineJobDTO job = WaylineJobDTO.builder()
                 .jobId(jobKey.getJobId())
@@ -237,40 +242,65 @@
                 .executeTime(LocalDateTime.now())
                 .completedTime(LocalDateTime.now())
                 .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build();
+        Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job);
+        if (waylineJobOpt.isEmpty()) {
+            return;
+        }
+
+        WaylineJobDTO waylineJob = waylineJobOpt.get();
         try {
-            Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
-            if (waylineJobOpt.isEmpty()) {
-                job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
-                waylineJobService.updateJob(job);
-                waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
-                return;
-            }
-            WaylineJobDTO waylineJob = waylineJobOpt.get();
-
             ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob);
-            waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
-
             if (ResponseResult.CODE_SUCCESS == result.getCode()) {
                 return;
             }
-
-            // If the end time is exceeded, no more retries will be made.
-            waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
-            if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) {
-                return;
-            }
-
-            // Retry if the end time has not been exceeded.
-            this.retryPrepareJob(jobKey, waylineJob);
-
-        } catch (Exception e) {
-            log.info("Failed to prepare the conditional task.");
+            log.info("Failed to prepare the task. {}", result.getMessage());
+            job.setCode(result.getCode());
             waylineJobService.updateJob(job);
+            // Retry if the end time has not been exceeded.
+            this.retryPrepareConditionJob(jobKey, waylineJob);
+        } catch (Exception e) {
+            log.info("Failed to prepare the task. {}", e.getLocalizedMessage());
+            waylineJobService.updateJob(job);
+            this.retryPrepareConditionJob(jobKey, waylineJob);
         }
-
     }
 
-    private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) {
+    private boolean checkTime(long time) {
+        // prepare the task one day in advance.
+        int offset = 86_400_000;
+        return System.currentTimeMillis() + offset >= time;
+    }
+
+    private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) {
+        long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue();
+        if (!checkTime(time)) {
+            return Optional.empty();
+        }
+
+        Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
+        // Determine whether the conditional task or the scheduled task has expired.
+        if (waylineJobOpt.isEmpty()) {
+            waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId());
+            if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) {
+                job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
+                waylineJobService.updateJob(job);
+                return Optional.empty();
+            }
+        }
+        waylineRedisService.removePreparedWaylineJob(jobKey);
+        return waylineJobOpt;
+    }
+
+    private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) {
+        if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
+            return;
+        }
+        // If the end time is exceeded, no more retries will be made.
+        waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
+        if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) {
+            return;
+        }
+
         Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId());
         if (childJobOpt.isEmpty()) {
             log.error("Failed to create wayline job.");
@@ -279,7 +309,7 @@
 
         WaylineJobDTO newJob = childJobOpt.get();
         newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME));
-        boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob);
+        boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob);
         if (!isAdd) {
             log.error("Failed to create wayline job. {}", newJob.getJobId());
             return;

--
Gitblit v1.9.3