无人机项目后端代码
sean.zhou
2023-05-18 642dac3e705380eea241648f875836e64dbc809e
Merge branch 'v1.5.0'
What's new?
1. Add new model: DJI Matrice 350 RTK.
2. Update file hms.json.
3. Fixed some issues.
12 files modified
1 files deleted
1 files added
10899 ■■■■■ changed files
pom.xml 2 ●●● patch | view | raw | blame | history
sql/cloud_sample.sql 3 ●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/component/redis/RedisConst.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/component/websocket/service/impl/WebSocketManageServiceImpl.java 2 ●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java 32 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/model/dto/ConditionalWaylineJobKey.java 39 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/model/dto/WaylineJobKey.java 36 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/model/dto/WaylineTaskCreateDTO.java 6 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/service/IWaylineRedisService.java 36 ●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java 255 ●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java 110 ●●●●● patch | view | raw | blame | history
src/main/java/com/dji/sample/wayline/service/impl/WaylineRedisServiceImpl.java 27 ●●●●● patch | view | raw | blame | history
src/main/resources/hms.json 10339 ●●●●● patch | view | raw | blame | history
pom.xml
@@ -11,7 +11,7 @@
    <groupId>com.dji</groupId>
    <artifactId>cloud-api-sample</artifactId>
    <version>1.4.0</version>
    <version>1.5.0</version>
    <name>cloud-api-sample</name>
    <properties>
sql/cloud_sample.sql
@@ -134,7 +134,8 @@
    (22,1,67,0,'Mavic 3T Camera',NULL),
    (23,2,144,0,'DJI RC Pro','Remote control for Mavic 3E/T and Mavic 3M'),
    (24,0,77,2,'Mavic 3M',NULL),
    (25,1,68,0,'Mavic 3M Camera',NULL);
    (25,1,68,0,'Mavic 3M Camera',NULL),
    (26,0,89,0,'Matrice 350 RTK',NULL);
/*!40000 ALTER TABLE `manage_device_dictionary` ENABLE KEYS */;
UNLOCK TABLES;
src/main/java/com/dji/sample/component/redis/RedisConst.java
@@ -37,11 +37,9 @@
    public static final String LOGS_FILE_PREFIX = "logs_file" + DELIMITER;
    public static final String WAYLINE_JOB_TIMED_EXECUTE = "wayline_job_timed_execute";
    public static final String WAYLINE_JOB_PREPARED = "wayline_job_prepared";
    public static final String WAYLINE_JOB_CONDITION_PREPARE = "wayline_job_condition_prepare";
    public static final String WAYLINE_JOB_CONDITION_PREFIX = WAYLINE_JOB_CONDITION_PREPARE + DELIMITER;
    public static final String WAYLINE_JOB_CONDITION_PREFIX = "wayline_job_condition" + DELIMITER;
    public static final String WAYLINE_JOB_BLOCK_PREFIX = "wayline_job_block" + DELIMITER;
src/main/java/com/dji/sample/component/websocket/service/impl/WebSocketManageServiceImpl.java
@@ -75,7 +75,7 @@
        return RedisOpsUtils.hashKeys(key)
                .stream()
                .map(SESSIONS::get)
                .filter(Objects::nonNull)
                .filter(this.getValueWithWorkspace(workspaceId)::contains)
                .collect(Collectors.toSet());
    }
