From a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 24 Feb 2023 19:31:23 +0800
Subject: [PATCH] What's new? 1. Add license for dock. 2. Modify the logic corresponding to the firmware file and device type. 3. Add multiple mqtt clients options. 4. Modify the structure of the interface for obtaining the device list. 5. Fixed some issues.

---
 src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java |  203 ++++++++++++++++++++++++++++++++------------------
 1 files changed, 129 insertions(+), 74 deletions(-)

diff --git a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
index 067a3fc..acbeabe 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
@@ -40,9 +40,7 @@
 
 import java.net.URL;
 import java.sql.SQLException;
-import java.time.Instant;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
+import java.time.*;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -74,30 +72,7 @@
     @Autowired
     private IFileService fileService;
 
-    @Override
-    public Optional<WaylineJobDTO> createWaylineJob(CreateJobParam param, CustomClaim customClaim) {
-        if (Objects.isNull(param)) {
-            return Optional.empty();
-        }
-        // Immediate tasks, allocating time on the backend.
-        if (Objects.isNull(param.getExecuteTime())) {
-            param.setExecuteTime(System.currentTimeMillis());
-        }
-        WaylineJobEntity jobEntity = WaylineJobEntity.builder()
-                .name(param.getName())
-                .dockSn(param.getDockSn())
-                .fileId(param.getFileId())
-                .username(customClaim.getUsername())
-                .workspaceId(customClaim.getWorkspaceId())
-                .jobId(UUID.randomUUID().toString())
-                .executeTime(param.getExecuteTime())
-                .status(WaylineJobStatusEnum.PENDING.getVal())
-                .taskType(param.getTaskType())
-                .waylineType(param.getWaylineType())
-                .outOfControlAction(param.getOutOfControlAction())
-                .rthAltitude(param.getRthAltitude())
-                .mediaCount(0)
-                .build();
+    private Optional<WaylineJobDTO> insertWaylineJob(WaylineJobEntity jobEntity) {
         int id = mapper.insert(jobEntity);
         if (id <= 0) {
             return Optional.empty();
@@ -106,8 +81,56 @@
     }
 
     @Override
+    public Optional<WaylineJobDTO> createWaylineJob(CreateJobParam param, String workspaceId, String username, Long beginTime, Long endTime) {
+        if (Objects.isNull(param)) {
+            return Optional.empty();
+        }
+        // Immediate tasks, allocating time on the backend.
+        WaylineJobEntity jobEntity = WaylineJobEntity.builder()
+                .name(param.getName())
+                .dockSn(param.getDockSn())
+                .fileId(param.getFileId())
+                .username(username)
+                .workspaceId(workspaceId)
+                .jobId(UUID.randomUUID().toString())
+                .beginTime(beginTime)
+                .endTime(endTime)
+                .status(WaylineJobStatusEnum.PENDING.getVal())
+                .taskType(param.getTaskType())
+                .waylineType(param.getWaylineType())
+                .outOfControlAction(param.getOutOfControlAction())
+                .rthAltitude(param.getRthAltitude())
+                .mediaCount(0)
+                .build();
+
+        return insertWaylineJob(jobEntity);
+    }
+
+    @Override
+    public Optional<WaylineJobDTO> createWaylineJobByParent(String workspaceId, String parentId) {
+        Optional<WaylineJobDTO> parentJobOpt = this.getJobByJobId(workspaceId, parentId);
+        if (parentJobOpt.isEmpty()) {
+            return Optional.empty();
+        }
+        WaylineJobEntity jobEntity = this.dto2Entity(parentJobOpt.get());
+        jobEntity.setJobId(UUID.randomUUID().toString());
+        jobEntity.setErrorCode(null);
+        jobEntity.setCompletedTime(null);
+        jobEntity.setExecuteTime(null);
+        jobEntity.setStatus(WaylineJobStatusEnum.PENDING.getVal());
+        jobEntity.setParentId(parentId);
+
+        return this.insertWaylineJob(jobEntity);
+    }
+
+    @Override
     public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
-        Optional<WaylineJobDTO> waylineJobOpt = this.createWaylineJob(param, customClaim);
+        if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == param.getTaskType()) {
+            param.setExecuteTime(System.currentTimeMillis());
+        }
+        Optional<WaylineJobDTO> waylineJobOpt = this.createWaylineJob(param,
+                customClaim.getWorkspaceId(), customClaim.getUsername(),
+                param.getExecuteTime(), param.getExecuteTime());
         if (waylineJobOpt.isEmpty()) {
             throw new SQLException("Failed to create wayline job.");
         }
@@ -127,14 +150,14 @@
         // get file url
         URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId());
 
