From 8d9a2d656e4ae007590c622e5f7c228adacdca49 Mon Sep 17 00:00:00 2001
From: rain <167982779@qq.com>
Date: Fri, 14 Jun 2024 10:11:36 +0800
Subject: [PATCH] 统一风格
---
src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java | 137 ++++++++++++++-------------------------------
1 files changed, 42 insertions(+), 95 deletions(-)
diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java
index eec4b41..f18a262 100644
--- a/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java
+++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java
@@ -10,21 +10,18 @@
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
-import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.service.ISendMessageService;
-import com.dji.sample.component.websocket.service.IWebSocketManageService;
import com.dji.sample.manage.dao.IDeviceLogsMapper;
import com.dji.sample.manage.model.dto.*;
import com.dji.sample.manage.model.entity.DeviceLogsEntity;
-import com.dji.sample.manage.model.enums.DeviceDomainEnum;
-import com.dji.sample.manage.model.enums.DeviceLogsStatusEnum;
-import com.dji.sample.manage.model.enums.LogsFileUpdateMethodEnum;
-import com.dji.sample.manage.model.enums.UserTypeEnum;
+import com.dji.sample.manage.model.enums.*;
import com.dji.sample.manage.model.param.DeviceLogsCreateParam;
import com.dji.sample.manage.model.param.DeviceLogsQueryParam;
import com.dji.sample.manage.model.param.LogsFileUpdateParam;
import com.dji.sample.manage.model.receiver.*;
import com.dji.sample.manage.service.IDeviceLogsService;
+import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.manage.service.ILogsFileService;
import com.dji.sample.manage.service.ITopologyService;
import com.dji.sample.media.model.StsCredentialsDTO;
@@ -73,9 +70,6 @@
private ILogsFileService logsFileService;
@Autowired
- private RedisOpsUtils redisOpsUtils;
-
- @Autowired
private IStorageService storageService;
@Autowired
@@ -85,7 +79,7 @@
private ISendMessageService webSocketMessageService;
@Autowired
- private IWebSocketManageService webSocketManageService;
+ private IDeviceRedisService deviceRedisService;
@Override
public PaginationData<DeviceLogsDTO> getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) {
@@ -107,32 +101,21 @@
@Override
public ResponseResult getRealTimeLogs(String deviceSn, List<String> domainList) {
- boolean exist = redisOpsUtils.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + deviceSn) > 0;
+ boolean exist = deviceRedisService.checkDeviceOnline(deviceSn);
if (!exist) {
- return ResponseResult.error("Device is offline.");
+ return ResponseResult.error("设备离线");
}
- String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF;
- Optional<LogsFileUploadList> serviceReplyOpt = messageSenderService.publishWithReply(
- LogsFileUploadList.class,
- topic,
- CommonTopicResponse.builder()
- .tid(UUID.randomUUID().toString())
- .bid(UUID.randomUUID().toString())
- .method(ServicesMethodEnum.FILE_UPLOAD_LIST.getMethod())
- .timestamp(System.currentTimeMillis())
- .data(Map.of(MapKeyConst.MODULE_LIST, domainList))
- .build(), 1);
- if (serviceReplyOpt.isEmpty()) {
- return ResponseResult.error("No message reply received.");
- }
- LogsFileUploadList data = serviceReplyOpt.get();
- for (LogsFileUpload file : data.getFiles()) {
+ ServiceReply<List<LogsFileUpload>> data = messageSenderService.publishServicesTopic(
+ new TypeReference<List<LogsFileUpload>>() {}, deviceSn, LogsFileMethodEnum.FILE_UPLOAD_LIST.getMethod(),
+ Map.of(MapKeyConst.MODULE_LIST, domainList));
+
+ for (LogsFileUpload file : data.getOutput()) {
if (file.getDeviceSn().isBlank()) {
file.setDeviceSn(deviceSn);
}
}
- return ResponseResult.success(data);
+ return ResponseResult.success(new LogsFileUploadList(data.getOutput(), data.getResult()));
}
@Override
@@ -170,31 +153,20 @@
credentialsDTO.setParams(LogsFileUploadList.builder().files(files).build());
String bid = UUID.randomUUID().toString();
- Optional<ServiceReply> serviceReply = messageSenderService.publishWithReply(
- TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF,
- CommonTopicResponse.<LogsUploadCredentialsDTO>builder()
- .tid(UUID.randomUUID().toString())
- .bid(bid)
- .timestamp(System.currentTimeMillis())
- .method(ServicesMethodEnum.FILE_UPLOAD_START.getMethod())
- .data(credentialsDTO)
- .build());
+ ServiceReply reply = messageSenderService.publishServicesTopic(
+ deviceSn, LogsFileMethodEnum.FILE_UPLOAD_START.getMethod(), credentialsDTO, bid);
- if (serviceReply.isEmpty()) {
- return ResponseResult.error("No message reply received.");
- }
- ServiceReply reply = serviceReply.get();
if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
return ResponseResult.error(String.valueOf(reply.getResult()));
}
String logsId = this.insertDeviceLogs(bid, username, deviceSn, param);
if (!bid.equals(logsId)) {
- return ResponseResult.error("Database insert failed.");
+ return ResponseResult.error("数据库插入失败");
}
// Save the status of the log upload.
- redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build());
+ RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build());
return ResponseResult.success();
}
@@ -203,25 +175,13 @@
public ResponseResult pushUpdateFile(String deviceSn, LogsFileUpdateParam param) {
LogsFileUpdateMethodEnum method = LogsFileUpdateMethodEnum.find(param.getStatus());
if (LogsFileUpdateMethodEnum.UNKNOWN == method) {
- return ResponseResult.error("Illegal param");
+ return ResponseResult.error("非法参数");
}
- String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF;
- String bid = UUID.randomUUID().toString();
- Optional<ServiceReply> serviceReply = messageSenderService.publishWithReply(topic,
- CommonTopicResponse.<LogsFileUpdateParam>builder()
- .tid(UUID.randomUUID().toString())
- .bid(bid)
- .timestamp(System.currentTimeMillis())
- .method(ServicesMethodEnum.FILE_UPLOAD_UPDATE.getMethod())
- .data(param)
- .build());
+ ServiceReply reply = messageSenderService.publishServicesTopic(
+ deviceSn, LogsFileMethodEnum.FILE_UPLOAD_UPDATE.getMethod(), param);
- if (serviceReply.isEmpty()) {
- return ResponseResult.error("No message reply received.");
- }
- ServiceReply reply = serviceReply.get();
if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
- return ResponseResult.error("Error Code : " + reply.getResult());
+ return ResponseResult.error("错误码:" + reply.getResult());
}
return ResponseResult.success();
@@ -234,24 +194,12 @@
logsFileService.deleteFileByLogsId(logsId);
}
- @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND)
+ @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
@Override
- public void handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
+ public CommonTopicReceiver handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
String sn = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
topic.indexOf(TopicConst.EVENTS_SUF));
-
- if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) {
- String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
- messageSenderService.publish(replyTopic,
- CommonTopicResponse.builder()
- .tid(receiver.getTid())
- .bid(receiver.getBid())
- .method(receiver.getMethod())
- .timestamp(System.currentTimeMillis())
- .data(ResponseResult.success())
- .build());
- }
EventsReceiver<OutputLogsProgressReceiver> eventsReceiver = objectMapper.convertValue(receiver.getData(),
new TypeReference<EventsReceiver<OutputLogsProgressReceiver>>(){});
@@ -260,7 +208,12 @@
webSocketData.setBid(receiver.getBid());
webSocketData.setSn(sn);
- DeviceDTO device = (DeviceDTO) redisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + sn);
+ Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
+ if (deviceOpt.isEmpty()) {
+ return null;
+ }
+
+ DeviceDTO device = deviceOpt.get();
try {
OutputLogsProgressReceiver output = eventsReceiver.getOutput();
@@ -269,12 +222,12 @@
String key = RedisConst.LOGS_FILE_PREFIX + sn;
LogsOutputProgressDTO progress;
- boolean exist = redisOpsUtils.checkExist(key);
+ boolean exist = RedisOpsUtils.checkExist(key);
if (!exist && !statusEnum.getEnd()) {
progress = LogsOutputProgressDTO.builder().logsId(receiver.getBid()).build();
- redisOpsUtils.hashSet(key, receiver.getBid(), progress);
+ RedisOpsUtils.hashSet(key, receiver.getBid(), progress);
} else if (exist) {
- progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, receiver.getBid());
+ progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, receiver.getBid());
} else {
progress = LogsOutputProgressDTO.builder().build();
}
@@ -283,8 +236,7 @@
// If the logs file is empty, delete the cache of this task.
List<LogsExtFileReceiver> fileReceivers = output.getExt().getFiles();
if (CollectionUtils.isEmpty(fileReceivers)) {
- redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
- return;
+ RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
}
// refresh cache.
@@ -310,10 +262,10 @@
});
progress.setFiles(fileProgressList);
webSocketData.setOutput(progress);
- redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress);
+ RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress);
// Delete the cache at the end of the task.
if (statusEnum.getEnd()) {
- redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
+ RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.find(statusEnum).getVal());
fileReceivers.forEach(file -> logsFileService.updateFile(receiver.getBid(), file));
@@ -321,18 +273,13 @@
} catch (NullPointerException e) {
this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.FAILED.getVal());
- redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
+ RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
}
- webSocketMessageService.sendBatch(
- webSocketManageService.getValueWithWorkspaceAndUserType(
- device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
- CustomWebSocketMessage.builder()
- .data(webSocketData)
- .timestamp(System.currentTimeMillis())
- .bizCode(receiver.getMethod())
- .build());
+ webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(),
+ BizCodeEnum.FILE_UPLOAD_PROGRESS.getCode(), webSocketData);
+ return receiver;
}
@Override
@@ -355,9 +302,9 @@
return null;
}
String key = RedisConst.LOGS_FILE_PREFIX + entity.getDeviceSn();
- LogsOutputProgressDTO progress = new LogsOutputProgressDTO();
- if (redisOpsUtils.hashCheck(key, entity.getLogsId())) {
- progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, entity.getLogsId());
+ LogsOutputProgressDTO progress = null;
+ if (RedisOpsUtils.hashCheck(key, entity.getLogsId())) {
+ progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, entity.getLogsId());
}
return DeviceLogsDTO.builder()
@@ -369,7 +316,7 @@
.logsInformation(entity.getLogsInfo())
.userName(entity.getUsername())
.deviceLogs(LogsFileUploadList.builder().files(logsFileService.getLogsFileByLogsId(entity.getLogsId())).build())
- .logsProgress(progress.getFiles())
+ .logsProgress(Objects.requireNonNullElse(progress, new LogsOutputProgressDTO()).getFiles())
.deviceTopo(topologyService.getDeviceTopologyByGatewaySn(entity.getDeviceSn()).orElse(null))
.build();
}
--
Gitblit v1.9.3