src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java
@@ -518,9 +518,37 @@
            }
            entity.setId(deviceEntity.getId());
            mapper.updateById(entity);
            return Optional.of(deviceEntity);
            fillNullField(entity, deviceEntity);
            return Optional.of(entity);
        }
        return mapper.insert(entity) > 0 ? Optional.of(entity) : Optional.empty();
    }
    private void fillNullField(DeviceEntity entity, DeviceEntity oldEntity) {
        if (Objects.isNull(entity) || Objects.isNull(oldEntity)) {
            return;
        }
        if (Objects.isNull(entity.getWorkspaceId())) {
            entity.setWorkspaceId(oldEntity.getWorkspaceId());
        }
        if (Objects.isNull(entity.getUserId())) {
            entity.setUserId(oldEntity.getUserId());
        }
        if (Objects.isNull(entity.getChildSn())) {
            entity.setChildSn(oldEntity.getChildSn());
        }
        if (Objects.isNull(entity.getBoundStatus())) {
            entity.setBoundStatus(oldEntity.getBoundStatus());
        }
        if (Objects.isNull(entity.getBoundTime())) {
            entity.setBoundTime(oldEntity.getBoundTime());
        }
        if (Objects.isNull(entity.getFirmwareVersion())) {
            entity.setFirmwareVersion(oldEntity.getFirmwareVersion());
        }
        if (Objects.isNull(entity.getDeviceIndex())) {
            entity.setDeviceIndex(oldEntity.getDeviceIndex());
        }
    }
    /**
@@ -650,7 +678,7 @@
            return;
        }
        if (entity.getFirmwareVersion().equals(firmwareReleaseNoteOpt.get().getProductVersion())) {
            deviceDTO.setFirmwareStatus(entity.getCompatibleStatus() ?
            deviceDTO.setFirmwareStatus(Objects.requireNonNullElse(entity.getCompatibleStatus(), true) ?
                    DeviceFirmwareStatusEnum.NOT_UPGRADE.getVal() :
                    DeviceFirmwareStatusEnum.CONSISTENT_UPGRADE.getVal());
            return;
src/main/java/com/dji/sample/wayline/model/dto/ConditionalWaylineJobKey.java
File was deleted
src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java
@@ -1,5 +1,7 @@
package com.dji.sample.wayline.model.dto;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.model.enums.WaylineTemplateTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -32,9 +34,9 @@
    private String workspaceId;
    private Integer waylineType;
    private WaylineTemplateTypeEnum waylineType;
    private Integer taskType;
    private WaylineTaskTypeEnum taskType;
    private LocalDateTime executeTime;
src/main/java/com/dji/sample/wayline/model/dto/WaylineJobKey.java
New file
@@ -0,0 +1,36 @@
package com.dji.sample.wayline.model.dto;
import com.dji.sample.component.redis.RedisConst;
import lombok.Data;
/**
 * @author sean
 * @version 1.4
 * @date 2023/3/28
 */
@Data
public class WaylineJobKey {
    private String workspaceId;
    private String dockSn;
    private String jobId;
    public WaylineJobKey(String workspaceId, String dockSn, String jobId) {
        this.workspaceId = workspaceId;
        this.dockSn = dockSn;
        this.jobId = jobId;
    }
    private WaylineJobKey(String[] keyArr) {
        this(keyArr[0], keyArr[1], keyArr[2]);
    }
    public WaylineJobKey(String key) {
        this(key.split(RedisConst.DELIMITER));
    }
    public String getKey() {
        return String.join(RedisConst.DELIMITER, workspaceId, dockSn, jobId);
    }
}
src/main/java/com/dji/sample/wayline/model/dto/WaylineTaskCreateDTO.java
@@ -1,5 +1,7 @@
package com.dji.sample.wayline.model.dto;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.model.enums.WaylineTemplateTypeEnum;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
@@ -18,9 +20,9 @@
    private String flightId;
    private Integer taskType;
    private WaylineTaskTypeEnum taskType;
    private Integer waylineType;
    private WaylineTemplateTypeEnum waylineType;
    private Long executeTime;
src/main/java/com/dji/sample/wayline/service/IWaylineRedisService.java
@@ -1,8 +1,8 @@
package com.dji.sample.wayline.service;
import com.dji.sample.component.mqtt.model.EventsReceiver;
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.WaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import java.util.Optional;
@@ -71,6 +71,13 @@
    String getBlockedWaylineJobId(String dockSn);
    /**
     * Delete the wayline job id blocked by the dock in redis.
     * @param dockSn
     * @return
     */
    Boolean delBlockedWaylineJobId(String dockSn);
    /**
     * Save the conditional wayline job by the dock to redis.
     * @param waylineJob
     */
