From 694b9483c7a551626244cbc222c602ea9ff74094 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Tue, 25 Apr 2023 21:44:00 +0800
Subject: [PATCH] What's new? 1. Wayline management: added `pause wayline task` and `recover wayline task`. 2. Added command flight function. 3. Fixed some issues.
---
src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java | 103 ++++++++++++++++++---------------------------------
1 files changed, 37 insertions(+), 66 deletions(-)
diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
index 6b99d38..1747040 100644
--- a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
+++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
@@ -13,8 +13,7 @@
import com.dji.sample.component.oss.service.impl.OssServiceContext;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
-import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession;
-import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.service.IWebSocketManageService;
import com.dji.sample.component.websocket.service.impl.SendMessageServiceImpl;
import com.dji.sample.manage.dao.IDeviceFirmwareMapper;
@@ -24,15 +23,15 @@
import com.dji.sample.manage.model.param.DeviceFirmwareQueryParam;
import com.dji.sample.manage.model.param.DeviceFirmwareUploadParam;
import com.dji.sample.manage.model.param.DeviceOtaCreateParam;
+import com.dji.sample.manage.model.receiver.FirmwareProgressExtReceiver;
import com.dji.sample.manage.service.IDeviceFirmwareService;
-import com.dji.sample.manage.service.IDeviceService;
+import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.manage.service.IFirmwareModelService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.stereotype.Service;
import org.springframework.util.DigestUtils;
@@ -77,13 +76,13 @@
private IWebSocketManageService webSocketManageService;
@Autowired
- private IDeviceService deviceService;
-
- @Autowired
private OssServiceContext ossServiceContext;
@Autowired
private IFirmwareModelService firmwareModelService;
+
+ @Autowired
+ private IDeviceRedisService deviceRedisService;
@Override
public Optional<DeviceFirmwareDTO> getFirmware(String workspaceId, String deviceName, String version) {
@@ -108,7 +107,7 @@
public List<DeviceOtaCreateParam> getDeviceOtaFirmware(String workspaceId, List<DeviceFirmwareUpgradeDTO> upgradeDTOS) {
List<DeviceOtaCreateParam> deviceOtaList = new ArrayList<>();
upgradeDTOS.forEach(upgradeDevice -> {
- boolean exist = deviceService.checkDeviceOnline(upgradeDevice.getSn());
+ boolean exist = deviceRedisService.checkDeviceOnline(upgradeDevice.getSn());
if (!exist) {
throw new IllegalArgumentException("Device is offline.");
}
@@ -125,18 +124,15 @@
return deviceOtaList;
}
- @Override
- @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_OTA_PROGRESS, outputChannel = ChannelName.OUTBOUND)
- public void handleOtaProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
- String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
- String sn = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
- topic.indexOf(TopicConst.EVENTS_SUF));
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_OTA_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
+ public CommonTopicReceiver handleOtaProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
+ String sn = receiver.getGateway();
- EventsReceiver<EventsOutputReceiver> eventsReceiver = objectMapper.convertValue(receiver.getData(),
- new TypeReference<EventsReceiver<EventsOutputReceiver>>(){});
+ EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>> eventsReceiver = objectMapper.convertValue(receiver.getData(),
+ new TypeReference<EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>>>(){});
eventsReceiver.setBid(receiver.getBid());
- EventsOutputReceiver output = eventsReceiver.getOutput();
+ EventsOutputProgressReceiver<FirmwareProgressExtReceiver> output = eventsReceiver.getOutput();
log.info("SN: {}, {} ===> Upgrading progress: {}",
sn, receiver.getMethod(), output.getProgress().toString());
@@ -144,59 +140,34 @@
log.error("SN: {}, {} ===> Error code: {}", sn, receiver.getMethod(), eventsReceiver.getResult());
}
- DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + sn);
- String childDeviceSn = device.getChildDeviceSn();
- boolean upgrade = RedisOpsUtils.getExpire(RedisConst.FIRMWARE_UPGRADING_PREFIX + sn) > 0;
- boolean childUpgrade = RedisOpsUtils.getExpire(RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn) > 0;
+ Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
+ if (deviceOpt.isEmpty()) {
+ return null;
+ }
- // Determine whether it is the ending state, delete the update state key in redis after the job ends.
EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
- Collection<ConcurrentWebSocketSession> sessions = webSocketManageService.getValueWithWorkspaceAndUserType(
- device.getWorkspaceId(), UserTypeEnum.WEB.getVal());
- CustomWebSocketMessage<Object> build = CustomWebSocketMessage.builder()
- .data(eventsReceiver)
- .timestamp(System.currentTimeMillis())
- .bizCode(receiver.getMethod())
- .build();
- if (upgrade) {
- if (statusEnum.getEnd()) {
- // Delete the cache after the update is complete.
- RedisOpsUtils.del(RedisConst.FIRMWARE_UPGRADING_PREFIX + sn);
- } else {
- // Update the update progress of the dock in redis.
- RedisOpsUtils.setWithExpire(
- RedisConst.FIRMWARE_UPGRADING_PREFIX + sn, output.getProgress().getPercent(),
- RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
- }
- eventsReceiver.setSn(sn);
- webSocketMessageService.sendBatch(sessions, build);
- }
- if (childUpgrade) {
- if (!StringUtils.hasText(eventsReceiver.getSn())) {
- eventsReceiver.setSn(childDeviceSn);
- webSocketMessageService.sendBatch(sessions, build);
- }
- if (statusEnum.getEnd()) {
- RedisOpsUtils.del(RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn);
- } else {
- // Update the update progress of the drone in redis.
- RedisOpsUtils.setWithExpire(
- RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn, output.getProgress().getPercent(),
- RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
- }
- }
+ DeviceDTO device = deviceOpt.get();
+ handleProgress(device.getWorkspaceId(), sn, eventsReceiver, statusEnum.getEnd());
+ handleProgress(device.getWorkspaceId(), device.getChildDeviceSn(), eventsReceiver, statusEnum.getEnd());
- if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) {
- String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
- messageSenderService.publish(replyTopic,
- CommonTopicResponse.builder()
- .tid(receiver.getTid())
- .bid(receiver.getBid())
- .method(receiver.getMethod())
- .timestamp(System.currentTimeMillis())
- .data(RequestsReply.success())
- .build());
+ return receiver;
+ }
+
+ private void handleProgress(String workspaceId, String sn,
+ EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>> events, boolean isEnd) {
+ boolean upgrade = deviceRedisService.getFirmwareUpgradingProgress(sn).isPresent();
+ if (!upgrade) {
+ return;
}
+ if (isEnd) {
+ // Delete the cache after the update is complete.
+ deviceRedisService.delFirmwareUpgrading(sn);
+ } else {
+ // Update the update progress of the dock in redis.
+ deviceRedisService.setFirmwareUpgrading(sn, events);
+ }
+ events.setSn(sn);
+ webSocketMessageService.sendBatch(workspaceId, UserTypeEnum.WEB.getVal(), BizCodeEnum.OTA_PROGRESS.getCode(), events);
}
@Override
--
Gitblit v1.9.3