From 2d8ded3e77b22e44985265ca4063102662e452c1 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Mon, 12 Dec 2022 18:32:19 +0800
Subject: [PATCH] initial v1.3.1

---
 src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java |   73 ++++++++++++++++++++++++++++++++----
 1 files changed, 64 insertions(+), 9 deletions(-)

diff --git a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
index 5313863..067a3fc 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
@@ -14,6 +14,9 @@
 import com.dji.sample.component.redis.RedisOpsUtils;
 import com.dji.sample.manage.model.dto.DeviceDTO;
 import com.dji.sample.manage.service.IDeviceService;
+import com.dji.sample.media.model.MediaFileCountDTO;
+import com.dji.sample.media.model.MediaMethodEnum;
+import com.dji.sample.media.service.IFileService;
 import com.dji.sample.wayline.dao.IWaylineJobMapper;
 import com.dji.sample.wayline.model.dto.*;
 import com.dji.sample.wayline.model.entity.WaylineJobEntity;
@@ -66,11 +69,10 @@
     private IMessageSenderService messageSender;
 
     @Autowired
-    private RedisOpsUtils redisOps;
-
-    @Autowired
     private ObjectMapper objectMapper;
 
+    @Autowired
+    private IFileService fileService;
 
     @Override
     public Optional<WaylineJobDTO> createWaylineJob(CreateJobParam param, CustomClaim customClaim) {
@@ -94,6 +96,7 @@
                 .waylineType(param.getWaylineType())
                 .outOfControlAction(param.getOutOfControlAction())
                 .rthAltitude(param.getRthAltitude())
+                .mediaCount(0)
                 .build();
         int id = mapper.insert(jobEntity);
         if (id <= 0) {
@@ -167,7 +170,7 @@
         }
 
         if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) {
-            boolean isAdd = redisOps.zAdd(RedisConst.WAYLINE_JOB, waylineJob.getJobId(),
+            boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB, waylineJob.getJobId(),
                     waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
             if (!isAdd) {
                 return ResponseResult.error("Failed to create scheduled job.");
@@ -218,7 +221,7 @@
                 .jobId(jobId)
                 .status(WaylineJobStatusEnum.IN_PROGRESS.getVal())
                 .build());
-        redisOps.setWithExpire(jobId,
+        RedisOpsUtils.setWithExpire(jobId,
                 EventsReceiver.<FlightTaskProgressReceiver>builder().bid(jobId).sn(job.getDockSn()).build(),
                 RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
         return true;
@@ -256,7 +259,7 @@
 
     private void publishCancelTask(String workspaceId, String dockSn, List<String> jobIds) {
         boolean isOnline = deviceService.checkDeviceOnline(dockSn);
-        if (isOnline) {
+        if (!isOnline) {
             throw new RuntimeException("Dock is offline.");
         }
         String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + dockSn + TopicConst.SERVICES_SUF;
@@ -282,7 +285,7 @@
                     .status(WaylineJobStatusEnum.CANCEL.getVal())
                     .endTime(LocalDateTime.now())
                     .build());
-            redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
+            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
         }
 
     }
@@ -372,6 +375,33 @@
 
     }
 
+    @Override
+    public void uploadMediaHighestPriority(String workspaceId, String jobId) {
+        Optional<WaylineJobDTO> jobOpt = getJobByJobId(jobId);
+        if (jobOpt.isEmpty()) {
+            throw new RuntimeException(CommonErrorEnum.ILLEGAL_ARGUMENT.getErrorMsg());
+        }
+
+        String dockSn = jobOpt.get().getDockSn();
+        String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + dockSn;
+        if (RedisOpsUtils.checkExist(key) && jobId.equals(RedisOpsUtils.get(key).toString())) {
+            return;
+        }
+
+        ServiceReply reply = messageSender.publishWithReply(TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + dockSn + TopicConst.SERVICES_SUF,
+                CommonTopicResponse.builder()
+                        .tid(UUID.randomUUID().toString())
+                        .bid(UUID.randomUUID().toString())
+                        .timestamp(System.currentTimeMillis())
+                        .method(MediaMethodEnum.UPLOAD_FLIGHT_TASK_MEDIA_PRIORITIZE.getMethod())
+                        .data(Map.of(MapKeyConst.FLIGHT_ID, jobId))
+                        .build());
+        if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
+            throw new RuntimeException("Failed to set media job upload priority. Error Code: " + reply.getResult());
+        }
+        RedisOpsUtils.setWithExpire(key, jobId, RedisConst.DEVICE_ALIVE_SECOND * 5);
+    }
+
     private WaylineJobEntity dto2Entity(WaylineJobDTO dto) {
         WaylineJobEntity.WaylineJobEntityBuilder builder = WaylineJobEntity.builder();
         if (dto == null) {
@@ -418,12 +448,37 @@
         if (Objects.nonNull(entity.getEndTime())) {
             builder.endTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault()));
         }
-        if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && redisOps.getExpire(entity.getJobId()) > 0) {
-            EventsReceiver<FlightTaskProgressReceiver> taskProgress = (EventsReceiver<FlightTaskProgressReceiver>) redisOps.get(entity.getJobId());
+        if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && RedisOpsUtils.getExpire(entity.getJobId()) > 0) {
+            EventsReceiver<FlightTaskProgressReceiver> taskProgress = (EventsReceiver<FlightTaskProgressReceiver>) RedisOpsUtils.get(entity.getJobId());
             if (Objects.nonNull(taskProgress.getOutput()) && Objects.nonNull(taskProgress.getOutput().getProgress())) {
                 builder.progress(taskProgress.getOutput().getProgress().getPercent());
             }
         }
+
+        if (entity.getMediaCount() == 0) {
+            return builder.build();
+        }
+
+        // sync the number of media files
+        String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + entity.getDockSn();
+        String countKey = RedisConst.MEDIA_FILE_PREFIX + entity.getDockSn();
+        Object mediaFileCount = RedisOpsUtils.hashGet(countKey, entity.getJobId());
+        if (Objects.nonNull(mediaFileCount)) {
+            builder.uploadedCount(((MediaFileCountDTO) mediaFileCount).getUploadedCount())
+                    .uploading(RedisOpsUtils.checkExist(key) && entity.getJobId().equals(RedisOpsUtils.get(key)));
+            return builder.build();
+        }
+
+        int uploadedSize = fileService.getFilesByWorkspaceAndJobId(entity.getWorkspaceId(), entity.getJobId()).size();
+        // All media for this job have been uploaded.
+        if (uploadedSize >= entity.getMediaCount()) {
+            return builder.uploadedCount(uploadedSize).build();
+        }
+        RedisOpsUtils.hashSet(countKey, entity.getJobId(),
+                MediaFileCountDTO.builder()
+                        .jobId(entity.getJobId())
+                        .mediaCount(entity.getMediaCount())
+                        .uploadedCount(uploadedSize).build());
         return builder.build();
     }
 }

--
Gitblit v1.9.3