@@ -90,11 +97,30 @@
     */
    Boolean delConditionalWaylineJob(String jobId);
    Boolean addPrepareConditionalWaylineJob(WaylineJobDTO waylineJob);
    /**
     * Add the wayline job that needs to be issued.
     * @param waylineJob
     * @return
     */
    Boolean addPreparedWaylineJob(WaylineJobDTO waylineJob);
    Optional<ConditionalWaylineJobKey> getNearestConditionalWaylineJob();
    /**
     * Get the latest wayline job that needs to be issued.
     * @return
     */
    Optional<WaylineJobKey> getNearestPreparedWaylineJob();
    Double getConditionalWaylineJobTime(ConditionalWaylineJobKey jobKey);
    /**
     * Get the time when the wayline job is issued.
     * @param jobKey
     * @return
     */
    Double getPreparedWaylineJobTime(WaylineJobKey jobKey);
    Boolean removePrepareConditionalWaylineJob(ConditionalWaylineJobKey jobKey);
    /**
     * Delete the wayline job that needs to be issued in redis.
     * @param jobKey
     * @return
     */
    Boolean removePreparedWaylineJob(WaylineJobKey jobKey);
}
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,10 +12,11 @@
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.media.model.MediaFileCountDTO;
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.WaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
import com.dji.sample.wayline.service.IWaylineRedisService;
@@ -25,7 +26,6 @@
import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -70,9 +70,6 @@
     */
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
    public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
        EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){});
        eventsReceiver.setBid(receiver.getBid());
