guoshilong
2023-11-13 68f80a7451a126335153ec00bb6cb520a5ae1f8d
src/main/java/com/dji/sample/manage/service/impl/DeviceLogsServiceImpl.java
@@ -10,9 +10,8 @@
import com.dji.sample.component.mqtt.service.IMessageSenderService;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.service.ISendMessageService;
import com.dji.sample.component.websocket.service.IWebSocketManageService;
import com.dji.sample.manage.dao.IDeviceLogsMapper;
import com.dji.sample.manage.model.dto.*;
import com.dji.sample.manage.model.entity.DeviceLogsEntity;
@@ -22,6 +21,7 @@
import com.dji.sample.manage.model.param.LogsFileUpdateParam;
import com.dji.sample.manage.model.receiver.*;
import com.dji.sample.manage.service.IDeviceLogsService;
import com.dji.sample.manage.service.IDeviceRedisService;
import com.dji.sample.manage.service.ILogsFileService;
import com.dji.sample.manage.service.ITopologyService;
import com.dji.sample.media.model.StsCredentialsDTO;
@@ -70,9 +70,6 @@
    private ILogsFileService logsFileService;
    @Autowired
    private RedisOpsUtils redisOpsUtils;
    @Autowired
    private IStorageService storageService;
    @Autowired
@@ -82,7 +79,7 @@
    private ISendMessageService webSocketMessageService;
    @Autowired
    private IWebSocketManageService webSocketManageService;
    private IDeviceRedisService deviceRedisService;
    @Override
    public PaginationData<DeviceLogsDTO> getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) {
@@ -104,29 +101,21 @@
    /**
     * Asks a device for its log file list through the {@code FILE_UPLOAD_LIST} service method
     * and returns the reply to the caller.
     *
     * <p>NOTE(review): this span is diff text whose +/- markers are missing, so every changed
     * statement appears twice — presumably the removed line first, followed by its replacement.
     * As shown it is not compilable ({@code exist} and {@code data} are each declared twice).
     *
     * @param deviceSn   serial number of the target device; also written into any returned file
     *                   entry whose own device SN is blank
     * @param domainList value sent to the device under {@code MapKeyConst.MODULE_LIST}
     * @return an error result when the device is not online, otherwise a success result
     *         carrying the device's reply
     */
    @Override
    public ResponseResult getRealTimeLogs(String deviceSn, List<String> domainList) {
        // Online check. First form: the device's online key in Redis must have a positive expire time.
        // Second form: the same question is delegated to deviceRedisService.
        boolean exist = redisOpsUtils.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + deviceSn) > 0;
        boolean exist = deviceRedisService.checkDeviceOnline(deviceSn);
        if (!exist) {
            // Two variants of the same early exit; only the message text differs.
            return ResponseResult.error("Device is offline.");
            return ResponseResult.error("设备离线");
        }
        // First form: the services topic and the request envelope (tid, bid, method, timestamp, data)
        // are assembled by hand and sent with publishWithReply.
        String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF;
        LogsFileUploadList data = messageSenderService.publishWithReply(
                LogsFileUploadList.class,
                topic,
                CommonTopicResponse.builder()
                        .tid(UUID.randomUUID().toString())
                        .bid(UUID.randomUUID().toString())
                        .method(LogsFileMethodEnum.FILE_UPLOAD_LIST.getMethod())
                        .timestamp(System.currentTimeMillis())
                        .data(Map.of(MapKeyConst.MODULE_LIST, domainList))
                        .build(), 1);
        // Second form: publishServicesTopic receives only the SN, the method name and the payload,
        // and the reply is typed as ServiceReply<List<LogsFileUpload>>.
        ServiceReply<List<LogsFileUpload>> data = messageSenderService.publishServicesTopic(
                new TypeReference<List<LogsFileUpload>>() {}, deviceSn, LogsFileMethodEnum.FILE_UPLOAD_LIST.getMethod(),
                Map.of(MapKeyConst.MODULE_LIST, domainList));
        // Fill in the requesting device's SN on entries that come back with a blank one.
        // NOTE(review): a null getDeviceSn(), or a null file list in the reply, would throw here —
        // confirm the reply always populates both.
        for (LogsFileUpload file : data.getFiles()) {
        for (LogsFileUpload file : data.getOutput()) {
            if (file.getDeviceSn().isBlank()) {
                file.setDeviceSn(deviceSn);
            }
        }
        // First form returns the reply object as-is; second form rewraps the reply's output and
        // result code in a LogsFileUploadList.
        return ResponseResult.success(data);
        return ResponseResult.success(new LogsFileUploadList(data.getOutput(), data.getResult()));
    }
    @Override
@@ -164,15 +153,8 @@
        credentialsDTO.setParams(LogsFileUploadList.builder().files(files).build());
        String bid = UUID.randomUUID().toString();
        ServiceReply reply = messageSenderService.publishWithReply(
                TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF,
                CommonTopicResponse.<LogsUploadCredentialsDTO>builder()
                        .tid(UUID.randomUUID().toString())
                        .bid(bid)
                        .timestamp(System.currentTimeMillis())
                        .method(LogsFileMethodEnum.FILE_UPLOAD_START.getMethod())
                        .data(credentialsDTO)
                        .build());
        ServiceReply reply = messageSenderService.publishServicesTopic(
                deviceSn, LogsFileMethodEnum.FILE_UPLOAD_START.getMethod(), credentialsDTO, bid);
        if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
            return ResponseResult.error(String.valueOf(reply.getResult()));
@@ -180,11 +162,11 @@
        String logsId = this.insertDeviceLogs(bid, username, deviceSn, param);
        if (!bid.equals(logsId)) {
            return ResponseResult.error("Database insert failed.");
            return ResponseResult.error("数据库插入失败");
        }
        // Save the status of the log upload.
        redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build());
        RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build());
        return ResponseResult.success();
    }