-        FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder()
+        WaylineTaskCreateDTO flightTask = WaylineTaskCreateDTO.builder()
                 .flightId(waylineJob.getJobId())
-                .executeTime(waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
+                .executeTime(waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
                 .taskType(waylineJob.getTaskType())
                 .waylineType(waylineJob.getWaylineType())
                 .rthAltitude(waylineJob.getRthAltitude())
                 .outOfControlAction(waylineJob.getOutOfControlAction())
-                .file(FlightTaskFileDTO.builder()
+                .file(WaylineTaskFileDTO.builder()
                         .url(url.toString())
                         .fingerprint(waylineFile.get().getSign())
                         .build())
@@ -156,22 +179,24 @@
             this.updateJob(WaylineJobDTO.builder()
                     .workspaceId(waylineJob.getWorkspaceId())
                     .jobId(waylineJob.getJobId())
+                    .executeTime(LocalDateTime.now())
                     .status(WaylineJobStatusEnum.FAILED.getVal())
-                    .endTime(LocalDateTime.now())
+                    .completedTime(LocalDateTime.now())
                     .code(serviceReply.getResult()).build());
             return ResponseResult.error("Prepare task ====> Error code: " + serviceReply.getResult());
         }
 
         // Issue an immediate task execution command.
         if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == waylineJob.getTaskType()) {
-            if (!executeFlightTask(waylineJob.getJobId())) {
+            if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) {
                 return ResponseResult.error("Failed to execute job.");
             }
         }
 
         if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) {
-            boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB, waylineJob.getJobId(),
-                    waylineJob.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+            boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE,
+                    waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
+                    waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
             if (!isAdd) {
                 return ResponseResult.error("Failed to create scheduled job.");
             }
@@ -181,9 +206,9 @@
     }
 
     @Override
-    public Boolean executeFlightTask(String jobId) {
+    public Boolean executeFlightTask(String workspaceId, String jobId) {
         // get job
-        Optional<WaylineJobDTO> waylineJob = this.getJobByJobId(jobId);
+        Optional<WaylineJobDTO> waylineJob = this.getJobByJobId(workspaceId, jobId);
         if (waylineJob.isEmpty()) {
             throw new IllegalArgumentException("Job doesn't exist.");
         }
@@ -194,7 +219,7 @@
         }
 
         WaylineJobDTO job = waylineJob.get();
-        FlightTaskCreateDTO flightTask = FlightTaskCreateDTO.builder().flightId(jobId).build();
+        WaylineTaskCreateDTO flightTask = WaylineTaskCreateDTO.builder().flightId(jobId).build();
 
         String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT +
                 job.getDockSn() + TopicConst.SERVICES_SUF;
@@ -211,53 +236,45 @@
             log.info("Execute job ====> Error code: {}", serviceReply.getResult());
             this.updateJob(WaylineJobDTO.builder()
                     .jobId(jobId)
+                    .executeTime(LocalDateTime.now())
                     .status(WaylineJobStatusEnum.FAILED.getVal())
-                    .endTime(LocalDateTime.now())
+                    .completedTime(LocalDateTime.now())
                     .code(serviceReply.getResult()).build());
             return false;
         }
 
         this.updateJob(WaylineJobDTO.builder()
                 .jobId(jobId)
+                .executeTime(LocalDateTime.now())
                 .status(WaylineJobStatusEnum.IN_PROGRESS.getVal())
                 .build());
-        RedisOpsUtils.setWithExpire(jobId,
-                EventsReceiver.<FlightTaskProgressReceiver>builder().bid(jobId).sn(job.getDockSn()).build(),
+        RedisOpsUtils.setWithExpire(RedisConst.WAYLINE_JOB_RUNNING_PREFIX + job.getDockSn(),
+                EventsReceiver.<WaylineTaskProgressReceiver>builder().bid(jobId).sn(job.getDockSn()).build(),
                 RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
         return true;
     }
 
     @Override
     public void cancelFlightTask(String workspaceId, Collection<String> jobIds) {
-        List<WaylineJobEntity> waylineJobs = mapper.selectList(
-                new LambdaQueryWrapper<WaylineJobEntity>()
-                        .or(wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id))));
+        List<WaylineJobDTO> waylineJobs = getJobsByConditions(workspaceId, jobIds, WaylineJobStatusEnum.PENDING);
 