@@ -87,40 +84,50 @@
        }
        EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
        waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver);
        if (statusEnum.getEnd()) {
            WaylineJobDTO job = WaylineJobDTO.builder()
                    .jobId(receiver.getBid())
                    .status(WaylineJobStatusEnum.SUCCESS.getVal())
                    .completedTime(LocalDateTime.now())
                    .mediaCount(output.getExt().getMediaCount())
                    .build();
            // record the update of the media count.
            if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
                RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
                        MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
            }
            if (EventsResultStatusEnum.OK != statusEnum) {
                job.setCode(eventsReceiver.getResult());
                job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
            }
            waylineJobService.updateJob(job);
            waylineRedisService.delRunningWaylineJob(dockSn);
            waylineRedisService.delPausedWaylineJob(receiver.getBid());
        }
        waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver);
        Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway());
        if (deviceOpt.isEmpty()) {
            return null;
        }
        if (statusEnum.getEnd()) {
            handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get());
        }
        websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
                        BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver);
        return receiver;
    }
    /**
     * Finalizes a wayline job once its progress event reports an end status.
     *
     * <p>Persists the final job state, records the expected media count in redis, asks for a
     * retry when a conditional job is still stored for this job id, and then clears the
     * running, paused and blocked entries kept in redis for the job and its dock.
     *
     * @param receiver   the received message; its bid is used as the job id and its gateway as the dock key
     * @param statusEnum the end status reported by the event
     * @param mediaCount the media count reported with the event
     * @param code       the result code stored on the job when the status is not OK
     * @param dock       the dock whose workspace id and device SN identify the job to retry
     */
    private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) {
        // Start from a successful result; it is downgraded to FAILED below when the status is not OK.
        WaylineJobDTO job = WaylineJobDTO.builder()
                .jobId(receiver.getBid())
                .status(WaylineJobStatusEnum.SUCCESS.getVal())
                .completedTime(LocalDateTime.now())
                .mediaCount(mediaCount)
                .build();

        // Record the media count for this job, with an uploaded count of zero, when it is non-zero.
        if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
            RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
                    MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
        }

        // Any end status other than OK marks the job as failed and keeps the reported code.
        if (EventsResultStatusEnum.OK != statusEnum) {
            job.setCode(code);
            job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
        }

        // If a conditional job is still stored for this id, hand it to the retry logic.
        waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob ->
                retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob));

        // Persist the final state, then drop the running/paused/blocked markers.
        waylineJobService.updateJob(job);
        waylineRedisService.delRunningWaylineJob(receiver.getGateway());
        waylineRedisService.delPausedWaylineJob(receiver.getBid());
        waylineRedisService.delBlockedWaylineJobId(receiver.getGateway());
    }
    /**
@@ -130,106 +137,65 @@
     */
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS)
    public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) {
        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
        List<String> flightIds = mapper.convertValue(receiver.getData(),
                new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
        String dockSn  = receiver.getGateway();
        Set<String> flightIds = mapper.convertValue(receiver.getData(),
                new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
        log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) );
        log.info("ready task list:{}", Arrays.toString(flightIds.toArray()));
        // Check conditional task blocking status.
        String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn);
        if (!StringUtils.hasText(blockedId)) {
        if (StringUtils.hasText(blockedId)) {
            log.info("The dock is in a state of wayline congestion, and the task will not be executed.");
            return null;
        }
        Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn);
        if (deviceOpt.isEmpty()) {
            log.info("The dock is offline.");
            return null;
        }
        DeviceDTO device = deviceOpt.get();
        try {
            for (String jobId : flightIds) {
                boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId);
                if (!isExecute) {
                    return null;
                }
                Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId);
                if (waylineJobOpt.isEmpty()) {
                    log.info("The conditional job has expired and will no longer be executed.");
                    return receiver;
                }
                WaylineJobDTO waylineJob = waylineJobOpt.get();
                this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob);
                return receiver;
            }
        } catch (Exception e) {
            log.error("Failed to execute conditional task.");
            e.printStackTrace();
        Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING)
                .stream().filter(job -> flightIds.contains(job.getJobId()))
                .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal()))
                .min(Comparator.comparing(WaylineJobDTO::getBeginTime));
        if (jobOpt.isEmpty()) {
            return receiver;
        }
        executeReadyTask(jobOpt.get());
        return receiver;
    }
    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
    private void checkScheduledJob() {
        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
        if (Objects.isNull(jobIdValue)) {
            return;
        }
        log.info("Check the timed tasks of the wayline. {}", jobIdValue);
        // format: {workspace_id}:{dock_sn}:{job_id}
        String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
        long now = System.currentTimeMillis();
        int offset = 30_000;
        // Expired tasks are deleted directly.
        if (time < now - offset) {
            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
            waylineJobService.updateJob(WaylineJobDTO.builder()
                    .jobId(jobArr[2])
                    .status(WaylineJobStatusEnum.FAILED.getVal())
                    .executeTime(LocalDateTime.now())
                    .completedTime(LocalDateTime.now())
                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
            return;
        }
        if (now <= time && time <= now + offset) {
            try {
                waylineJobService.executeFlightTask(jobArr[0], jobArr[2]);
            } catch (Exception e) {
                log.info("The scheduled task delivery failed.");
                waylineJobService.updateJob(WaylineJobDTO.builder()
                        .jobId(jobArr[2])
                        .status(WaylineJobStatusEnum.FAILED.getVal())
                        .executeTime(LocalDateTime.now())
                        .completedTime(LocalDateTime.now())
                        .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
            } finally {
                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
    private void executeReadyTask(WaylineJobDTO waylineJob) {
        try {
            boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId());
            if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
                return;
            }
            Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId());
            if (waylineJobOpt.isEmpty()) {
                log.info("The conditional job has expired and will no longer be executed.");
                return;
            }
            waylineJob = waylineJobOpt.get();
            this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
        } catch (Exception e) {
            log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName());
            this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
            e.printStackTrace();
        }
    }
    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
    private void prepareConditionJob() {
        Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob();
    private void prepareWaylineJob() {
        Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob();
        if (jobKeyOpt.isEmpty()) {
            return;
        }
        ConditionalWaylineJobKey jobKey = jobKeyOpt.get();
        log.info("Check the conditional tasks of the wayline. {}", jobKey.toString());
        // format: {workspace_id}:{dock_sn}:{job_id}
        double time = waylineRedisService.getConditionalWaylineJobTime(jobKey);
        long now = System.currentTimeMillis();
        // prepare the task one day in advance.
        int offset = 86_400_000;
        if (now + offset < time) {
            return;
        }
        WaylineJobKey jobKey = jobKeyOpt.get();
        log.info("Check the prepared tasks of the wayline. {}", jobKey.toString());
        WaylineJobDTO job = WaylineJobDTO.builder()
                .jobId(jobKey.getJobId())
@@ -237,40 +203,65 @@
                .executeTime(LocalDateTime.now())
                .completedTime(LocalDateTime.now())
                .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build();
        Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job);
        if (waylineJobOpt.isEmpty()) {
            return;
        }
        WaylineJobDTO waylineJob = waylineJobOpt.get();
        try {
            Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
            if (waylineJobOpt.isEmpty()) {
                job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
                waylineJobService.updateJob(job);
                waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
                return;
            }
            WaylineJobDTO waylineJob = waylineJobOpt.get();
            ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob);
            waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
            if (ResponseResult.CODE_SUCCESS == result.getCode()) {
                return;
            }
            // If the end time is exceeded, no more retries will be made.
            waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
            if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) {
                return;
            }
            // Retry if the end time has not been exceeded.
            this.retryPrepareJob(jobKey, waylineJob);
        } catch (Exception e) {
            log.info("Failed to prepare the conditional task.");
            log.info("Failed to prepare the task. {}", result.getMessage());
            job.setCode(result.getCode());
            waylineJobService.updateJob(job);
            // Retry if the end time has not been exceeded.
            this.retryPrepareConditionJob(jobKey, waylineJob);
        } catch (Exception e) {
            log.info("Failed to prepare the task. {}", e.getLocalizedMessage());
            waylineJobService.updateJob(job);
            this.retryPrepareConditionJob(jobKey, waylineJob);
        }
    }
    private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) {
    private boolean checkTime(long time) {
        // prepare the task one day in advance.
        int offset = 86_400_000;
        return System.currentTimeMillis() + offset >= time;
    }
    private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) {
        long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue();
        if (!checkTime(time)) {
            return Optional.empty();
        }
        Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
        // Determine whether the conditional task or the scheduled task has expired.
        if (waylineJobOpt.isEmpty()) {
            waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId());
            if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) {
                job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
                waylineJobService.updateJob(job);
                return Optional.empty();
            }
        }
        waylineRedisService.removePreparedWaylineJob(jobKey);
        return waylineJobOpt;
    }
    private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) {
        if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
            return;
        }
        // If the end time is exceeded, no more retries will be made.
        waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
        if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) {
            return;
        }
        Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId());
        if (childJobOpt.isEmpty()) {
            log.error("Failed to create wayline job.");
@@ -279,7 +270,7 @@
        WaylineJobDTO newJob = childJobOpt.get();
        newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME));
        boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob);
        boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob);
        if (!isAdd) {
            log.error("Failed to create wayline job. {}", newJob.getJobId());
            return;
src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
@@ -27,10 +27,7 @@
import com.dji.sample.wayline.dao.IWaylineJobMapper;
import com.dji.sample.wayline.model.dto.*;
import com.dji.sample.wayline.model.entity.WaylineJobEntity;
import com.dji.sample.wayline.model.enums.WaylineErrorCodeEnum;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.model.enums.WaylineMethodEnum;
import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.model.enums.*;
import com.dji.sample.wayline.model.param.CreateJobParam;
import com.dji.sample.wayline.model.param.UpdateJobParam;
import com.dji.sample.wayline.service.IWaylineFileService;
@@ -152,71 +149,67 @@
            return;
        }
        long now = System.currentTimeMillis() / 1000;
        if (CollectionUtils.isEmpty(param.getTaskDays())) {
            param.setTaskDays(List.of(now));
        }
        if (CollectionUtils.isEmpty(param.getTaskPeriods())) {
            param.setTaskPeriods(List.of(List.of(now)));
        }
        param.setTaskDays(Collections.singletonList(now));
        param.setTaskPeriods(Collections.singletonList(Collections.singletonList(now)));
    }
    @Override
    public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
        fillImmediateTime(param);
        param.getTaskDays().sort((a, b) -> (int) (a - b));
        param.getTaskPeriods().sort((a, b) -> (int) (a.get(0) - b.get(0)));
        for (Long taskDay : param.getTaskDays()) {
            LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
            for (List<Long> taskPeriod : param.getTaskPeriods()) {
                long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
                        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                long endTime = taskPeriod.size() > 1 ?
                long endTime = taskPeriod.size() > 1 && Objects.nonNull(taskPeriod.get(1)) ?
                        LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
                                .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
                if (WaylineTaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
                    return ResponseResult.error("The task has expired.");
                }
                Optional<WaylineJobDTO> waylineJobOpt = this.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
                if (waylineJobOpt.isEmpty()) {
                    throw new SQLException("Failed to create wayline job.");
                    return ResponseResult.error("Failed to create wayline job.");
                }
                WaylineJobDTO waylineJob = waylineJobOpt.get();
                // If it is a conditional task type, add conditions to the job parameters.
                addConditions(waylineJob, param, beginTime, endTime);
                if (WaylineTaskTypeEnum.IMMEDIATE == param.getTaskType()) {
                    return this.publishOneFlightTask(waylineJob);
                }
                return this.publishOneFlightTask(waylineJob);
                // If it is a conditional task type, add conditions to the job parameters.
                addPreparedJob(waylineJob, param, beginTime, endTime);
            }
        }
        return ResponseResult.error();
        return ResponseResult.success();
    }
    private void addConditions(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) {
        if (WaylineTaskTypeEnum.CONDITION != param.getTaskType()) {
            return;
    private void addPreparedJob(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) {
        if (WaylineTaskTypeEnum.CONDITION == param.getTaskType()) {
            waylineJob.setConditions(
                    WaylineTaskConditionDTO.builder()
                            .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ?
                                    WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null)
                            .readyConditions(WaylineTaskReadyConditionDTO.builder()
                                    .batteryCapacity(param.getMinBatteryCapacity())
                                    .beginTime(beginTime)
                                    .endTime(endTime)
                                    .build())
                            .build());
            waylineRedisService.setConditionalWaylineJob(waylineJob);
        }
        waylineJob.setConditions(
                WaylineTaskConditionDTO.builder()
                        .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ?
                                WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null)
                        .readyConditions(WaylineTaskReadyConditionDTO.builder()
                                .batteryCapacity(param.getMinBatteryCapacity())
                                .beginTime(beginTime)
                                .endTime(endTime)
                                .build())
                        .build());
        waylineRedisService.setConditionalWaylineJob(waylineJob);
        // key: wayline_job_condition, value: {workspace_id}:{dock_sn}:{job_id}
        boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(waylineJob);
        // value: {workspace_id}:{dock_sn}:{job_id}
        boolean isAdd = waylineRedisService.addPreparedWaylineJob(waylineJob);
        if (!isAdd) {
            throw new RuntimeException("Failed to create conditional job.");
            throw new RuntimeException("Failed to create prepare job.");
        }
    }
    public ResponseResult publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {
        boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
        if (!isOnline) {
            throw new RuntimeException("Dock is offline.");
        }
        boolean isSuccess = this.prepareFlightTask(waylineJob);
        if (!isSuccess) {
@@ -224,19 +217,10 @@
        }
        // Issue an immediate task execution command.
        if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == waylineJob.getTaskType()) {
            if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) {
        if (WaylineTaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) {
            boolean isExecuted = executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId());
            if (!isExecuted) {
                return ResponseResult.error("Failed to execute job.");
            }
        }
        if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) {
            // key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id}
            boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE,
                    waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
                    waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
            if (!isAdd) {
                return ResponseResult.error("Failed to create scheduled job.");
            }
        }