@@ -193,21 +175,13 @@
    public ResponseResult pushUpdateFile(String deviceSn, LogsFileUpdateParam param) {
        LogsFileUpdateMethodEnum method = LogsFileUpdateMethodEnum.find(param.getStatus());
        if (LogsFileUpdateMethodEnum.UNKNOWN == method) {
            return ResponseResult.error("Illegal param");
            return ResponseResult.error("非法参数");
        }
        String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF;
        String bid = UUID.randomUUID().toString();
        ServiceReply reply = messageSenderService.publishWithReply(topic,
                CommonTopicResponse.<LogsFileUpdateParam>builder()
                        .tid(UUID.randomUUID().toString())
                        .bid(bid)
                        .timestamp(System.currentTimeMillis())
                        .method(LogsFileMethodEnum.FILE_UPLOAD_UPDATE.getMethod())
                        .data(param)
                        .build());
        ServiceReply reply = messageSenderService.publishServicesTopic(
                deviceSn, LogsFileMethodEnum.FILE_UPLOAD_UPDATE.getMethod(), param);
        if (ResponseResult.CODE_SUCCESS != reply.getResult()) {
            return ResponseResult.error("Error Code : " + reply.getResult());
            return ResponseResult.error("错误码:" + reply.getResult());
        }
        return ResponseResult.success();
@@ -220,24 +194,12 @@
        logsFileService.deleteFileByLogsId(logsId);
    }
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND)
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
    @Override
    public void handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
    public CommonTopicReceiver handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
        String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
        String sn  = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
                topic.indexOf(TopicConst.EVENTS_SUF));
        if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) {
            String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
            messageSenderService.publish(replyTopic,
                    CommonTopicResponse.builder()
                            .tid(receiver.getTid())
                            .bid(receiver.getBid())
                            .method(receiver.getMethod())
                            .timestamp(System.currentTimeMillis())
                            .data(RequestsReply.success())
                            .build());
        }
        EventsReceiver<OutputLogsProgressReceiver> eventsReceiver = objectMapper.convertValue(receiver.getData(),
                new TypeReference<EventsReceiver<OutputLogsProgressReceiver>>(){});
