From a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9 Mon Sep 17 00:00:00 2001
From: sean.zhou <sean.zhou@dji.com>
Date: Fri, 24 Feb 2023 19:31:23 +0800
Subject: [PATCH] What's new? 1. Add license for dock. 2. Modify the logic corresponding to the firmware file and device type. 3. Add multiple mqtt clients options. 4. Modify the structure of the interface for obtaining the device list. 5. Fixed some issues.
---
src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java | 190 ++++++++++++++++++++++++++---------------------
1 files changed, 105 insertions(+), 85 deletions(-)
diff --git a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java
index 371ad97..8b441ba 100644
--- a/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java
+++ b/src/main/java/com/dji/sample/manage/service/impl/LiveStreamServiceImpl.java
@@ -2,18 +2,19 @@
import com.dji.sample.common.error.LiveErrorEnum;
import com.dji.sample.common.model.ResponseResult;
-import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.component.mqtt.model.CommonTopicResponse;
+import com.dji.sample.component.mqtt.model.ServiceReply;
import com.dji.sample.component.mqtt.service.IMessageSenderService;
-import com.dji.sample.manage.model.Chan;
+import com.dji.sample.component.redis.RedisConst;
+import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.manage.model.dto.*;
import com.dji.sample.manage.model.enums.DeviceDomainEnum;
-import com.dji.sample.manage.model.enums.LiveMethodEnum;
+import com.dji.sample.manage.model.enums.LiveStreamMethodEnum;
import com.dji.sample.manage.model.enums.LiveUrlTypeEnum;
import com.dji.sample.manage.model.enums.LiveVideoQualityEnum;
import com.dji.sample.manage.model.param.DeviceQueryParam;
import com.dji.sample.manage.model.receiver.CapacityDeviceReceiver;
-import com.dji.sample.manage.model.receiver.ServiceReplyReceiver;
+import com.dji.sample.manage.model.receiver.LiveCapacityReceiver;
import com.dji.sample.manage.service.ICapacityCameraService;
import com.dji.sample.manage.service.IDeviceService;
import com.dji.sample.manage.service.ILiveStreamService;
@@ -26,7 +27,7 @@
import java.lang.reflect.Field;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
import static com.dji.sample.component.mqtt.model.TopicConst.*;
@@ -54,50 +55,55 @@
@Override
public List<CapacityDeviceDTO> getLiveCapacity(String workspaceId) {
- // Query all drone data in this workspace.
+ // Query all devices in this workspace.
List<DeviceDTO> devicesList = deviceService.getDevicesByParams(
DeviceQueryParam.builder()
.workspaceId(workspaceId)
- .domain(DeviceDomainEnum.SUB_DEVICE.getVal())
+ .domains(List.of(DeviceDomainEnum.SUB_DEVICE.getVal(), DeviceDomainEnum.DOCK.getVal()))
.build());
- List<CapacityDeviceDTO> capacityDevicesList = new ArrayList<>();
// Query the live capability of each drone.
- devicesList.forEach(device -> capacityDevicesList.add(CapacityDeviceDTO.builder()
- .name(device.getDeviceName())
- .sn(device.getDeviceSn())
- .camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn()))
- .build()));
-
- return capacityDevicesList;
+ return devicesList.stream()
+ .filter(device -> RedisOpsUtils.checkExist(RedisConst.DEVICE_ONLINE_PREFIX + device.getDeviceSn()))
+ .map(device -> CapacityDeviceDTO.builder()
+ .name(Objects.requireNonNullElse(device.getNickname(), device.getDeviceName()))
+ .sn(device.getDeviceSn())
+ .camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn()))
+ .build())
+ .collect(Collectors.toList());
}
@Override
- public Boolean saveLiveCapacity(CapacityDeviceReceiver capacityDeviceReceiver) {
- return capacityCameraService.saveCapacityCameraReceiverList(
- capacityDeviceReceiver.getCameraList(),
- capacityDeviceReceiver.getSn());
+ public void saveLiveCapacity(LiveCapacityReceiver liveCapacityReceiver, Long timestamp) {
+ // Solve timing problems
+ for (CapacityDeviceReceiver capacityDeviceReceiver : liveCapacityReceiver.getDeviceList()) {
+ long last = (long) Objects.requireNonNullElse(
+ RedisOpsUtils.get(RedisConst.LIVE_CAPACITY + capacityDeviceReceiver.getSn()), 0L);
+ if (last > timestamp) {
+ return;
+ }
+ capacityCameraService.saveCapacityCameraReceiverList(
+ capacityDeviceReceiver.getCameraList(),
+ capacityDeviceReceiver.getSn(), timestamp);
+ }
}
@Override
public ResponseResult liveStart(LiveTypeDTO liveParam) {
// Check if this lens is available live.
ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId());
- if (responseResult.getCode() != 0) {
+ if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) {
return responseResult;
}
- List<DeviceDTO> data = (List<DeviceDTO>)responseResult.getData();
+ DeviceDTO data = (DeviceDTO)responseResult.getData();
// target topic
String respTopic = THING_MODEL_PRE + PRODUCT +
- data.get(0).getDeviceSn() + SERVICES_SUF;
- Optional<ServiceReplyReceiver> receiveReplyOpt = this.publishLiveStart(respTopic, liveParam);
+ data.getDeviceSn() + SERVICES_SUF;
+ ServiceReply receiveReply = this.publishLiveStart(respTopic, liveParam);
- if (receiveReplyOpt.isEmpty()) {
- return ResponseResult.error(LiveErrorEnum.NO_REPLY);
- }
- if (receiveReplyOpt.get().getResult() != 0) {
- return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult()));
+ if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) {
+ return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult()));
}
LiveUrlTypeEnum urlType = LiveUrlTypeEnum.find(liveParam.getUrlType());
@@ -119,8 +125,8 @@
.toString());
break;
case RTSP:
- String url = receiveReplyOpt.get().getInfo();
- this.resolveUrlUser(url, live);
+ Object url = Objects.requireNonNullElse(receiveReply.getOutput(), receiveReply.getInfo());
+ this.resolveUrlUser(String.valueOf(url), live);
break;
case UNKNOWN:
return ResponseResult.error(LiveErrorEnum.URL_TYPE_NOT_SUPPORTED);
@@ -131,19 +137,16 @@
@Override
public ResponseResult liveStop(String videoId) {
- ResponseResult<List<DeviceDTO>> responseResult = this.checkBeforeLive(videoId);
+ ResponseResult<DeviceDTO> responseResult = this.checkBeforeLive(videoId);
if (responseResult.getCode() != 0) {
return responseResult;
}
- String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().get(0).getDeviceSn() + SERVICES_SUF;
+ String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF;
- Optional<ServiceReplyReceiver> receiveReplyOpt = this.publishLiveStop(respTopic, videoId);
- if (receiveReplyOpt.isEmpty()) {
- return ResponseResult.error(LiveErrorEnum.NO_REPLY);
- }
- if (receiveReplyOpt.get().getResult() != 0) {
- return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult()));
+ ServiceReply receiveReply = this.publishLiveStop(respTopic, videoId);
+ if (receiveReply.getResult() != 0) {
+ return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult()));
}
return ResponseResult.success();
@@ -156,23 +159,55 @@
return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
}
- ResponseResult<List<DeviceDTO>> responseResult = this.checkBeforeLive(liveParam.getVideoId());
+ ResponseResult<DeviceDTO> responseResult = this.checkBeforeLive(liveParam.getVideoId());
if (responseResult.getCode() != 0) {
return responseResult;
}
- String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().get(0).getDeviceSn() + SERVICES_SUF;
+ String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF;
- Optional<ServiceReplyReceiver> receiveReplyOpt = this.publishLiveSetQuality(respTopic, liveParam);
- if (receiveReplyOpt.isEmpty()) {
- return ResponseResult.error(LiveErrorEnum.NO_REPLY);
- }
- if (receiveReplyOpt.get().getResult() != 0) {
- return ResponseResult.error(LiveErrorEnum.find(receiveReplyOpt.get().getResult()));
+ ServiceReply receiveReply = this.publishLiveSetQuality(respTopic, liveParam);
+ if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) {
+ return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult()));
}
return ResponseResult.success();
+ }
+
+ @Override
+ public ResponseResult liveLensChange(LiveTypeDTO liveParam) {
+ if (!StringUtils.hasText(liveParam.getVideoType())) {
+ return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
+ }
+
+ ResponseResult<DeviceDTO> responseResult = this.checkBeforeLive(liveParam.getVideoId());
+ if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) {
+ return responseResult;
+ }
+ if (DeviceDomainEnum.GATEWAY.getVal() == responseResult.getData().getDomain()) {
+ return ResponseResult.error(LiveErrorEnum.FUNCTION_NOT_SUPPORT);
+ }
+
+ String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF;
+
+ ServiceReply receiveReply = this.publishLiveLensChange(respTopic, liveParam);
+
+ if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) {
+ return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult()));
+ }
+
+ return ResponseResult.success();
+ }
+
+ private ServiceReply publishLiveLensChange(String respTopic, LiveTypeDTO liveParam) {
+ CommonTopicResponse<LiveTypeDTO> response = new CommonTopicResponse<>();
+ response.setTid(UUID.randomUUID().toString());
+ response.setBid(UUID.randomUUID().toString());
+ response.setMethod(LiveStreamMethodEnum.LIVE_LENS_CHANGE.getMethod());
+ response.setData(liveParam);
+
+ return messageSender.publishWithReply(respTopic, response);
}
/**
@@ -180,22 +215,34 @@
* @param videoId
* @return
*/
- private ResponseResult checkBeforeLive(String videoId) {
+ private ResponseResult<DeviceDTO> checkBeforeLive(String videoId) {
+ if (!StringUtils.hasText(videoId)) {
+ return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
+ }
String[] videoIdArr = videoId.split("/");
// drone sn / enumeration value of the location where the payload is mounted / payload lens
if (videoIdArr.length != 3) {
return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS);
}
+ Optional<DeviceDTO> deviceOpt = deviceService.getDeviceBySn(videoIdArr[0]);
+ // Check if the gateway device connected to this drone exists
+ if (deviceOpt.isEmpty()) {
+ return ResponseResult.error(LiveErrorEnum.NO_AIRCRAFT);
+ }
+
+ if (DeviceDomainEnum.DOCK.getVal() == deviceOpt.get().getDomain()) {
+ return ResponseResult.success(deviceOpt.get());
+ }
List<DeviceDTO> gatewayList = deviceService.getDevicesByParams(
DeviceQueryParam.builder()
.childSn(videoIdArr[0])
.build());
- // Check if the gateway device connected to this drone exists
if (gatewayList.isEmpty()) {
return ResponseResult.error(LiveErrorEnum.NO_FLIGHT_CONTROL);
}
- return ResponseResult.success(gatewayList);
+
+ return ResponseResult.success(gatewayList.get(0));
}
/**
@@ -250,14 +297,14 @@
* @param liveParam
* @return
*/
- private Optional<ServiceReplyReceiver> publishLiveStart(String topic, LiveTypeDTO liveParam) {
+ private ServiceReply publishLiveStart(String topic, LiveTypeDTO liveParam) {
CommonTopicResponse<LiveTypeDTO> response = new CommonTopicResponse<>();
response.setTid(UUID.randomUUID().toString());
response.setBid(UUID.randomUUID().toString());
response.setData(liveParam);
- response.setMethod(LiveMethodEnum.LIVE_START_PUSH.getMethod());
+ response.setMethod(LiveStreamMethodEnum.LIVE_START_PUSH.getMethod());
- return this.publishLive(topic, response);
+ return messageSender.publishWithReply(topic, response);
}
/**
@@ -266,17 +313,17 @@
* @param liveParam
* @return
*/
- private Optional<ServiceReplyReceiver> publishLiveSetQuality(String respTopic, LiveTypeDTO liveParam) {
+ private ServiceReply publishLiveSetQuality(String respTopic, LiveTypeDTO liveParam) {
Map<String, Object> data = new ConcurrentHashMap<>(Map.of(
"video_id", liveParam.getVideoId(),
"video_quality", liveParam.getVideoQuality()));
CommonTopicResponse<Map<String, Object>> response = new CommonTopicResponse<>();
response.setTid(UUID.randomUUID().toString());
response.setBid(UUID.randomUUID().toString());
- response.setMethod(LiveMethodEnum.LIVE_SET_QUALITY.getMethod());
+ response.setMethod(LiveStreamMethodEnum.LIVE_SET_QUALITY.getMethod());
response.setData(data);
- return this.publishLive(respTopic, response);
+ return messageSender.publishWithReply(respTopic, response);
}
/**
@@ -285,42 +332,15 @@
* @param videoId
* @return
*/
- private Optional<ServiceReplyReceiver> publishLiveStop(String topic, String videoId) {
+ private ServiceReply publishLiveStop(String topic, String videoId) {
Map<String, String> data = new ConcurrentHashMap<>(Map.of("video_id", videoId));
CommonTopicResponse<Map<String, String>> response = new CommonTopicResponse<>();
response.setTid(UUID.randomUUID().toString());
response.setBid(UUID.randomUUID().toString());
response.setData(data);
- response.setMethod(LiveMethodEnum.LIVE_STOP_PUSH.getMethod());
+ response.setMethod(LiveStreamMethodEnum.LIVE_STOP_PUSH.getMethod());
- return this.publishLive(topic, response);
- }
-
- /**
- * Send live streaming start message and receive a response at the same time
- * @param topic
- * @param response notification of whether the start is successful.
- * @return
- */
- private Optional<ServiceReplyReceiver> publishLive(String topic, CommonTopicResponse response) {
- AtomicInteger time = new AtomicInteger(0);
- // Retry three times
- while (time.getAndIncrement() < 3) {
- messageSender.publish(topic, response);
-
- Chan<CommonTopicReceiver<ServiceReplyReceiver>> chan = Chan.getInstance();
- // If the message is not received in 0.5 seconds then resend it again.
- CommonTopicReceiver<ServiceReplyReceiver> receiver = chan.get(response.getMethod());
- if (receiver == null) {
- continue;
- }
- // Need to match tid and bid.
- if (receiver.getTid().equals(response.getTid()) &&
- receiver.getBid().equals(response.getBid())) {
- return Optional.ofNullable(receiver.getData());
- }
- }
- return Optional.empty();
+ return messageSender.publishWithReply(topic, response);
}
}
\ No newline at end of file
--
Gitblit v1.9.3