@@ -244,6 +228,12 @@
    }
    private Boolean prepareFlightTask(WaylineJobDTO waylineJob) throws SQLException {
        boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
        if (!isOnline) {
            throw new RuntimeException("Dock is offline.");
        }
        // get wayline file
        Optional<WaylineFileDTO> waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId());
        if (waylineFile.isEmpty()) {
@@ -266,7 +256,7 @@
                        .build())
                .build();
        if (WaylineTaskTypeEnum.CONDITION.getVal() == waylineJob.getTaskType()) {
        if (WaylineTaskTypeEnum.CONDITION == waylineJob.getTaskType()) {
            if (Objects.isNull(waylineJob.getConditions())) {
                throw new IllegalArgumentException();
            }
@@ -317,7 +307,7 @@
                    .completedTime(LocalDateTime.now())
                    .code(serviceReply.getResult()).build());
            // The conditional task fails and enters the blocking status.
            if (WaylineTaskTypeEnum.CONDITION.getVal() == job.getTaskType()
            if (WaylineTaskTypeEnum.CONDITION == job.getTaskType()
                    && WaylineErrorCodeEnum.find(serviceReply.getResult()).isBlock()) {
                waylineRedisService.setBlockedWaylineJob(job.getDockSn(), jobId);
            }
@@ -372,7 +362,6 @@
                    .status(WaylineJobStatusEnum.CANCEL.getVal())
                    .completedTime(LocalDateTime.now())
                    .build());
            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, workspaceId + RedisConst.DELIMITER + dockSn + RedisConst.DELIMITER + jobId);
        }
    }
