From 694b9483c7a551626244cbc222c602ea9ff74094 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Tue, 25 Apr 2023 21:44:00 +0800
Subject: [PATCH] What's new? 1. Wayline management: added `pause wayline task` and `recover wayline task`. 2. Added command flight function. 3. Fixed some issues.
---
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java | 184 ++++++++++++++++++++++++++++++++++++++-------
1 files changed, 153 insertions(+), 31 deletions(-)
diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 224602d..9418b94 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -1,22 +1,24 @@
package com.dji.sample.wayline.service.impl;
+import com.dji.sample.common.error.CommonErrorEnum;
import com.dji.sample.common.model.ResponseResult;
import com.dji.sample.component.mqtt.model.*;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.component.websocket.model.BizCodeEnum;
-import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
import com.dji.sample.component.websocket.service.ISendMessageService;
-import com.dji.sample.component.websocket.service.IWebSocketManageService;
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum;
+import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.media.model.MediaFileCountDTO;
+import com.dji.sample.wayline.model.dto.ConditionalWaylineJobKey;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
+import com.dji.sample.wayline.service.IWaylineRedisService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
@@ -27,8 +29,10 @@
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
+import org.springframework.util.StringUtils;
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.util.*;
import java.util.concurrent.TimeUnit;
@@ -51,14 +55,21 @@
private ISendMessageService websocketMessageService;
@Autowired
- private IWebSocketManageService webSocketManageService;
-
- @Autowired
private IWaylineJobService waylineJobService;
- @Override
- @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
- public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
+ @Autowired
+ private IDeviceRedisService deviceRedisService;
+
+ @Autowired
+ private IWaylineRedisService waylineRedisService;
+
+ /**
+ * Handle the progress messages of the flight tasks reported by the dock.
+ * @param receiver
+ * @param headers
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
+ public CommonTopicReceiver handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
receivedTopic.indexOf(TopicConst.EVENTS_SUF));
@@ -76,8 +87,7 @@
}
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
- String key = RedisConst.WAYLINE_JOB_RUNNING_PREFIX + dockSn;
- RedisOpsUtils.setWithExpire(key, eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
+ waylineRedisService.setRunningWaylineJob(dockSn, eventsReceiver);
if (statusEnum.getEnd()) {
WaylineJobDTO job = WaylineJobDTO.builder()
@@ -99,30 +109,66 @@
}
waylineJobService.updateJob(job);
- RedisOpsUtils.del(key);
- RedisOpsUtils.del(RedisConst.WAYLINE_JOB_PAUSED_PREFIX + receiver.getBid());
+ waylineRedisService.delRunningWaylineJob(dockSn);
+ waylineRedisService.delPausedWaylineJob(receiver.getBid());
}
- DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
- websocketMessageService.sendBatch(
- webSocketManageService.getValueWithWorkspaceAndUserType(
- device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
- CustomWebSocketMessage.builder()
- .data(eventsReceiver)
- .timestamp(System.currentTimeMillis())
- .bizCode(BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode())
- .build());
-
- if (receiver.getNeedReply() == 1) {
- messageSender.publish(receivedTopic + TopicConst._REPLY_SUF,
- CommonTopicResponse.builder()
- .tid(receiver.getTid())
- .bid(receiver.getBid())
- .method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod())
- .timestamp(System.currentTimeMillis())
- .data(RequestsReply.success())
- .build());
+ Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway());
+ if (deviceOpt.isEmpty()) {
+ return null;
}
+ websocketMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
+ BizCodeEnum.FLIGHT_TASK_PROGRESS.getCode(), eventsReceiver);
+
+ return receiver;
+ }
+
+ /**
+ * Notifications will be received through this interface when tasks are ready on the device.
+ * @param receiver
+ * @param headers
+ */
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_READY, outputChannel = ChannelName.OUTBOUND_EVENTS)
+ public CommonTopicReceiver handleTaskNotifications(CommonTopicReceiver receiver, MessageHeaders headers) {
+ String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
+ String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
+ receivedTopic.indexOf(TopicConst.EVENTS_SUF));
+ List<String> flightIds = mapper.convertValue(receiver.getData(),
+ new TypeReference<Map<String, List<String>>>(){}).get(MapKeyConst.FLIGHT_IDS);
+
+ log.info("ready task list:{}", Arrays.toString(flightIds.toArray()) );
+ // Check conditional task blocking status.
+ String blockedId = waylineRedisService.getBlockedWaylineJobId(dockSn);
+ if (!StringUtils.hasText(blockedId)) {
+ return null;
+ }
+
+ Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(dockSn);
+ if (deviceOpt.isEmpty()) {
+ return null;
+ }
+ DeviceDTO device = deviceOpt.get();
+
+ try {
+ for (String jobId : flightIds) {
+ boolean isExecute = waylineJobService.executeFlightTask(device.getWorkspaceId(), jobId);
+ if (!isExecute) {
+ return null;
+ }
+ Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobId);
+ if (waylineJobOpt.isEmpty()) {
+ log.info("The conditional job has expired and will no longer be executed.");
+ return receiver;
+ }
+ WaylineJobDTO waylineJob = waylineJobOpt.get();
+ this.retryPrepareJob(new ConditionalWaylineJobKey(device.getWorkspaceId(), dockSn, jobId), waylineJob);
+ return receiver;
+ }
+ } catch (Exception e) {
+ log.error("Failed to execute conditional task.");
+ e.printStackTrace();
+ }
+ return receiver;
}
@Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
@@ -166,4 +212,80 @@
}
}
}
+
+ @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
+ private void prepareConditionJob() {
+ Optional<ConditionalWaylineJobKey> jobKeyOpt = waylineRedisService.getNearestConditionalWaylineJob();
+ if (jobKeyOpt.isEmpty()) {
+ return;
+ }
+ ConditionalWaylineJobKey jobKey = jobKeyOpt.get();
+ log.info("Check the conditional tasks of the wayline. {}", jobKey.toString());
+ // format: {workspace_id}:{dock_sn}:{job_id}
+ double time = waylineRedisService.getConditionalWaylineJobTime(jobKey);
+ long now = System.currentTimeMillis();
+ // prepare the task one day in advance.
+ int offset = 86_400_000;
+
+ if (now + offset < time) {
+ return;
+ }
+
+ WaylineJobDTO job = WaylineJobDTO.builder()
+ .jobId(jobKey.getJobId())
+ .status(WaylineJobStatusEnum.FAILED.getVal())
+ .executeTime(LocalDateTime.now())
+ .completedTime(LocalDateTime.now())
+ .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build();
+ try {
+ Optional<WaylineJobDTO> waylineJobOpt = waylineRedisService.getConditionalWaylineJob(jobKey.getJobId());
+ if (waylineJobOpt.isEmpty()) {
+ job.setCode(CommonErrorEnum.REDIS_DATA_NOT_FOUND.getErrorCode());
+ waylineJobService.updateJob(job);
+ waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
+ return;
+ }
+ WaylineJobDTO waylineJob = waylineJobOpt.get();
+
+ ResponseResult result = waylineJobService.publishOneFlightTask(waylineJob);
+ waylineRedisService.removePrepareConditionalWaylineJob(jobKey);
+
+ if (ResponseResult.CODE_SUCCESS == result.getCode()) {
+ return;
+ }
+
+ // If the end time is exceeded, no more retries will be made.
+ waylineRedisService.delConditionalWaylineJob(jobKey.getJobId());
+ if (waylineJob.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() - RedisConst.WAYLINE_JOB_BLOCK_TIME * 1000 < now) {
+ return;
+ }
+
+ // Retry if the end time has not been exceeded.
+ this.retryPrepareJob(jobKey, waylineJob);
+
+ } catch (Exception e) {
+ log.info("Failed to prepare the conditional task.");
+ waylineJobService.updateJob(job);
+ }
+
+ }
+
+ private void retryPrepareJob(ConditionalWaylineJobKey jobKey, WaylineJobDTO waylineJob) {
+ Optional<WaylineJobDTO> childJobOpt = waylineJobService.createWaylineJobByParent(jobKey.getWorkspaceId(), jobKey.getJobId());
+ if (childJobOpt.isEmpty()) {
+ log.error("Failed to create wayline job.");
+ return;
+ }
+
+ WaylineJobDTO newJob = childJobOpt.get();
+ newJob.setBeginTime(LocalDateTime.now().plusSeconds(RedisConst.WAYLINE_JOB_BLOCK_TIME));
+ boolean isAdd = waylineRedisService.addPrepareConditionalWaylineJob(newJob);
+ if (!isAdd) {
+ log.error("Failed to create wayline job. {}", newJob.getJobId());
+ return;
+ }
+
+ waylineJob.setJobId(newJob.getJobId());
+ waylineRedisService.setConditionalWaylineJob(waylineJob);
+ }
}
--
Gitblit v1.9.3