pom.xml
@@ -11,7 +11,7 @@ <groupId>com.dji</groupId> <artifactId>cloud-api-sample</artifactId> <version>1.4.0</version> <version>1.5.0</version> <name>cloud-api-sample</name> <properties> sql/cloud_sample.sql
@@ -134,7 +134,8 @@ (22,1,67,0,'Mavic 3T Camera',NULL), (23,2,144,0,'DJI RC Pro','Remote control for Mavic 3E/T and Mavic 3M'), (24,0,77,2,'Mavic 3M',NULL), (25,1,68,0,'Mavic 3M Camera',NULL); (25,1,68,0,'Mavic 3M Camera',NULL), (26,0,89,0,'Matrice 350 RTK',NULL); /*!40000 ALTER TABLE `manage_device_dictionary` ENABLE KEYS */; UNLOCK TABLES; src/main/java/com/dji/sample/component/redis/RedisConst.java
@@ -37,11 +37,9 @@ public static final String LOGS_FILE_PREFIX = "logs_file" + DELIMITER; public static final String WAYLINE_JOB_TIMED_EXECUTE = "wayline_job_timed_execute"; public static final String WAYLINE_JOB_PREPARED = "wayline_job_prepared"; public static final String WAYLINE_JOB_CONDITION_PREPARE = "wayline_job_condition_prepare"; public static final String WAYLINE_JOB_CONDITION_PREFIX = WAYLINE_JOB_CONDITION_PREPARE + DELIMITER; public static final String WAYLINE_JOB_CONDITION_PREFIX = "wayline_job_condition" + DELIMITER; public static final String WAYLINE_JOB_BLOCK_PREFIX = "wayline_job_block" + DELIMITER; src/main/java/com/dji/sample/component/websocket/service/impl/WebSocketManageServiceImpl.java
@@ -75,7 +75,7 @@ return RedisOpsUtils.hashKeys(key) .stream() .map(SESSIONS::get) .filter(Objects::nonNull) .filter(this.getValueWithWorkspace(workspaceId)::contains) .collect(Collectors.toSet()); } src/main/java/com/dji/sample/manage/service/impl/DeviceServiceImpl.java
@@ -518,9 +518,37 @@ } entity.setId(deviceEntity.getId()); mapper.updateById(entity); return Optional.of(deviceEntity); fillNullField(entity, deviceEntity); return Optional.of(entity); } return mapper.insert(entity) > 0 ? Optional.of(entity) : Optional.empty(); } private void fillNullField(DeviceEntity entity, DeviceEntity oldEntity) { if (Objects.isNull(entity) || Objects.isNull(oldEntity)) { return; } if (Objects.isNull(entity.getWorkspaceId())) { entity.setWorkspaceId(oldEntity.getWorkspaceId()); } if (Objects.isNull(entity.getUserId())) { entity.setUserId(oldEntity.getUserId()); } if (Objects.isNull(entity.getChildSn())) { entity.setChildSn(oldEntity.getChildSn()); } if (Objects.isNull(entity.getBoundStatus())) { entity.setBoundStatus(oldEntity.getBoundStatus()); } if (Objects.isNull(entity.getBoundTime())) { entity.setBoundTime(oldEntity.getBoundTime()); } if (Objects.isNull(entity.getFirmwareVersion())) { entity.setFirmwareVersion(oldEntity.getFirmwareVersion()); } if (Objects.isNull(entity.getDeviceIndex())) { entity.setDeviceIndex(oldEntity.getDeviceIndex()); } } /** @@ -650,7 +678,7 @@ return; } if (entity.getFirmwareVersion().equals(firmwareReleaseNoteOpt.get().getProductVersion())) { deviceDTO.setFirmwareStatus(entity.getCompatibleStatus() ? deviceDTO.setFirmwareStatus(Objects.requireNonNullElse(entity.getCompatibleStatus(), true) ? DeviceFirmwareStatusEnum.NOT_UPGRADE.getVal() : DeviceFirmwareStatusEnum.CONSISTENT_UPGRADE.getVal()); return; src/main/java/com/dji/sample/wayline/model/dto/ConditionalWaylineJobKey.java
File was deleted src/main/java/com/dji/sample/wayline/model/dto/WaylineJobDTO.java
@@ -1,5 +1,7 @@ package com.dji.sample.wayline.model.dto; import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.model.enums.WaylineTemplateTypeEnum; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -32,9 +34,9 @@ private String workspaceId; private Integer waylineType; private WaylineTemplateTypeEnum waylineType; private Integer taskType; private WaylineTaskTypeEnum taskType; private LocalDateTime executeTime; src/main/java/com/dji/sample/wayline/model/dto/WaylineJobKey.java
New file @@ -0,0 +1,36 @@ package com.dji.sample.wayline.model.dto; import com.dji.sample.component.redis.RedisConst; import lombok.Data; /** * @author sean * @version 1.4 * @date 2023/3/28 */ @Data public class WaylineJobKey { private String workspaceId; private String dockSn; private String jobId; public WaylineJobKey(String workspaceId, String dockSn, String jobId) { this.workspaceId = workspaceId; this.dockSn = dockSn; this.jobId = jobId; } private WaylineJobKey(String[] keyArr) { this(keyArr[0], keyArr[1], keyArr[2]); } public WaylineJobKey(String key) { this(key.split(RedisConst.DELIMITER)); } public String getKey() { return String.join(RedisConst.DELIMITER, workspaceId, dockSn, jobId); } } src/main/java/com/dji/sample/wayline/model/dto/WaylineTaskCreateDTO.java
@@ -1,5 +1,7 @@ package com.dji.sample.wayline.model.dto; import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.model.enums.WaylineTemplateTypeEnum; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; @@ -18,9 +20,9 @@ private String flightId; private Integer taskType; private WaylineTaskTypeEnum taskType; private Integer waylineType; private WaylineTemplateTypeEnum waylineType; private Long executeTime; src/main/java/com/dji/sample/wayline/service/IWaylineRedisService.java
@@ -1,8 +1,8 @@ package com.dji.sample.wayline.service; import com.dji.sample.component.mqtt.model.EventsReceiver; import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.dto.WaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; import java.util.Optional; @@ -71,6 +71,13 @@ String getBlockedWaylineJobId(String dockSn); /** * Delete the wayline job id blocked by the dock in redis. * @param dockSn * @return */ Boolean delBlockedWaylineJobId(String dockSn); /** * Save the conditional wayline job by the dock to redis. * @param waylineJob */ @@ -90,11 +97,30 @@ */ Boolean delConditionalWaylineJob(String jobId); Boolean addPrepareConditionalWaylineJob(WaylineJobDTO waylineJob); /** * Add the wayline job that needs to be issued. * @param waylineJob * @return */ Boolean addPreparedWaylineJob(WaylineJobDTO waylineJob); Optional<ConditionalWaylineJobKey> getNearestConditionalWaylineJob(); /** * Get the latest wayline job that needs to be issued. * @return */ Optional<WaylineJobKey> getNearestPreparedWaylineJob(); Double getConditionalWaylineJobTime(ConditionalWaylineJobKey jobKey); /** * Get the time when the wayline job is issued. * @param jobKey * @return */ Double getPreparedWaylineJobTime(WaylineJobKey jobKey); Boolean removePrepareConditionalWaylineJob(ConditionalWaylineJobKey jobKey); /** * Delete the wayline job that needs to be issued in redis. * @param jobKey * @return */ Boolean removePreparedWaylineJob(WaylineJobKey jobKey); } src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,10 +12,11 @@ import com.dji.sample.manage.model.enums.UserTypeEnum; import com.dji.sample.manage.service.IDeviceRedisService; import com.dji.sample.media.model.MediaFileCountDTO; import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.dto.WaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.service.IFlightTaskService; import com.dji.sample.wayline.service.IWaylineJobService; import com.dji.sample.wayline.service.IWaylineRedisService; @@ -25,7 +26,6 @@ import org.apache.http.HttpStatus; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageHeaders; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Service; @@ -70,9 +70,6 @@ */ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), receivedTopic.indexOf(TopicConst.EVENTS_SUF)); EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){}); eventsReceiver.setBid(receiver.getBid()); @@ -87,40 +84,50 @@ } EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver); if (statusEnum.getEnd()) { WaylineJobDTO job = WaylineJobDTO.builder() .jobId(receiver.getBid()) .status(WaylineJobStatusEnum.SUCCESS.getVal()) .completedTime(LocalDateTime.now()) .mediaCount(output.getExt().getMediaCount()) .build(); // record the update of the media count. if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); } if (EventsResultStatusEnum.OK != statusEnum) { job.setCode(eventsReceiver.getResult()); job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); } waylineJobService.updateJob(job); waylineRedisService.delRunningWaylineJob(dockSn); waylineRedisService.delPausedWaylineJob(receiver.getBid()); } waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver); Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); if (deviceOpt.isEmpty()) { return null; } if (statusEnum.getEnd()) { handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get()); } websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(), BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver); return receiver; } private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) { WaylineJobDTO job = WaylineJobDTO.builder() .jobId(receiver.getBid()) .status(WaylineJobStatusEnum.SUCCESS.getVal()) .completedTime(LocalDateTime.now()) .mediaCount(mediaCount) .build(); // record the update of the media count. if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); } if (EventsResultStatusEnum.OK != statusEnum) { job.setCode(code); job.setStatus(WaylineJobStatusEnum.FAILED.getVal()); } waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob -> retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob)); waylineJobService.updateJob(job); waylineRedisService.delRunningWaylineJob(receiver.getGateway()); waylineRedisService.delPausedWaylineJob(receiver.getBid()); waylineRedisService.delBlockedWaylineJobId(receiver.getGateway()); } /** @@ -130,106 +137,65 @@ */ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS) public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) { String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), receivedTopic.indexOf(TopicConst.EVENTS_SUF)); List<String> flightIds = mapper.convertValue(receiver.getData(), new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); String dockSn = receiver.getGateway(); Set<String> flightIds = mapper.convertValue(receiver.getData(), new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS); log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) ); log.info("ready task list:{}", Arrays.toString(flightIds.toArray())); // Check conditional task blocking status. String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn); if (!StringUtils.hasText(blockedId)) { if (StringUtils.hasText(blockedId)) { log.info("The dock is in a state of wayline congestion, and the task will not be executed."); return null; } Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn); if (deviceOpt.isEmpty()) { log.info("The dock is offline."); return null; } DeviceDTO device = deviceOpt.get(); try { for (String jobId : flightIds) { boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId); if (!isExecute) { return null; } Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId); if (waylineJobOpt.isEmpty()) { log.info("The conditional job has expired and will no longer be executed."); return receiver; } WaylineJobDTO waylineJob = waylineJobOpt.get(); this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob); return receiver; } } catch (Exception e) { log.error("Failed to execute conditional task."); e.printStackTrace(); Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING) .stream().filter(job -> flightIds.contains(job.getJobId())) .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal())) .min(Comparator.comparing(WaylineJobDTO::getBeginTime)); if (jobOpt.isEmpty()) { return receiver; } executeReadyTask(jobOpt.get()); return receiver; } @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) private void checkScheduledJob() { Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE); if (Objects.isNull(jobIdValue)) { return; } log.info("Check the timed tasks of the wayline. {}", jobIdValue); // format: {workspace_id}:{dock_sn}:{job_id} String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER); double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); long now = System.currentTimeMillis(); int offset = 30_000; // Expired tasks are deleted directly. if (time < now - offset) { RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobArr[2]) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_REQUEST_TIMEOUT).build()); return; } if (now <= time && time <= now + offset) { try { waylineJobService.executeFlightTask(jobArr[0], jobArr[2]); } catch (Exception e) { log.info("The scheduled task delivery failed."); waylineJobService.updateJob(WaylineJobDTO.builder() .jobId(jobArr[2]) .status(WaylineJobStatusEnum.FAILED.getVal()) .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); } finally { RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); private void executeReadyTask(WaylineJobDTO waylineJob) { try { boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId()); if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { return; } Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId()); if (waylineJobOpt.isEmpty()) { log.info("The conditional job has expired and will no longer be executed."); return; } waylineJob = waylineJobOpt.get(); this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); } catch (Exception e) { log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName()); this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob); e.printStackTrace(); } } @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) private void prepareConditionJob() { Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob(); private void prepareWaylineJob() { Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob(); if (jobKeyOpt.isEmpty()) { return; } ConditionalWaylineJobKey jobKey = jobKeyOpt.get(); log.info("Check the conditional tasks of the wayline. {}", jobKey.toString()); // format: {workspace_id}:{dock_sn}:{job_id} double time = waylineRedisService.getConditionalWaylineJobTime(jobKey); long now = System.currentTimeMillis(); // prepare the task one day in advance. int offset = 86_400_000; if (now + offset < time) { return; } WaylineJobKey jobKey = jobKeyOpt.get(); log.info("Check the prepared tasks of the wayline. {}", jobKey.toString()); WaylineJobDTO job = WaylineJobDTO.builder() .jobId(jobKey.getJobId()) @@ -237,40 +203,65 @@ .executeTime(LocalDateTime.now()) .completedTime(LocalDateTime.now()) .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build(); Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job); if (waylineJobOpt.isEmpty()) { return; } WaylineJobDTO waylineJob = waylineJobOpt.get(); try { Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); if (waylineJobOpt.isEmpty()) { job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); waylineJobService.updateJob(job); waylineRedisService.removePrepareConditionalWaylineJob(jobKey); return; } WaylineJobDTO waylineJob = waylineJobOpt.get(); ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob); waylineRedisService.removePrepareConditionalWaylineJob(jobKey); if (ResponseResult.CODE_SUCCESS == result.getCode()) { return; } // If the end time is exceeded, no more retries will be made. waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) { return; } // Retry if the end time has not been exceeded. this.retryPrepareJob(jobKey, waylineJob); } catch (Exception e) { log.info("Failed to prepare the conditional task."); log.info("Failed to prepare the task. {}", result.getMessage()); job.setCode(result.getCode()); waylineJobService.updateJob(job); // Retry if the end time has not been exceeded. this.retryPrepareConditionJob(jobKey, waylineJob); } catch (Exception e) { log.info("Failed to prepare the task. {}", e.getLocalizedMessage()); waylineJobService.updateJob(job); this.retryPrepareConditionJob(jobKey, waylineJob); } } private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) { private boolean checkTime(long time) { // prepare the task one day in advance. int offset = 86_400_000; return System.currentTimeMillis() + offset >= time; } private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) { long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue(); if (!checkTime(time)) { return Optional.empty(); } Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId()); // Determine whether the conditional task or the scheduled task has expired. if (waylineJobOpt.isEmpty()) { waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId()); if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) { job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode()); waylineJobService.updateJob(job); return Optional.empty(); } } waylineRedisService.removePreparedWaylineJob(jobKey); return waylineJobOpt; } private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) { if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) { return; } // If the end time is exceeded, no more retries will be made. waylineRedisService.delConditionalWaylineJob(jobKey.getJobId()); if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) { return; } Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId()); if (childJobOpt.isEmpty()) { log.error("Failed to create wayline job."); @@ -279,7 +270,7 @@ WaylineJobDTO newJob = childJobOpt.get(); newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME)); boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob); boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob); if (!isAdd) { log.error("Failed to create wayline job. {}", newJob.getJobId()); return; src/main/java/com/dji/sample/wayline/service/impl/WaylineJobServiceImpl.java
@@ -27,10 +27,7 @@ import com.dji.sample.wayline.dao.IWaylineJobMapper; import com.dji.sample.wayline.model.dto.*; import com.dji.sample.wayline.model.entity.WaylineJobEntity; import com.dji.sample.wayline.model.enums.WaylineErrorCodeEnum; import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; import com.dji.sample.wayline.model.enums.WaylineMethodEnum; import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum; import com.dji.sample.wayline.model.enums.*; import com.dji.sample.wayline.model.param.CreateJobParam; import com.dji.sample.wayline.model.param.UpdateJobParam; import com.dji.sample.wayline.service.IWaylineFileService; @@ -152,71 +149,67 @@ return; } long now = System.currentTimeMillis() / 1000; if (CollectionUtils.isEmpty(param.getTaskDays())) { param.setTaskDays(List.of(now)); } if (CollectionUtils.isEmpty(param.getTaskPeriods())) { param.setTaskPeriods(List.of(List.of(now))); } param.setTaskDays(Collections.singletonList(now)); param.setTaskPeriods(Collections.singletonList(Collections.singletonList(now))); } @Override public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException { fillImmediateTime(param); param.getTaskDays().sort((a, b) -> (int) (a - b)); param.getTaskPeriods().sort((a, b) -> (int) (a.get(0) - b.get(0))); for (Long taskDay : param.getTaskDays()) { LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault()); for (List<Long> taskPeriod : param.getTaskPeriods()) { long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); long endTime = taskPeriod.size() > 1 ? long endTime = taskPeriod.size() > 1 && Objects.nonNull(taskPeriod.get(1)) ? LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime; if (WaylineTaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) { return ResponseResult.error("The task has expired."); } Optional<WaylineJobDTO> waylineJobOpt = this.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime); if (waylineJobOpt.isEmpty()) { throw new SQLException("Failed to create wayline job."); return ResponseResult.error("Failed to create wayline job."); } WaylineJobDTO waylineJob = waylineJobOpt.get(); // If it is a conditional task type, add conditions to the job parameters. addConditions(waylineJob, param, beginTime, endTime); if (WaylineTaskTypeEnum.IMMEDIATE == param.getTaskType()) { return this.publishOneFlightTask(waylineJob); } return this.publishOneFlightTask(waylineJob); // If it is a conditional task type, add conditions to the job parameters. addPreparedJob(waylineJob, param, beginTime, endTime); } } return ResponseResult.error(); return ResponseResult.success(); } private void addConditions(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) { if (WaylineTaskTypeEnum.CONDITION != param.getTaskType()) { return; private void addPreparedJob(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) { if (WaylineTaskTypeEnum.CONDITION == param.getTaskType()) { waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ? WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null) .readyConditions(WaylineTaskReadyConditionDTO.builder() .batteryCapacity(param.getMinBatteryCapacity()) .beginTime(beginTime) .endTime(endTime) .build()) .build()); waylineRedisService.setConditionalWaylineJob(waylineJob); } waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ? WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null) .readyConditions(WaylineTaskReadyConditionDTO.builder() .batteryCapacity(param.getMinBatteryCapacity()) .beginTime(beginTime) .endTime(endTime) .build()) .build()); waylineRedisService.setConditionalWaylineJob(waylineJob); // key: wayline_job_condition, value: {workspace_id}:{dock_sn}:{job_id} boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(waylineJob); // value: {workspace_id}:{dock_sn}:{job_id} boolean isAdd = waylineRedisService.addPreparedWaylineJob(waylineJob); if (!isAdd) { throw new RuntimeException("Failed to create conditional job."); throw new RuntimeException("Failed to create prepare job."); } } public ResponseResult publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException { boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn()); if (!isOnline) { throw new RuntimeException("Dock is offline."); } boolean isSuccess = this.prepareFlightTask(waylineJob); if (!isSuccess) { @@ -224,19 +217,10 @@ } // Issue an immediate task execution command. if (WaylineTaskTypeEnum.IMMEDIATE.getVal() == waylineJob.getTaskType()) { if (!executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId())) { if (WaylineTaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) { boolean isExecuted = executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId()); if (!isExecuted) { return ResponseResult.error("Failed to execute job."); } } if (WaylineTaskTypeEnum.TIMED.getVal() == waylineJob.getTaskType()) { // key: wayline_job_timed, value: {workspace_id}:{dock_sn}:{job_id} boolean isAdd = RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(), waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); if (!isAdd) { return ResponseResult.error("Failed to create scheduled job."); } } @@ -244,6 +228,12 @@ } private Boolean prepareFlightTask(WaylineJobDTO waylineJob) throws SQLException { boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn()); if (!isOnline) { throw new RuntimeException("Dock is offline."); } // get wayline file Optional<WaylineFileDTO> waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId()); if (waylineFile.isEmpty()) { @@ -266,7 +256,7 @@ .build()) .build(); if (WaylineTaskTypeEnum.CONDITION.getVal() == waylineJob.getTaskType()) { if (WaylineTaskTypeEnum.CONDITION == waylineJob.getTaskType()) { if (Objects.isNull(waylineJob.getConditions())) { throw new IllegalArgumentException(); } @@ -317,7 +307,7 @@ .completedTime(LocalDateTime.now()) .code(serviceReply.getResult()).build()); // The conditional task fails and enters the blocking status. if (WaylineTaskTypeEnum.CONDITION.getVal() == job.getTaskType() if (WaylineTaskTypeEnum.CONDITION == job.getTaskType() && WaylineErrorCodeEnum.find(serviceReply.getResult()).isBlock()) { waylineRedisService.setBlockedWaylineJob(job.getDockSn(), jobId); } @@ -372,7 +362,6 @@ .status(WaylineJobStatusEnum.CANCEL.getVal()) .completedTime(LocalDateTime.now()) .build()); RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, workspaceId + RedisConst.DELIMITER + dockSn + RedisConst.DELIMITER + jobId); } } @@ -382,8 +371,7 @@ new LambdaQueryWrapper<WaylineJobEntity>() .eq(WaylineJobEntity::getWorkspaceId, workspaceId) .eq(Objects.nonNull(status), WaylineJobEntity::getStatus, status.getVal()) .and(!CollectionUtils.isEmpty(jobIds), wrapper -> jobIds.forEach(id -> wrapper.eq(WaylineJobEntity::getJobId, id).or()))) .in(!CollectionUtils.isEmpty(jobIds), WaylineJobEntity::getJobId, jobIds)) .stream() .map(this::entity2Dto) .collect(Collectors.toList()); @@ -524,8 +512,8 @@ .fileId(dto.getFileId()) .dockSn(dto.getDockSn()) .workspaceId(dto.getWorkspaceId()) .taskType(dto.getTaskType()) .waylineType(dto.getWaylineType()) .taskType(Optional.ofNullable(dto.getTaskType()).map(WaylineTaskTypeEnum::getVal).orElse(null)) .waylineType(Optional.ofNullable(dto.getWaylineType()).map(WaylineTemplateTypeEnum::getVal).orElse(null)) .username(dto.getUsername()) .rthAltitude(dto.getRthAltitude()) .outOfControlAction(dto.getOutOfControlAction()) @@ -645,8 +633,8 @@ LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()) : null) .completedTime(WaylineJobStatusEnum.find(entity.getStatus()).getEnd() ? LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault()) : null) .taskType(entity.getTaskType()) .waylineType(entity.getWaylineType()) .taskType(WaylineTaskTypeEnum.find(entity.getTaskType())) .waylineType(WaylineTemplateTypeEnum.find(entity.getWaylineType())) .rthAltitude(entity.getRthAltitude()) .outOfControlAction(entity.getOutOfControlAction()) .mediaCount(entity.getMediaCount()); src/main/java/com/dji/sample/wayline/service/impl/WaylineRedisServiceImpl.java
@@ -3,8 +3,8 @@ import com.dji.sample.component.mqtt.model.EventsReceiver; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineJobDTO; import com.dji.sample.wayline.model.dto.WaylineJobKey; import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; import com.dji.sample.wayline.service.IWaylineRedisService; import org.springframework.stereotype.Service; @@ -65,12 +65,17 @@ } @Override public Boolean delBlockedWaylineJobId(String dockSn) { return RedisOpsUtils.del(RedisConst.WAYLINE_JOB_BLOCK_PREFIX + dockSn); } @Override public void setConditionalWaylineJob(WaylineJobDTO waylineJob) { if (!StringUtils.hasText(waylineJob.getJobId())) { throw new RuntimeException("Job id can't be null."); } RedisOpsUtils.setWithExpire(RedisConst.WAYLINE_JOB_CONDITION_PREFIX + waylineJob.getJobId(), waylineJob, (Duration.between(waylineJob.getEndTime(), LocalDateTime.now()).getSeconds())); Math.abs(Duration.between(waylineJob.getEndTime(), LocalDateTime.now()).getSeconds())); } @Override @@ -84,29 +89,29 @@ } @Override public Boolean addPrepareConditionalWaylineJob(WaylineJobDTO waylineJob) { public Boolean addPreparedWaylineJob(WaylineJobDTO waylineJob) { if (Objects.isNull(waylineJob.getBeginTime())) { return false; } // value: {workspace_id}:{dock_sn}:{job_id} return RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_CONDITION_PREPARE, return RedisOpsUtils.zAdd(RedisConst.WAYLINE_JOB_PREPARED, waylineJob.getWorkspaceId() + RedisConst.DELIMITER + waylineJob.getDockSn() + RedisConst.DELIMITER + waylineJob.getJobId(), waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); } @Override public Optional<ConditionalWaylineJobKey> getNearestConditionalWaylineJob() { return Optional.ofNullable(RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_CONDITION_PREPARE)) .map(Object::toString).map(ConditionalWaylineJobKey::new); public Optional<WaylineJobKey> getNearestPreparedWaylineJob() { return Optional.ofNullable(RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_PREPARED)) .map(Object::toString).map(WaylineJobKey::new); } @Override public Double getConditionalWaylineJobTime(ConditionalWaylineJobKey jobKey) { return RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_CONDITION_PREPARE, jobKey.getKey()); public Double getPreparedWaylineJobTime(WaylineJobKey jobKey) { return RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_PREPARED, jobKey.getKey()); } @Override public Boolean removePrepareConditionalWaylineJob(ConditionalWaylineJobKey jobKey) { return RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_CONDITION_PREPARE, jobKey.getKey()); public Boolean removePreparedWaylineJob(WaylineJobKey jobKey) { return RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_PREPARED, jobKey.getKey()); } } src/main/resources/hms.json
Diff too large