@@ -382,8 +371,7 @@
                new LambdaQueryWrapper<WaylineJobEntity>()
                        .eq(WaylineJobEntity::getWorkspaceId, workspaceId)
                        .eq(Objects.nonNull(status), WaylineJobEntity::getStatus, status.getVal())
                        .and(!CollectionUtils.isEmpty(jobIds),
                                wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id).or())))
                        .in(!CollectionUtils.isEmpty(jobIds), WaylineJobEntity::getJobId, jobIds))
                .stream()
                .map(this::entity2Dto)
                .collect(Collectors.toList());
@@ -524,8 +512,8 @@
                .fileId(dto.getFileId())
                .dockSn(dto.getDockSn())
                .workspaceId(dto.getWorkspaceId())
                .taskType(dto.getTaskType())
                .waylineType(dto.getWaylineType())
                .taskType(Optional.ofNullable(dto.getTaskType()).map(WaylineTaskTypeEnum::getVal).orElse(null))
                .waylineType(Optional.ofNullable(dto.getWaylineType()).map(WaylineTemplateTypeEnum::getVal).orElse(null))
                .username(dto.getUsername())
                .rthAltitude(dto.getRthAltitude())
                .outOfControlAction(dto.getOutOfControlAction())
@@ -645,8 +633,8 @@
                        LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()) : null)
                .completedTime(WaylineJobStatusEnum.find(entity.getStatus()).getEnd() ?
                        LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault()) : null)
                .taskType(entity.getTaskType())
                .waylineType(entity.getWaylineType())
                .taskType(WaylineTaskTypeEnum.find(entity.getTaskType()))
                .waylineType(WaylineTemplateTypeEnum.find(entity.getWaylineType()))
                .rthAltitude(entity.getRthAltitude())
                .outOfControlAction(entity.getOutOfControlAction())
                .mediaCount(entity.getMediaCount());
