From 2d8ded3e77b22e44985265ca4063102662e452c1 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Mon, 12 Dec 2022 18:32:19 +0800
Subject: [PATCH] initial v1.3.1

---
 src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java |   87 +++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 82 insertions(+), 5 deletions(-)

diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 46774bb..77ca1cf 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -11,16 +11,26 @@
 import com.dji.sample.component.websocket.service.IWebSocketManageService;
 import com.dji.sample.manage.model.dto.DeviceDTO;
 import com.dji.sample.manage.model.enums.UserTypeEnum;
+import com.dji.sample.media.model.MediaFileCountDTO;
 import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
+import com.dji.sample.wayline.model.dto.WaylineJobDTO;
+import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
 import com.dji.sample.wayline.service.IFlightTaskService;
+import com.dji.sample.wayline.service.IWaylineJobService;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpStatus;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.annotation.ServiceActivator;
 import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageHeaders;
+import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
 
 /**
  * @author sean
@@ -44,7 +54,7 @@
     private IWebSocketManageService webSocketManageService;
 
     @Autowired
-    private RedisOpsUtils redisOps;
+    private IWaylineJobService waylineJobService;
 
     @Override
     @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
@@ -52,14 +62,42 @@
         EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                 new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
         eventsReceiver.setBid(receiver.getBid());
+        eventsReceiver.setSn(receiver.getGateway());
 
-        log.info("Task progress: " + eventsReceiver.getOutput().getProgress().toString());
+        FlightTaskProgressReceiver output = eventsReceiver.getOutput();
+
+        log.info("Task progress: {}", output.getProgress().toString());
 
         if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) {
-            log.error("Error code: " + eventsReceiver.getResult());
+            log.error("Task progress ===> Error code: " + eventsReceiver.getResult());
         }
 
-        DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
+        EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
+        if (statusEnum.getEnd()) {
+            WaylineJobDTO job = WaylineJobDTO.builder()
+                    .jobId(receiver.getBid())
+                    .status(WaylineJobStatusEnum.SUCCESS.getVal())
+                    .endTime(LocalDateTime.now())
+                    .mediaCount(output.getExt().getMediaCount())
+                    .build();
+
+            // record the update of the media count.
+            if (Objects.nonNull(job.getMediaCount())) {
+                RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
+                        MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
+            }
+
+            if (EventsResultStatusEnum.OK != statusEnum) {
+                job.setCode(eventsReceiver.getResult());
+                job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
+            }
+
+            waylineJobService.updateJob(job);
+            RedisOpsUtils.del(receiver.getBid());
+        }
+        RedisOpsUtils.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
+
+        DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
         websocketMessageService.sendBatch(
                 webSocketManageService.getValueWithWorkspaceAndUserType(
                         device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
@@ -77,8 +115,47 @@
                             .bid(receiver.getBid())
                             .method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod())
                             .timestamp(System.currentTimeMillis())
-                            .data(ResponseResult.success())
+                            .data(RequestsReply.success())
                             .build());
         }
     }
+
+    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
+    private void checkScheduledJob() {
+        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB);
+        log.info("Check the timed jobs of the wayline. {}", jobIdValue);
+        if (Objects.isNull(jobIdValue)) {
+            return;
+        }
+        String jobId = String.valueOf(jobIdValue);
+        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
+        long now = System.currentTimeMillis();
+        int offset = 30_000;
+
+        // Expired tasks are deleted directly.
+        if (time < now - offset) {
+            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
+            waylineJobService.updateJob(WaylineJobDTO.builder()
+                    .jobId(jobId)
+                    .status(WaylineJobStatusEnum.FAILED.getVal())
+                    .endTime(LocalDateTime.now())
+                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
+            return;
+        }
+
+        if (now <= time && time <= now + offset) {
+            try {
+                waylineJobService.executeFlightTask(jobId);
+            } catch (Exception e) {
+                log.info("The scheduled task delivery failed.");
+                waylineJobService.updateJob(WaylineJobDTO.builder()
+                        .jobId(jobId)
+                        .status(WaylineJobStatusEnum.FAILED.getVal())
+                        .endTime(LocalDateTime.now())
+                        .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
+            } finally {
+                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
+            }
+        }
+    }
 }

--
Gitblit v1.9.3