From a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 24 Feb 2023 19:31:23 +0800
Subject: [PATCH] What's new? 1. Add license for dock. 2. Modify the logic corresponding to the firmware file and device type. 3. Add multiple mqtt clients options. 4. Modify the structure of the interface for obtaining the device list. 5. Fixed some issues.

---
 src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java |   52 ++++++++++++++++++++++++++++++----------------------
 1 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 77ca1cf..224602d 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,8 +12,8 @@
 import com.dji.sample.manage.model.dto.DeviceDTO;
 import com.dji.sample.manage.model.enums.UserTypeEnum;
 import com.dji.sample.media.model.MediaFileCountDTO;
-import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
 import com.dji.sample.wayline.model.dto.WaylineJobDTO;
+import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
 import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
 import com.dji.sample.wayline.service.IFlightTaskService;
 import com.dji.sample.wayline.service.IWaylineJobService;
@@ -29,7 +29,7 @@
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
-import java.util.Objects;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,12 +59,15 @@
     @Override
     @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
     public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
-        EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
-                new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
+        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
+        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
+                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
+        EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
+                new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){});
         eventsReceiver.setBid(receiver.getBid());
         eventsReceiver.setSn(receiver.getGateway());
 
-        FlightTaskProgressReceiver output = eventsReceiver.getOutput();
+        WaylineTaskProgressReceiver output = eventsReceiver.getOutput();
 
         log.info("Task progress: {}", output.getProgress().toString());
 
@@ -73,16 +76,19 @@
         }
 
         EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
+        String key = RedisConst.WAYLINE_JOB_RUNNING_PREFIX + dockSn;
+        RedisOpsUtils.setWithExpire(key, eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
+
         if (statusEnum.getEnd()) {
             WaylineJobDTO job = WaylineJobDTO.builder()
                     .jobId(receiver.getBid())
                     .status(WaylineJobStatusEnum.SUCCESS.getVal())
-                    .endTime(LocalDateTime.now())
+                    .completedTime(LocalDateTime.now())
                     .mediaCount(output.getExt().getMediaCount())
                     .build();
 
             // record the update of the media count.
-            if (Objects.nonNull(job.getMediaCount())) {
+            if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
                 RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
                         MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
             }
@@ -93,9 +99,9 @@
             }
 
             waylineJobService.updateJob(job);
-            RedisOpsUtils.del(receiver.getBid());
+            RedisOpsUtils.del(key);
+            RedisOpsUtils.del(RedisConst.WAYLINE_JOB_PAUSED_PREFIX + receiver.getBid());
         }
-        RedisOpsUtils.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
 
         DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
         websocketMessageService.sendBatch(
@@ -108,8 +114,7 @@
                         .build());
 
         if (receiver.getNeedReply() == 1) {
-            String topic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
-            messageSender.publish(topic,
+            messageSender.publish(receivedTopic + TopicConst._REPLY_SUF,
                     CommonTopicResponse.builder()
                             .tid(receiver.getTid())
                             .bid(receiver.getBid())
@@ -122,39 +127,42 @@
 
     @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
     private void checkScheduledJob() {
-        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB);
-        log.info("Check the timed jobs of the wayline. {}", jobIdValue);
+        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
         if (Objects.isNull(jobIdValue)) {
             return;
         }
-        String jobId = String.valueOf(jobIdValue);
-        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
+        log.info("Check the timed tasks of the wayline. {}", jobIdValue);
+        // format: {workspace_id}:{dock_sn}:{job_id}
+        String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
+        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
         long now = System.currentTimeMillis();
         int offset = 30_000;
 
         // Expired tasks are deleted directly.
         if (time < now - offset) {
-            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
+            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
             waylineJobService.updateJob(WaylineJobDTO.builder()
-                    .jobId(jobId)
+                    .jobId(jobArr[2])
                     .status(WaylineJobStatusEnum.FAILED.getVal())
-                    .endTime(LocalDateTime.now())
+                    .executeTime(LocalDateTime.now())
+                    .completedTime(LocalDateTime.now())
                     .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
             return;
         }
 
         if (now <= time && time <= now + offset) {
             try {
-                waylineJobService.executeFlightTask(jobId);
+                waylineJobService.executeFlightTask(jobArr[0], jobArr[2]);
             } catch (Exception e) {
                 log.info("The scheduled task delivery failed.");
                 waylineJobService.updateJob(WaylineJobDTO.builder()
-                        .jobId(jobId)
+                        .jobId(jobArr[2])
                         .status(WaylineJobStatusEnum.FAILED.getVal())
-                        .endTime(LocalDateTime.now())
+                        .executeTime(LocalDateTime.now())
+                        .completedTime(LocalDateTime.now())
                         .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
             } finally {
-                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
+                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
             }
         }
     }

--
Gitblit v1.9.3