src/main/java/com/dji/sample/wayline/service/impl/WaylineRedisServiceImpl.java
@@ -3,8 +3,8 @@
import com.dji.sample.component.mqtt.model.EventsReceiver;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.WaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import com.dji.sample.wayline.service.IWaylineRedisService;
import org.springframework.stereotype.Service;
@@ -65,12 +65,17 @@
    }
    @Override
    public Boolean delBlockedWaylineJobId(String dockSn) {
        return RedisOpsUtils.del(RedisConst.WAYLINE_JOB_BLOCK_PREFIX + dockSn);
    }
    @Override
    public void setConditionalWaylineJob(WaylineJobDTO waylineJob) {
        if (!StringUtils.hasText(waylineJob.getJobId())) {
            throw new RuntimeException("Job id can't be null.");
        }
        RedisOpsUtils.setWithExpire(RedisConst.WAYLINE_JOB_CONDITION_PREFIX + waylineJob.getJobId(), waylineJob,
                (Duration.between(waylineJob.getEndTime(), LocalDateTime.now()).getSeconds()));
                Math.abs(Duration.between(waylineJob.getEndTime(), LocalDateTime.now()).getSeconds()));
    }
    @Override
@@ -84,29 +89,29 @@
    }
    @Override
    public Boolean addPrepareConditionalWaylineJob(WaylineJobDTO waylineJob) {
    public Boolean addPreparedWaylineJob(WaylineJobDTO waylineJob) {
        if (Objects.isNull(waylineJob.getBeginTime())) {
            return false;
        }
        // value: {workspace_id}:{dock_sn}:{job_id}
        return RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_CONDITION_PREPARE,
        return RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_PREPARED,
                waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(),
                waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli());
    }
    @Override
    public Optional<ConditionalWaylineJobKey> getNearestConditionalWaylineJob() {
        return Optional.ofNullable(RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_CONDITION_PREPARE))
                .map(Object::toString).map(ConditionalWaylineJobKey::new);
    public Optional<WaylineJobKey> getNearestPreparedWaylineJob() {
        return Optional.ofNullable(RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_PREPARED))
                .map(Object::toString).map(WaylineJobKey::new);
    }
    @Override
    public Double getConditionalWaylineJobTime(ConditionalWaylineJobKey jobKey) {
        return RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_CONDITION_PREPARE, jobKey.getKey());
    public Double getPreparedWaylineJobTime(WaylineJobKey jobKey) {
        return RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_PREPARED, jobKey.getKey());
    }
    @Override
    public Boolean removePrepareConditionalWaylineJob(ConditionalWaylineJobKey jobKey) {
        return RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_CONDITION_PREPARE, jobKey.getKey());
    public Boolean removePreparedWaylineJob(WaylineJobKey jobKey) {
        return RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_PREPARED, jobKey.getKey());
    }
}
src/main/resources/hms.json
Diff too large