From 56df98ce4952239fbf7d0e99dbeb0e5c71531d6f Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 18 Nov 2022 18:29:06 +0800
Subject: [PATCH] initial v1.3.0
---
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java | 79 ++++++++++++++++++++++++++++++++++++++-
1 files changed, 76 insertions(+), 3 deletions(-)
diff --git a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
index 46774bb..83b3247 100644
--- a/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
+++ b/src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,15 +12,24 @@
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
+import com.dji.sample.wayline.model.dto.WaylineJobDTO;
+import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
+import com.dji.sample.wayline.service.IWaylineJobService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
+import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
+
+import java.time.LocalDateTime;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
/**
* @author sean
@@ -46,18 +55,43 @@
@Autowired
private RedisOpsUtils redisOps;
+ @Autowired
+ private IWaylineJobService waylineJobService;
+
@Override
@ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
eventsReceiver.setBid(receiver.getBid());
+ eventsReceiver.setSn(receiver.getGateway());
- log.info("Task progress: " + eventsReceiver.getOutput().getProgress().toString());
+ FlightTaskProgressReceiver output = eventsReceiver.getOutput();
+
+ log.info("Task progress: {}", output.getProgress().toString());
if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) {
- log.error("Error code: " + eventsReceiver.getResult());
+ log.error("Task progress ===> Error code: " + eventsReceiver.getResult());
}
+
+ EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
+ if (statusEnum.getEnd()) {
+ WaylineJobDTO job = WaylineJobDTO.builder()
+ .jobId(receiver.getBid())
+ .status(WaylineJobStatusEnum.SUCCESS.getVal())
+ .endTime(LocalDateTime.now())
+ .mediaCount(output.getExt().getMediaCount())
+ .build();
+
+ if (EventsResultStatusEnum.OK != statusEnum) {
+ job.setCode(eventsReceiver.getResult());
+ job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
+ }
+
+ waylineJobService.updateJob(job);
+ redisOps.del(receiver.getBid());
+ }
+ redisOps.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
websocketMessageService.sendBatch(
@@ -77,8 +111,47 @@
.bid(receiver.getBid())
.method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod())
.timestamp(System.currentTimeMillis())
- .data(ResponseResult.success())
+ .data(RequestsReply.success())
.build());
}
}
+
+ @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
+ private void checkScheduledJob() {
+ Object jobIdValue = redisOps.zGetMin(RedisConst.WAYLINE_JOB);
+ log.info("Check the timed jobs of the wayline. {}", jobIdValue);
+ if (Objects.isNull(jobIdValue)) {
+ return;
+ }
+ String jobId = String.valueOf(jobIdValue);
+ double time = redisOps.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
+ long now = System.currentTimeMillis();
+ int offset = 30_000;
+
+ // Expired tasks are deleted directly.
+ if (time < now - offset) {
+ redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
+ waylineJobService.updateJob(WaylineJobDTO.builder()
+ .jobId(jobId)
+ .status(WaylineJobStatusEnum.FAILED.getVal())
+ .endTime(LocalDateTime.now())
+ .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
+ return;
+ }
+
+ if (now <= time && time <= now + offset) {
+ try {
+ waylineJobService.executeFlightTask(jobId);
+ } catch (Exception e) {
+ log.info("The scheduled task delivery failed.");
+ waylineJobService.updateJob(WaylineJobDTO.builder()
+ .jobId(jobId)
+ .status(WaylineJobStatusEnum.FAILED.getVal())
+ .endTime(LocalDateTime.now())
+ .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
+ } finally {
+ redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
+ }
+ }
+ }
}
--
Gitblit v1.9.3