| | |
| | | import com.dji.sample.component.mqtt.service.IMessageSenderService; |
| | | import com.dji.sample.component.redis.RedisConst; |
| | | import com.dji.sample.component.redis.RedisOpsUtils; |
| | | import com.dji.sample.component.websocket.model.CustomWebSocketMessage; |
| | | import com.dji.sample.component.websocket.model.BizCodeEnum; |
| | | import com.dji.sample.component.websocket.service.ISendMessageService; |
| | | import com.dji.sample.component.websocket.service.IWebSocketManageService; |
| | | import com.dji.sample.manage.dao.IDeviceLogsMapper; |
| | | import com.dji.sample.manage.model.dto.*; |
| | | import com.dji.sample.manage.model.entity.DeviceLogsEntity; |
| | |
| | | import com.dji.sample.manage.model.param.LogsFileUpdateParam; |
| | | import com.dji.sample.manage.model.receiver.*; |
| | | import com.dji.sample.manage.service.IDeviceLogsService; |
| | | import com.dji.sample.manage.service.IDeviceRedisService; |
| | | import com.dji.sample.manage.service.ILogsFileService; |
| | | import com.dji.sample.manage.service.ITopologyService; |
| | | import com.dji.sample.media.model.StsCredentialsDTO; |
| | |
| | | private ILogsFileService logsFileService; |
| | | |
| | | @Autowired |
| | | private RedisOpsUtils redisOpsUtils; |
| | | |
| | | @Autowired |
| | | private IStorageService storageService; |
| | | |
| | | @Autowired |
| | |
| | | private ISendMessageService webSocketMessageService; |
| | | |
| | | @Autowired |
| | | private IWebSocketManageService webSocketManageService; |
| | | private IDeviceRedisService deviceRedisService; |
| | | |
| | | @Override |
| | | public PaginationData<DeviceLogsDTO> getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) { |
| | |
| | | |
| | | @Override |
| | | public ResponseResult getRealTimeLogs(String deviceSn, List<String> domainList) { |
| | | boolean exist = redisOpsUtils.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + deviceSn) > 0; |
| | | boolean exist = deviceRedisService.checkDeviceOnline(deviceSn); |
| | | if (!exist) { |
| | | return ResponseResult.error("Device is offline."); |
| | | return ResponseResult.error("设备离线"); |
| | | } |
| | | |
| | | String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF; |
| | | LogsFileUploadList data = messageSenderService.publishWithReply( |
| | | LogsFileUploadList.class, |
| | | topic, |
| | | CommonTopicResponse.builder() |
| | | .tid(UUID.randomUUID().toString()) |
| | | .bid(UUID.randomUUID().toString()) |
| | | .method(LogsFileMethodEnum.FILE_UPLOAD_LIST.getMethod()) |
| | | .timestamp(System.currentTimeMillis()) |
| | | .data(Map.of(MapKeyConst.MODULE_LIST, domainList)) |
| | | .build(), 1); |
| | | ServiceReply<List<LogsFileUpload>> data = messageSenderService.publishServicesTopic( |
| | | new TypeReference<List<LogsFileUpload>>() {}, deviceSn, LogsFileMethodEnum.FILE_UPLOAD_LIST.getMethod(), |
| | | Map.of(MapKeyConst.MODULE_LIST, domainList)); |
| | | |
| | | for (LogsFileUpload file : data.getFiles()) { |
| | | for (LogsFileUpload file : data.getOutput()) { |
| | | if (file.getDeviceSn().isBlank()) { |
| | | file.setDeviceSn(deviceSn); |
| | | } |
| | | } |
| | | return ResponseResult.success(data); |
| | | return ResponseResult.success(new LogsFileUploadList(data.getOutput(), data.getResult())); |
| | | } |
| | | |
| | | @Override |
| | |
| | | |
| | | credentialsDTO.setParams(LogsFileUploadList.builder().files(files).build()); |
| | | String bid = UUID.randomUUID().toString(); |
| | | ServiceReply reply = messageSenderService.publishWithReply( |
| | | TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF, |
| | | CommonTopicResponse.<LogsUploadCredentialsDTO>builder() |
| | | .tid(UUID.randomUUID().toString()) |
| | | .bid(bid) |
| | | .timestamp(System.currentTimeMillis()) |
| | | .method(LogsFileMethodEnum.FILE_UPLOAD_START.getMethod()) |
| | | .data(credentialsDTO) |
| | | .build()); |
| | | ServiceReply reply = messageSenderService.publishServicesTopic( |
| | | deviceSn, LogsFileMethodEnum.FILE_UPLOAD_START.getMethod(), credentialsDTO, bid); |
| | | |
| | | if (ResponseResult.CODE_SUCCESS != reply.getResult()) { |
| | | return ResponseResult.error(String.valueOf(reply.getResult())); |
| | |
| | | |
| | | String logsId = this.insertDeviceLogs(bid, username, deviceSn, param); |
| | | if (!bid.equals(logsId)) { |
| | | return ResponseResult.error("Database insert failed."); |
| | | return ResponseResult.error("数据库插入失败"); |
| | | } |
| | | |
| | | // Save the status of the log upload. |
| | | redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build()); |
| | | RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build()); |
| | | return ResponseResult.success(); |
| | | |
| | | } |
| | |
| | | public ResponseResult pushUpdateFile(String deviceSn, LogsFileUpdateParam param) { |
| | | LogsFileUpdateMethodEnum method = LogsFileUpdateMethodEnum.find(param.getStatus()); |
| | | if (LogsFileUpdateMethodEnum.UNKNOWN == method) { |
| | | return ResponseResult.error("Illegal param"); |
| | | return ResponseResult.error("非法参数"); |
| | | } |
| | | String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF; |
| | | String bid = UUID.randomUUID().toString(); |
| | | ServiceReply reply = messageSenderService.publishWithReply(topic, |
| | | CommonTopicResponse.<LogsFileUpdateParam>builder() |
| | | .tid(UUID.randomUUID().toString()) |
| | | .bid(bid) |
| | | .timestamp(System.currentTimeMillis()) |
| | | .method(LogsFileMethodEnum.FILE_UPLOAD_UPDATE.getMethod()) |
| | | .data(param) |
| | | .build()); |
| | | ServiceReply reply = messageSenderService.publishServicesTopic( |
| | | deviceSn, LogsFileMethodEnum.FILE_UPLOAD_UPDATE.getMethod(), param); |
| | | |
| | | if (ResponseResult.CODE_SUCCESS != reply.getResult()) { |
| | | return ResponseResult.error("Error Code : " + reply.getResult()); |
| | | return ResponseResult.error("错误码:" + reply.getResult()); |
| | | } |
| | | |
| | | return ResponseResult.success(); |
| | |
| | | logsFileService.deleteFileByLogsId(logsId); |
| | | } |
| | | |
| | | @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND) |
| | | @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS) |
| | | @Override |
| | | public void handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
| | | public CommonTopicReceiver handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
| | | String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString(); |
| | | String sn = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
| | | topic.indexOf(TopicConst.EVENTS_SUF)); |
| | | |
| | | if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) { |
| | | String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF; |
| | | messageSenderService.publish(replyTopic, |
| | | CommonTopicResponse.builder() |
| | | .tid(receiver.getTid()) |
| | | .bid(receiver.getBid()) |
| | | .method(receiver.getMethod()) |
| | | .timestamp(System.currentTimeMillis()) |
| | | .data(RequestsReply.success()) |
| | | .build()); |
| | | } |
| | | |
| | | EventsReceiver<OutputLogsProgressReceiver> eventsReceiver = objectMapper.convertValue(receiver.getData(), |
| | | new TypeReference<EventsReceiver<OutputLogsProgressReceiver>>(){}); |
| | |
| | | webSocketData.setBid(receiver.getBid()); |
| | | webSocketData.setSn(sn); |
| | | |
| | | DeviceDTO device = (DeviceDTO) redisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + sn); |
| | | Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn); |
| | | if (deviceOpt.isEmpty()) { |
| | | return null; |
| | | } |
| | | |
| | | DeviceDTO device = deviceOpt.get(); |
| | | |
| | | try { |
| | | OutputLogsProgressReceiver output = eventsReceiver.getOutput(); |
| | |
| | | |
| | | String key = RedisConst.LOGS_FILE_PREFIX + sn; |
| | | LogsOutputProgressDTO progress; |
| | | boolean exist = redisOpsUtils.checkExist(key); |
| | | boolean exist = RedisOpsUtils.checkExist(key); |
| | | if (!exist && !statusEnum.getEnd()) { |
| | | progress = LogsOutputProgressDTO.builder().logsId(receiver.getBid()).build(); |
| | | redisOpsUtils.hashSet(key, receiver.getBid(), progress); |
| | | RedisOpsUtils.hashSet(key, receiver.getBid(), progress); |
| | | } else if (exist) { |
| | | progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, receiver.getBid()); |
| | | progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, receiver.getBid()); |
| | | } else { |
| | | progress = LogsOutputProgressDTO.builder().build(); |
| | | } |
| | |
| | | // If the logs file is empty, delete the cache of this task. |
| | | List<LogsExtFileReceiver> fileReceivers = output.getExt().getFiles(); |
| | | if (CollectionUtils.isEmpty(fileReceivers)) { |
| | | redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | } |
| | | |
| | | // refresh cache. |
| | |
| | | }); |
| | | progress.setFiles(fileProgressList); |
| | | webSocketData.setOutput(progress); |
| | | redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress); |
| | | RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress); |
| | | // Delete the cache at the end of the task. |
| | | if (statusEnum.getEnd()) { |
| | | redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.find(statusEnum).getVal()); |
| | | |
| | | fileReceivers.forEach(file -> logsFileService.updateFile(receiver.getBid(), file)); |
| | |
| | | } catch (NullPointerException e) { |
| | | this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.FAILED.getVal()); |
| | | |
| | | redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); |
| | | } |
| | | |
| | | webSocketMessageService.sendBatch( |
| | | webSocketManageService.getValueWithWorkspaceAndUserType( |
| | | device.getWorkspaceId(), UserTypeEnum.WEB.getVal()), |
| | | CustomWebSocketMessage.builder() |
| | | .data(webSocketData) |
| | | .timestamp(System.currentTimeMillis()) |
| | | .bizCode(receiver.getMethod()) |
| | | .build()); |
| | | webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(), |
| | | BizCodeEnum.FILE_UPLOAD_PROGRESS.getCode(), webSocketData); |
| | | |
| | | return receiver; |
| | | } |
| | | |
| | | @Override |
| | |
| | | return null; |
| | | } |
| | | String key = RedisConst.LOGS_FILE_PREFIX + entity.getDeviceSn(); |
| | | LogsOutputProgressDTO progress = new LogsOutputProgressDTO(); |
| | | if (redisOpsUtils.hashCheck(key, entity.getLogsId())) { |
| | | progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, entity.getLogsId()); |
| | | LogsOutputProgressDTO progress = null; |
| | | if (RedisOpsUtils.hashCheck(key, entity.getLogsId())) { |
| | | progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, entity.getLogsId()); |
| | | } |
| | | |
| | | return DeviceLogsDTO.builder() |
| | |
| | | .logsInformation(entity.getLogsInfo()) |
| | | .userName(entity.getUsername()) |
| | | .deviceLogs(LogsFileUploadList.builder().files(logsFileService.getLogsFileByLogsId(entity.getLogsId())).build()) |
| | | .logsProgress(progress.getFiles()) |
| | | .logsProgress(Objects.requireNonNullElse(progress, new LogsOutputProgressDTO()).getFiles()) |
| | | .deviceTopo(topologyService.getDeviceTopologyByGatewaySn(entity.getDeviceSn()).orElse(null)) |
| | | .build(); |
| | | } |