-        // Check if the job have ended.
-        List<String> endJobs = waylineJobs.stream()
-                .filter(job -> WaylineJobStatusEnum.find(job.getStatus()).getEnd())
-                .map(WaylineJobEntity::getName)
-                .collect(Collectors.toList());
-        if (!CollectionUtils.isEmpty(endJobs)) {
-            throw new IllegalArgumentException("There are jobs that have ended." + Arrays.toString(endJobs.toArray()));
-        }
-
-        Set<String> ids = waylineJobs.stream().map(WaylineJobEntity::getJobId).collect(Collectors.toSet());
-        for (String id : jobIds) {
-            if (!ids.contains(id)) {
-                throw new IllegalArgumentException("Job id " + id + " doesn't exist.");
-            }
+        Set<String> waylineJobIds = waylineJobs.stream().map(WaylineJobDTO::getJobId).collect(Collectors.toSet());
+        // Check if the task status is correct.
+        boolean isErr = !jobIds.removeAll(waylineJobIds) || !jobIds.isEmpty() ;
+        if (isErr) {
+            throw new IllegalArgumentException("These tasks have an incorrect status and cannot be canceled. "
+                    + Arrays.toString(jobIds.toArray()));
         }
 
         // Group job id by dock sn.
         Map<String, List<String>> dockJobs = waylineJobs.stream()
-                .collect(Collectors.groupingBy(WaylineJobEntity::getDockSn,
-                        Collectors.mapping(WaylineJobEntity::getJobId, Collectors.toList())));
+                .collect(Collectors.groupingBy(WaylineJobDTO::getDockSn,
+                        Collectors.mapping(WaylineJobDTO::getJobId, Collectors.toList())));
         dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList));
 
     }
 
-    private void publishCancelTask(String workspaceId, String dockSn, List<String> jobIds) {
+    public void publishCancelTask(String workspaceId, String dockSn, List<String> jobIds) {
         boolean isOnline = deviceService.checkDeviceOnline(dockSn);
         if (!isOnline) {
             throw new RuntimeException("Dock is offline.");
@@ -283,17 +300,30 @@
                     .workspaceId(workspaceId)
                     .jobId(jobId)
                     .status(WaylineJobStatusEnum.CANCEL.getVal())
-                    .endTime(LocalDateTime.now())
+                    .completedTime(LocalDateTime.now())
                     .build());
-            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
+            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, workspaceId + RedisConst.DELIMITER + dockSn + RedisConst.DELIMITER + jobId);
         }
 
     }
 
+    public List<WaylineJobDTO> getJobsByConditions(String workspaceId, Collection<String> jobIds, WaylineJobStatusEnum status) {
+        return mapper.selectList(
+                new LambdaQueryWrapper<WaylineJobEntity>()
+                        .eq(WaylineJobEntity::getWorkspaceId, workspaceId)
+                        .eq(Objects.nonNull(status), WaylineJobEntity::getStatus, status.getVal())
+                        .and(!CollectionUtils.isEmpty(jobIds),
+                                wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id).or())))
+                .stream()
+                .map(this::entity2Dto)
+                .collect(Collectors.toList());
+    }
+
     @Override
-    public Optional<WaylineJobDTO> getJobByJobId(String jobId) {
+    public Optional<WaylineJobDTO> getJobByJobId(String workspaceId, String jobId) {
         WaylineJobEntity jobEntity = mapper.selectOne(
                 new LambdaQueryWrapper<WaylineJobEntity>()
+                        .eq(WaylineJobEntity::getWorkspaceId, workspaceId)
                         .eq(WaylineJobEntity::getJobId, jobId));
         return Optional.ofNullable(entity2Dto(jobEntity));
     }
@@ -336,7 +366,8 @@
 
         String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString() + TopicConst._REPLY_SUF;
 
-        Optional<WaylineJobDTO> waylineJobOpt = this.getJobByJobId(jobId);
+        DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
+        Optional<WaylineJobDTO> waylineJobOpt = this.getJobByJobId(device.getWorkspaceId(), jobId);
         if (waylineJobOpt.isEmpty()) {
             builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT));
             messageSender.publish(topic, builder.build());
@@ -357,8 +388,8 @@
         URL url = null;
         try {
             url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId());
