From 642dac3e705380eea241648f875836e64dbc809e Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Thu, 18 May 2023 17:43:06 +0800
Subject: [PATCH] Merge branch 'v1.5.0' What's new? 1. Add new model: DJI Matrices 350 RTK. 2. Update file hms.json. 3. Fixed some issues.
---
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java | 255 ++++++++++++++++++++++++--------------------------
1 files changed, 123 insertions(+), 132 deletions(-)
diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 9418b94..893b0bc 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,10 +12,11 @@
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.media.model.MediaFileCountDTO;
-import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
+import com.dji.sample.wayline.model.dto.WaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
+import com.dji.sample.wayline.model.enums.WaylineTaskTypeEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
import com.dji.sample.wayline.service.IWaylineRedisService;
@@ -25,7 +26,6 @@
import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
@@ -70,9 +70,6 @@
*/
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
- String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
- String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
- receivedTopic.indexOf(TopicConst.EVENTS_SUF));
EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){});
eventsReceiver.setBid(receiver.getBid());
@@ -87,40 +84,50 @@
}
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
- waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver);
-
- if (statusEnum.getEnd()) {
- WaylineJobDTO job = WaylineJobDTO.builder()
- .jobId(receiver.getBid())
- .status(WaylineJobStatusEnum.SUCCESS.getVal())
- .completedTime(LocalDateTime.now())
- .mediaCount(output.getExt().getMediaCount())
- .build();
-
- // record the update of the media count.
- if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
- RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
- MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
- }
-
- if (EventsResultStatusEnum.OK != statusEnum) {
- job.setCode(eventsReceiver.getResult());
- job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
- }
-
- waylineJobService.updateJob(job);
- waylineRedisService.delRunningWaylineJob(dockSn);
- waylineRedisService.delPausedWaylineJob(receiver.getBid());
- }
+ waylineRedisService.setRunningWaylineJob(receiver.getGateway(), eventsReceiver);
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway());
if (deviceOpt.isEmpty()) {
return null;
}
+
+ if (statusEnum.getEnd()) {
+ handleEndStatus(receiver, statusEnum, output.getExt().getMediaCount(), eventsReceiver.getResult(), deviceOpt.get());
+ }
+
websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver);
return receiver;
+ }
+
+ private void handleEndStatus(CommonTopicReceiver receiver, EventsResultStatusEnum statusEnum, int mediaCount, int code, DeviceDTO dock) {
+
+ WaylineJobDTO job = WaylineJobDTO.builder()
+ .jobId(receiver.getBid())
+ .status(WaylineJobStatusEnum.SUCCESS.getVal())
+ .completedTime(LocalDateTime.now())
+ .mediaCount(mediaCount)
+ .build();
+
+ // record the update of the media count.
+ if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
+ RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
+ MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
+ }
+
+ if (EventsResultStatusEnum.OK != statusEnum) {
+ job.setCode(code);
+ job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
+ }
+
+ waylineRedisService.getConditionalWaylineJob(receiver.getBid()).ifPresent(waylineJob ->
+ retryPrepareConditionJob(new WaylineJobKey(dock.getWorkspaceId(), dock.getDeviceSn(), receiver.getBid()), waylineJob));
+ waylineJobService.updateJob(job);
+ waylineRedisService.delRunningWaylineJob(receiver.getGateway());
+ waylineRedisService.delPausedWaylineJob(receiver.getBid());
+ waylineRedisService.delBlockedWaylineJobId(receiver.getGateway());
+
}
/**
@@ -130,106 +137,65 @@
*/
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS)
public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) {
- String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
- String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
- receivedTopic.indexOf(TopicConst.EVENTS_SUF));
- List<String> flightIds = mapper.convertValue(receiver.getData(),
- new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
+ String dockSn = receiver.getGateway();
+ Set<String> flightIds = mapper.convertValue(receiver.getData(),
+ new TypeReference<Map<String, Set<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
- log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) );
+ log.info("ready task list:{}", Arrays.toString(flightIds.toArray()));
// Check conditional task blocking status.
String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn);
- if (!StringUtils.hasText(blockedId)) {
+ if (StringUtils.hasText(blockedId)) {
+ log.info("The dock is in a state of wayline congestion, and the task will not be executed.");
return null;
}
Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn);
if (deviceOpt.isEmpty()) {
+ log.info("The dock is offline.");
return null;
}
DeviceDTO device = deviceOpt.get();
-
- try {
- for (String jobId : flightIds) {
- boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId);
- if (!isExecute) {
- return null;
- }
- Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId);
- if (waylineJobOpt.isEmpty()) {
- log.info("The conditional job has expired and will no longer be executed.");
- return receiver;
- }
- WaylineJobDTO waylineJob = waylineJobOpt.get();
- this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob);
- return receiver;
- }
- } catch (Exception e) {
- log.error("Failed to execute conditional task.");
- e.printStackTrace();
+ Optional<WaylineJobDTO> jobOpt = waylineJobService.getJobsByConditions(device.getWorkspaceId(), flightIds, WaylineJobStatusEnum.PENDING)
+ .stream().filter(job -> flightIds.contains(job.getJobId()))
+ .sorted(Comparator.comparingInt(a -> a.getTaskType().getVal()))
+ .min(Comparator.comparing(WaylineJobDTO::getBeginTime));
+ if (jobOpt.isEmpty()) {
+ return receiver;
}
+ executeReadyTask(jobOpt.get());
+
return receiver;
}
- @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
- private void checkScheduledJob() {
- Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
- if (Objects.isNull(jobIdValue)) {
- return;
- }
- log.info("Check the timed tasks of the wayline. {}", jobIdValue);
- // format: {workspace_id}:{dock_sn}:{job_id}
- String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
- double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
- long now = System.currentTimeMillis();
- int offset = 30_000;
-
- // Expired tasks are deleted directly.
- if (time < now - offset) {
- RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
- waylineJobService.updateJob(WaylineJobDTO.builder()
- .jobId(jobArr[2])
- .status(WaylineJobStatusEnum.FAILED.getVal())
- .executeTime(LocalDateTime.now())
- .completedTime(LocalDateTime.now())
- .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
- return;
- }
-
- if (now <= time && time <= now + offset) {
- try {
- waylineJobService.executeFlightTask(jobArr[0], jobArr[2]);
- } catch (Exception e) {
- log.info("The scheduled task delivery failed.");
- waylineJobService.updateJob(WaylineJobDTO.builder()
- .jobId(jobArr[2])
- .status(WaylineJobStatusEnum.FAILED.getVal())
- .executeTime(LocalDateTime.now())
- .completedTime(LocalDateTime.now())
- .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
- } finally {
- RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
+ private void executeReadyTask(WaylineJobDTO waylineJob) {
+ try {
+ boolean isExecute = waylineJobService.executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId());
+ if (isExecute || WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
+ return;
}
+ Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(waylineJob.getJobId());
+ if (waylineJobOpt.isEmpty()) {
+ log.info("The conditional job has expired and will no longer be executed.");
+ return;
+ }
+ waylineJob = waylineJobOpt.get();
+ this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
+ } catch (Exception e) {
+ log.error("Failed to execute task. ID: {}, Name:{}", waylineJob.getJobId(), waylineJob.getJobName());
+ this.retryPrepareConditionJob(new WaylineJobKey(waylineJob.getWorkspaceId(), waylineJob.getDockSn(), waylineJob.getJobId()), waylineJob);
+ e.printStackTrace();
}
}
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
- private void prepareConditionJob() {
- Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob();
+ private void prepareWaylineJob() {
+ Optional<WaylineJobKey> jobKeyOpt = waylineRedisService.getNearestPreparedWaylineJob();
if (jobKeyOpt.isEmpty()) {
return;
}
- ConditionalWaylineJobKey jobKey = jobKeyOpt.get();
- log.info("Check the conditional tasks of the wayline. {}", jobKey.toString());
// format: {workspace_id}:{dock_sn}:{job_id}
- double time = waylineRedisService.getConditionalWaylineJobTime(jobKey);
- long now = System.currentTimeMillis();
- // prepare the task one day in advance.
- int offset = 86_400_000;
-
- if (now + offset < time) {
- return;
- }
+ WaylineJobKey jobKey = jobKeyOpt.get();
+ log.info("Check the prepared tasks of the wayline. {}", jobKey.toString());
WaylineJobDTO job = WaylineJobDTO.builder()
.jobId(jobKey.getJobId())
@@ -237,40 +203,65 @@
.executeTime(LocalDateTime.now())
.completedTime(LocalDateTime.now())
.code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build();
+ Optional<WaylineJobDTO> waylineJobOpt = getPreparedJob(jobKey, job);
+ if (waylineJobOpt.isEmpty()) {
+ return;
+ }
+
+ WaylineJobDTO waylineJob = waylineJobOpt.get();
try {
- Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
- if (waylineJobOpt.isEmpty()) {
- job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
- waylineJobService.updateJob(job);
- waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
- return;
- }
- WaylineJobDTO waylineJob = waylineJobOpt.get();
-
ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob);
- waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
-
if (ResponseResult.CODE_SUCCESS == result.getCode()) {
return;
}
-
- // If the end time is exceeded, no more retries will be made.
- waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
- if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) {
- return;
- }
-
- // Retry if the end time has not been exceeded.
- this.retryPrepareJob(jobKey, waylineJob);
-
- } catch (Exception e) {
- log.info("Failed to prepare the conditional task.");
+ log.info("Failed to prepare the task. {}", result.getMessage());
+ job.setCode(result.getCode());
waylineJobService.updateJob(job);
+ // Retry if the end time has not been exceeded.
+ this.retryPrepareConditionJob(jobKey, waylineJob);
+ } catch (Exception e) {
+ log.info("Failed to prepare the task. {}", e.getLocalizedMessage());
+ waylineJobService.updateJob(job);
+ this.retryPrepareConditionJob(jobKey, waylineJob);
}
-
}
- private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) {
+ private boolean checkTime(long time) {
+ // prepare the task one day in advance.
+ int offset = 86_400_000;
+ return System.currentTimeMillis() + offset >= time;
+ }
+
+ private Optional<WaylineJobDTO> getPreparedJob(WaylineJobKey jobKey, WaylineJobDTO job) {
+ long time = waylineRedisService.getPreparedWaylineJobTime(jobKey).longValue();
+ if (!checkTime(time)) {
+ return Optional.empty();
+ }
+
+ Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
+ // Determine whether the conditional task or the scheduled task has expired.
+ if (waylineJobOpt.isEmpty()) {
+ waylineJobOpt = waylineJobService.getJobByJobId(jobKey.getWorkspaceId(), jobKey.getJobId());
+ if (waylineJobOpt.isEmpty() || waylineJobOpt.get().getEndTime().isBefore(LocalDateTime.now())) {
+ job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
+ waylineJobService.updateJob(job);
+ return Optional.empty();
+ }
+ }
+ waylineRedisService.removePreparedWaylineJob(jobKey);
+ return waylineJobOpt;
+ }
+
+ private void retryPrepareConditionJob(WaylineJobKey jobKey, WaylineJobDTO waylineJob) {
+ if (WaylineTaskTypeEnum.CONDITION != waylineJob.getTaskType()) {
+ return;
+ }
+ // If the end time is exceeded, no more retries will be made.
+ waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
+ if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) {
+ return;
+ }
+
Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId());
if (childJobOpt.isEmpty()) {
log.error("Failed to create wayline job.");
@@ -279,7 +270,7 @@
WaylineJobDTO newJob = childJobOpt.get();
newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME));
- boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob);
+ boolean isAdd = waylineRedisService.addPreparedWaylineJob(newJob);
if (!isAdd) {
log.error("Failed to create wayline job. {}", newJob.getJobId());
return;
--
Gitblit v1.9.3