@@ -246,7 +208,12 @@
        webSocketData.setBid(receiver.getBid());
        webSocketData.setSn(sn);
        DeviceDTO device = (DeviceDTO) redisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + sn);
        Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
        if (deviceOpt.isEmpty()) {
            return null;
        }
        DeviceDTO device = deviceOpt.get();
        try {
            OutputLogsProgressReceiver output = eventsReceiver.getOutput();
@@ -255,12 +222,12 @@
            String key = RedisConst.LOGS_FILE_PREFIX + sn;
            LogsOutputProgressDTO progress;
            boolean exist = redisOpsUtils.checkExist(key);
            boolean exist = RedisOpsUtils.checkExist(key);
            if (!exist && !statusEnum.getEnd()) {
                progress = LogsOutputProgressDTO.builder().logsId(receiver.getBid()).build();
                redisOpsUtils.hashSet(key, receiver.getBid(), progress);
                RedisOpsUtils.hashSet(key, receiver.getBid(), progress);
            } else if (exist) {
                progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, receiver.getBid());
                progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, receiver.getBid());
            } else {
                progress = LogsOutputProgressDTO.builder().build();
            }
@@ -269,7 +236,7 @@
            // If the logs file is empty, delete the cache of this task.
            List<LogsExtFileReceiver> fileReceivers = output.getExt().getFiles();
            if (CollectionUtils.isEmpty(fileReceivers)) {
                redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
                RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
            }
            // refresh cache.
@@ -295,10 +262,10 @@
            });
            progress.setFiles(fileProgressList);
            webSocketData.setOutput(progress);
            redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress);
            RedisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress);
            // Delete the cache at the end of the task.
            if (statusEnum.getEnd()) {
                redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
                RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
                this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.find(statusEnum).getVal());
                fileReceivers.forEach(file -> logsFileService.updateFile(receiver.getBid(), file));
@@ -306,18 +273,13 @@
        } catch (NullPointerException e) {
            this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.FAILED.getVal());
            redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
            RedisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn);
        }
        webSocketMessageService.sendBatch(
                webSocketManageService.getValueWithWorkspaceAndUserType(
                        device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
                CustomWebSocketMessage.builder()
                        .data(webSocketData)
                        .timestamp(System.currentTimeMillis())
                        .bizCode(receiver.getMethod())
                        .build());
        webSocketMessageService.sendBatch(device.getWorkspaceId(), UserTypeEnum.WEB.getVal(),
                BizCodeEnum.FILE_UPLOAD_PROGRESS.getCode(), webSocketData);
        return receiver;
    }
    @Override
@@ -340,9 +302,9 @@
            return null;
        }
        String key = RedisConst.LOGS_FILE_PREFIX + entity.getDeviceSn();
        LogsOutputProgressDTO progress = new LogsOutputProgressDTO();
        if (redisOpsUtils.hashCheck(key, entity.getLogsId())) {
            progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, entity.getLogsId());
        LogsOutputProgressDTO progress = null;
        if (RedisOpsUtils.hashCheck(key, entity.getLogsId())) {
            progress = (LogsOutputProgressDTO) RedisOpsUtils.hashGet(key, entity.getLogsId());
        }
        return DeviceLogsDTO.builder()
@@ -354,7 +316,7 @@
                .logsInformation(entity.getLogsInfo())
                .userName(entity.getUsername())
                .deviceLogs(LogsFileUploadList.builder().files(logsFileService.getLogsFileByLogsId(entity.getLogsId())).build())
                .logsProgress(progress.getFiles())
                .logsProgress(Objects.requireNonNullElse(progress, new LogsOutputProgressDTO()).getFiles())
                .deviceTopo(topologyService.getDeviceTopologyByGatewaySn(entity.getDeviceSn()).orElse(null))
                .build();
    }