-            builder.data(RequestsReply.success(FlightTaskCreateDTO.builder()
-                    .file(FlightTaskFileDTO.builder()
+            builder.data(RequestsReply.success(WaylineTaskCreateDTO.builder()
+                    .file(WaylineTaskFileDTO.builder()
                             .url(url.toString())
                             .fingerprint(waylineFile.get().getSign())
                             .build())
@@ -377,14 +408,15 @@
 
     @Override
     public void uploadMediaHighestPriority(String workspaceId, String jobId) {
-        Optional<WaylineJobDTO> jobOpt = getJobByJobId(jobId);
+        Optional<WaylineJobDTO> jobOpt = getJobByJobId(workspaceId, jobId);
         if (jobOpt.isEmpty()) {
             throw new RuntimeException(CommonErrorEnum.ILLEGAL_ARGUMENT.getErrorMsg());
         }
 
         String dockSn = jobOpt.get().getDockSn();
         String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + dockSn;
-        if (RedisOpsUtils.checkExist(key) && jobId.equals(RedisOpsUtils.get(key).toString())) {
+        if (RedisOpsUtils.checkExist(key) &&
+                jobId.equals(((MediaFileCountDTO) RedisOpsUtils.get(key)).getJobId())) {
             return;
         }
 
@@ -399,7 +431,6 @@
         if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
             throw new RuntimeException("Failed to set media job upload priority. Error Code: " + reply.getResult());
         }
-        RedisOpsUtils.setWithExpire(key, jobId, RedisConst.DEVICE_ALIVE_SECOND * 5);
     }
 
     private WaylineJobEntity dto2Entity(WaylineJobDTO dto) {
@@ -407,16 +438,32 @@
         if (dto == null) {
             return builder.build();
         }
+        if (Objects.nonNull(dto.getBeginTime())) {
+            builder.beginTime(dto.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+        }
         if (Objects.nonNull(dto.getEndTime())) {
             builder.endTime(dto.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
         }
         if (Objects.nonNull(dto.getExecuteTime())) {
             builder.executeTime(dto.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
         }
+        if (Objects.nonNull(dto.getCompletedTime())) {
+            builder.completedTime(dto.getCompletedTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
+        }
         return builder.status(dto.getStatus())
                 .mediaCount(dto.getMediaCount())
                 .name(dto.getJobName())
                 .errorCode(dto.getCode())
+                .jobId(dto.getJobId())
+                .fileId(dto.getFileId())
+                .dockSn(dto.getDockSn())
+                .workspaceId(dto.getWorkspaceId())
+                .taskType(dto.getTaskType())
+                .waylineType(dto.getWaylineType())
+                .username(dto.getUsername())
+                .rthAltitude(dto.getRthAltitude())
+                .outOfControlAction(dto.getOutOfControlAction())
+                .parentId(dto.getParentId())
                 .build();
     }
 
@@ -436,9 +483,17 @@
                         .orElse(DeviceDTO.builder().build()).getNickname())
                 .username(entity.getUsername())
                 .workspaceId(entity.getWorkspaceId())
-                .status(entity.getStatus())
+                .status(WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() &&
+                        RedisOpsUtils.checkExist(RedisConst.WAYLINE_JOB_PAUSED_PREFIX + entity.getJobId()) ?
+                        WaylineJobStatusEnum.PAUSED.getVal() : entity.getStatus())
                 .code(entity.getErrorCode())
-                .executeTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()))
+                .beginTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getBeginTime()), ZoneId.systemDefault()))
+                .endTime(Objects.nonNull(entity.getEndTime()) ?
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault()) : null)
+                .executeTime(Objects.nonNull(entity.getExecuteTime()) ?
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()) : null)
+                .completedTime(WaylineJobStatusEnum.find(entity.getStatus()).getEnd() ?
+                        LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault()) : null)
                 .taskType(entity.getTaskType())
                 .waylineType(entity.getWaylineType())
                 .rthAltitude(entity.getRthAltitude())
@@ -449,7 +504,7 @@
             builder.endTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault()));
         }
         if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && RedisOpsUtils.getExpire(entity.getJobId()) > 0) {
-            EventsReceiver<FlightTaskProgressReceiver> taskProgress = (EventsReceiver<FlightTaskProgressReceiver>) RedisOpsUtils.get(entity.getJobId());
+            EventsReceiver<WaylineTaskProgressReceiver> taskProgress = (EventsReceiver<WaylineTaskProgressReceiver>) RedisOpsUtils.get(RedisConst.WAYLINE_JOB_RUNNING_PREFIX + entity.getDockSn());
             if (Objects.nonNull(taskProgress.getOutput()) && Objects.nonNull(taskProgress.getOutput().getProgress())) {
                 builder.progress(taskProgress.getOutput().getProgress().getPercent());
             }
@@ -465,7 +520,7 @@
         Object mediaFileCount = RedisOpsUtils.hashGet(countKey, entity.getJobId());
         if (Objects.nonNull(mediaFileCount)) {
             builder.uploadedCount(((MediaFileCountDTO) mediaFileCount).getUploadedCount())
-                    .uploading(RedisOpsUtils.checkExist(key) && entity.getJobId().equals(RedisOpsUtils.get(key)));
+                    .uploading(RedisOpsUtils.checkExist(key) && entity.getJobId().equals(((MediaFileCountDTO)RedisOpsUtils.get(key)).getJobId()));
             return builder.build();
         }
 

--
Gitblit v1.9.3