package com.dji.sample.manage.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.dji.sample.common.model.Pagination; import com.dji.sample.common.model.PaginationData; import com.dji.sample.common.model.ResponseResult; import com.dji.sample.component.mqtt.model.*; import com.dji.sample.component.mqtt.service.IMessageSenderService; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.component.websocket.model.CustomWebSocketMessage; import com.dji.sample.component.websocket.service.ISendMessageService; import com.dji.sample.component.websocket.service.IWebSocketManageService; import com.dji.sample.manage.dao.IDeviceLogsMapper; import com.dji.sample.manage.model.dto.*; import com.dji.sample.manage.model.entity.DeviceLogsEntity; import com.dji.sample.manage.model.enums.DeviceDomainEnum; import com.dji.sample.manage.model.enums.DeviceLogsStatusEnum; import com.dji.sample.manage.model.enums.LogsFileUpdateMethodEnum; import com.dji.sample.manage.model.enums.UserTypeEnum; import com.dji.sample.manage.model.param.DeviceLogsCreateParam; import com.dji.sample.manage.model.param.DeviceLogsQueryParam; import com.dji.sample.manage.model.param.LogsFileUpdateParam; import com.dji.sample.manage.model.receiver.*; import com.dji.sample.manage.service.IDeviceLogsService; import com.dji.sample.manage.service.ILogsFileService; import com.dji.sample.manage.service.ITopologyService; import com.dji.sample.media.model.StsCredentialsDTO; import com.dji.sample.storage.service.IStorageService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.net.URL; import java.time.Instant; import java.time.LocalDateTime; import java.time.ZoneId; import java.util.*; import java.util.stream.Collectors; /** * @author sean * @version 1.2 * @date 2022/9/7 */ @Service @Transactional @Slf4j public class DeviceLogsServiceImpl implements IDeviceLogsService { private static final String LOGS_FILE_SUFFIX = ".tar"; @Autowired private IDeviceLogsMapper mapper; @Autowired private ITopologyService topologyService; @Autowired private IMessageSenderService messageSenderService; @Autowired private ILogsFileService logsFileService; @Autowired private RedisOpsUtils redisOpsUtils; @Autowired private IStorageService storageService; @Autowired private ObjectMapper objectMapper; @Autowired private ISendMessageService webSocketMessageService; @Autowired private IWebSocketManageService webSocketManageService; @Override public PaginationData getUploadedLogs(String deviceSn, DeviceLogsQueryParam param) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper() .eq(DeviceLogsEntity::getDeviceSn, deviceSn) .between(Objects.nonNull(param.getBeginTime()) && Objects.nonNull(param.getEndTime()), DeviceLogsEntity::getCreateTime, param.getBeginTime(), param.getEndTime()) .eq(Objects.nonNull(param.getStatus()), DeviceLogsEntity::getStatus, param.getStatus()) .like(StringUtils.hasText(param.getLogsInformation()), DeviceLogsEntity::getLogsInfo, param.getLogsInformation()) .orderByDesc(DeviceLogsEntity::getCreateTime); Page pagination = mapper.selectPage(new Page<>(param.getPage(), param.getPageSize()), queryWrapper); List deviceLogsList = pagination.getRecords().stream().map(this::entity2Dto).collect(Collectors.toList()); return new PaginationData(deviceLogsList, new Pagination(pagination)); } @Override public ResponseResult getRealTimeLogs(String deviceSn, List domainList) { boolean exist = redisOpsUtils.getExpire(RedisConst.DEVICE_ONLINE_PREFIX + deviceSn) > 0; if (!exist) { return ResponseResult.error("Device is offline."); } String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF; Optional serviceReplyOpt = messageSenderService.publishWithReply( LogsFileUploadList.class, topic, CommonTopicResponse.builder() .tid(UUID.randomUUID().toString()) .bid(UUID.randomUUID().toString()) .method(ServicesMethodEnum.FILE_UPLOAD_LIST.getMethod()) .timestamp(System.currentTimeMillis()) .data(Map.of(MapKeyConst.MODULE_LIST, domainList)) .build(), 1); if (serviceReplyOpt.isEmpty()) { return ResponseResult.error("No message reply received."); } LogsFileUploadList data = serviceReplyOpt.get(); for (LogsFileUpload file : data.getFiles()) { if (file.getDeviceSn().isBlank()) { file.setDeviceSn(deviceSn); } } return ResponseResult.success(data); } @Override public String insertDeviceLogs(String bid, String username, String deviceSn, DeviceLogsCreateParam param) { DeviceLogsEntity entity = DeviceLogsEntity.builder() .deviceSn(deviceSn) .username(username) .happenTime(param.getHappenTime()) .logsInfo(Objects.requireNonNullElse(param.getLogsInformation(), "")) .logsId(bid) .status(DeviceLogsStatusEnum.UPLOADING.getVal()) .build(); boolean insert = mapper.insert(entity) > 0; if (!insert) { return ""; } for (LogsFileUpload file : param.getFiles()) { insert = logsFileService.insertFile(file, entity.getLogsId()); if (!insert) { return ""; } } return bid; } @Override public ResponseResult pushFileUpload(String username, String deviceSn, DeviceLogsCreateParam param) { StsCredentialsDTO stsCredentials = storageService.getSTSCredentials(); LogsUploadCredentialsDTO credentialsDTO = new LogsUploadCredentialsDTO(stsCredentials); // Set the storage name of the file. List files = param.getFiles(); files.forEach(file -> file.setObjectKey(credentialsDTO.getObjectKeyPrefix() + "/" + UUID.randomUUID().toString() + LOGS_FILE_SUFFIX)); credentialsDTO.setParams(LogsFileUploadList.builder().files(files).build()); String bid = UUID.randomUUID().toString(); Optional serviceReply = messageSenderService.publishWithReply( TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF, CommonTopicResponse.builder() .tid(UUID.randomUUID().toString()) .bid(bid) .timestamp(System.currentTimeMillis()) .method(ServicesMethodEnum.FILE_UPLOAD_START.getMethod()) .data(credentialsDTO) .build()); if (serviceReply.isEmpty()) { return ResponseResult.error("No message reply received."); } ServiceReply reply = serviceReply.get(); if (ResponseResult.CODE_SUCCESS != reply.getResult()) { return ResponseResult.error(String.valueOf(reply.getResult())); } String logsId = this.insertDeviceLogs(bid, username, deviceSn, param); if (!bid.equals(logsId)) { return ResponseResult.error("Database insert failed."); } // Save the status of the log upload. redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + deviceSn, bid, LogsOutputProgressDTO.builder().logsId(logsId).build()); return ResponseResult.success(); } @Override public ResponseResult pushUpdateFile(String deviceSn, LogsFileUpdateParam param) { LogsFileUpdateMethodEnum method = LogsFileUpdateMethodEnum.find(param.getStatus()); if (LogsFileUpdateMethodEnum.UNKNOWN == method) { return ResponseResult.error("Illegal param"); } String topic = TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT + deviceSn + TopicConst.SERVICES_SUF; String bid = UUID.randomUUID().toString(); Optional serviceReply = messageSenderService.publishWithReply(topic, CommonTopicResponse.builder() .tid(UUID.randomUUID().toString()) .bid(bid) .timestamp(System.currentTimeMillis()) .method(ServicesMethodEnum.FILE_UPLOAD_UPDATE.getMethod()) .data(param) .build()); if (serviceReply.isEmpty()) { return ResponseResult.error("No message reply received."); } ServiceReply reply = serviceReply.get(); if (ResponseResult.CODE_SUCCESS != reply.getResult()) { return ResponseResult.error("Error Code : " + reply.getResult()); } return ResponseResult.success(); } @Override public void deleteLogs(String deviceSn, String logsId) { mapper.delete(new LambdaUpdateWrapper() .eq(DeviceLogsEntity::getLogsId, logsId).eq(DeviceLogsEntity::getDeviceSn, deviceSn)); logsFileService.deleteFileByLogsId(logsId); } @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FILE_UPLOAD_PROGRESS, outputChannel = ChannelName.OUTBOUND) @Override public void handleFileUploadProgress(CommonTopicReceiver receiver, MessageHeaders headers) { String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString(); String sn = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), topic.indexOf(TopicConst.EVENTS_SUF)); if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) { String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF; messageSenderService.publish(replyTopic, CommonTopicResponse.builder() .tid(receiver.getTid()) .bid(receiver.getBid()) .method(receiver.getMethod()) .timestamp(System.currentTimeMillis()) .data(ResponseResult.success()) .build()); } EventsReceiver eventsReceiver = objectMapper.convertValue(receiver.getData(), new TypeReference>(){}); EventsReceiver webSocketData = new EventsReceiver<>(); webSocketData.setBid(receiver.getBid()); webSocketData.setSn(sn); DeviceDTO device = (DeviceDTO) redisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + sn); try { OutputLogsProgressReceiver output = eventsReceiver.getOutput(); EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); log.info("Logs upload progress: {}", output.toString()); String key = RedisConst.LOGS_FILE_PREFIX + sn; LogsOutputProgressDTO progress; boolean exist = redisOpsUtils.checkExist(key); if (!exist && !statusEnum.getEnd()) { progress = LogsOutputProgressDTO.builder().logsId(receiver.getBid()).build(); redisOpsUtils.hashSet(key, receiver.getBid(), progress); } else if (exist) { progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, receiver.getBid()); } else { progress = LogsOutputProgressDTO.builder().build(); } progress.setStatus(output.getStatus()); // If the logs file is empty, delete the cache of this task. List fileReceivers = output.getExt().getFiles(); if (CollectionUtils.isEmpty(fileReceivers)) { redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); return; } // refresh cache. List fileProgressList = new ArrayList<>(); fileReceivers.forEach(file -> { LogsProgressReceiver logsProgress = file.getProgress(); if (!StringUtils.hasText(file.getDeviceSn())) { if (String.valueOf(DeviceDomainEnum.DOCK.getVal()).equals(file.getDeviceModelDomain())) { file.setDeviceSn(sn); } else if (String.valueOf(DeviceDomainEnum.SUB_DEVICE.getVal()).equals(file.getDeviceModelDomain())) { file.setDeviceSn(device.getChildDeviceSn()); } } fileProgressList.add(LogsProgressDTO.builder() .deviceSn(file.getDeviceSn()) .deviceModelDomain(file.getDeviceModelDomain()) .result(logsProgress.getResult()) .status(logsProgress.getStatus()) .uploadRate(logsProgress.getUploadRate()) .progress(((logsProgress.getCurrentStep() - 1) * 100 + logsProgress.getProgress()) / logsProgress.getTotalStep()) .build()); }); progress.setFiles(fileProgressList); webSocketData.setOutput(progress); redisOpsUtils.hashSet(RedisConst.LOGS_FILE_PREFIX + sn, receiver.getBid(), progress); // Delete the cache at the end of the task. if (statusEnum.getEnd()) { redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.find(statusEnum).getVal()); fileReceivers.forEach(file -> logsFileService.updateFile(receiver.getBid(), file)); } } catch (NullPointerException e) { this.updateLogsStatus(receiver.getBid(), DeviceLogsStatusEnum.FAILED.getVal()); redisOpsUtils.del(RedisConst.LOGS_FILE_PREFIX + sn); } webSocketMessageService.sendBatch( webSocketManageService.getValueWithWorkspaceAndUserType( device.getWorkspaceId(), UserTypeEnum.WEB.getVal()), CustomWebSocketMessage.builder() .data(webSocketData) .timestamp(System.currentTimeMillis()) .bizCode(receiver.getMethod()) .build()); } @Override public void updateLogsStatus(String logsId, Integer value) { mapper.update(DeviceLogsEntity.builder().status(value).build(), new LambdaUpdateWrapper().eq(DeviceLogsEntity::getLogsId, logsId)); if (DeviceLogsStatusEnum.DONE.getVal() == value) { logsFileService.updateFileUploadStatus(logsId, true); } } @Override public URL getLogsFileUrl(String logsId, String fileId) { return logsFileService.getLogsFileUrl(logsId, fileId); } private DeviceLogsDTO entity2Dto(DeviceLogsEntity entity) { if (Objects.isNull(entity)) { return null; } String key = RedisConst.LOGS_FILE_PREFIX + entity.getDeviceSn(); LogsOutputProgressDTO progress = new LogsOutputProgressDTO(); if (redisOpsUtils.hashCheck(key, entity.getLogsId())) { progress = (LogsOutputProgressDTO) redisOpsUtils.hashGet(key, entity.getLogsId()); } return DeviceLogsDTO.builder() .logsId(entity.getLogsId()) .createTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getCreateTime()), ZoneId.systemDefault())) .happenTime(Objects.isNull(entity.getHappenTime()) ? null : LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getHappenTime()), ZoneId.systemDefault())) .status(entity.getStatus()) .logsInformation(entity.getLogsInfo()) .userName(entity.getUsername()) .deviceLogs(LogsFileUploadList.builder().files(logsFileService.getLogsFileByLogsId(entity.getLogsId())).build()) .logsProgress(progress.getFiles()) .deviceTopo(topologyService.getDeviceTopologyByGatewaySn(entity.getDeviceSn()).orElse(null)) .build(); } }