From 68f80a7451a126335153ec00bb6cb520a5ae1f8d Mon Sep 17 00:00:00 2001
From: guoshilong <123456>
Date: Mon, 13 Nov 2023 17:02:45 +0800
Subject: [PATCH] 定时任务逻辑修改

---
 src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java |  306 ++++++++++++++++++++++++++++++++++++--------------
 1 files changed, 221 insertions(+), 85 deletions(-)

diff --git a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
index 7e8882c..b8e7a79 100644
--- a/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
+++ b/src/main/java/com/dji/sample/manage/service/impl/DeviceFirmwareServiceImpl.java
@@ -1,39 +1,55 @@
 package com.dji.sample.manage.service.impl;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
+import com.baomidou.mybatisplus.core.toolkit.Wrappers;
+import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
+import com.dji.sample.common.model.Pagination;
+import com.dji.sample.common.model.PaginationData;
 import com.dji.sample.common.model.ResponseResult;
 import com.dji.sample.component.mqtt.model.*;
 import com.dji.sample.component.mqtt.service.impl.MessageSenderServiceImpl;
+import com.dji.sample.component.oss.model.OssConfiguration;
+import com.dji.sample.component.oss.service.impl.OssServiceContext;
 import com.dji.sample.component.redis.RedisConst;
 import com.dji.sample.component.redis.RedisOpsUtils;
-import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
+import com.dji.sample.component.websocket.model.BizCodeEnum;
 import com.dji.sample.component.websocket.service.IWebSocketManageService;
 import com.dji.sample.component.websocket.service.impl.SendMessageServiceImpl;
 import com.dji.sample.manage.dao.IDeviceFirmwareMapper;
-import com.dji.sample.manage.model.dto.DeviceDTO;
-import com.dji.sample.manage.model.dto.DeviceFirmwareDTO;
-import com.dji.sample.manage.model.dto.DeviceFirmwareNoteDTO;
-import com.dji.sample.manage.model.dto.DeviceFirmwareUpgradeDTO;
+import com.dji.sample.manage.model.dto.*;
 import com.dji.sample.manage.model.entity.DeviceFirmwareEntity;
 import com.dji.sample.manage.model.enums.UserTypeEnum;
+import com.dji.sample.manage.model.param.DeviceFirmwareQueryParam;
+import com.dji.sample.manage.model.param.DeviceFirmwareUploadParam;
 import com.dji.sample.manage.model.param.DeviceOtaCreateParam;
+import com.dji.sample.manage.model.receiver.FirmwareProgressExtReceiver;
 import com.dji.sample.manage.service.IDeviceFirmwareService;
-import com.dji.sample.manage.service.IDeviceService;
+import com.dji.sample.manage.service.IDeviceRedisService;
+import com.dji.sample.manage.service.IFirmwareModelService;
 import com.fasterxml.jackson.core.type.TypeReference;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.integration.annotation.ServiceActivator;
-import org.springframework.integration.mqtt.support.MqttHeaders;
 import org.springframework.messaging.MessageHeaders;
 import org.springframework.stereotype.Service;
+import org.springframework.util.DigestUtils;
+import org.springframework.util.StringUtils;
+import org.springframework.web.multipart.MultipartFile;
 
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
+import java.time.format.DateTimeFormatter;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.zip.ZipEntry;
+import java.util.zip.ZipInputStream;
 
 /**
  * @author sean
@@ -48,9 +64,6 @@
     private IDeviceFirmwareMapper mapper;
 
     @Autowired
-    private RedisOpsUtils redisOps;
-
-    @Autowired
     private MessageSenderServiceImpl messageSenderService;
 
     @Autowired
@@ -63,41 +76,45 @@
     private IWebSocketManageService webSocketManageService;
 
     @Autowired
-    private IDeviceService deviceService;
+    private OssServiceContext ossServiceContext;
+
+    @Autowired
+    private IFirmwareModelService firmwareModelService;
+
+    @Autowired
+    private IDeviceRedisService deviceRedisService;
 
     @Override
-    public Optional<DeviceFirmwareDTO> getFirmware(String deviceName, String version) {
+    public Optional<DeviceFirmwareDTO> getFirmware(String workspaceId, String deviceName, String version) {
         return Optional.ofNullable(entity2Dto(mapper.selectOne(
                 new LambdaQueryWrapper<DeviceFirmwareEntity>()
-                        .eq(DeviceFirmwareEntity::getDeviceName, deviceName)
-                        .eq(DeviceFirmwareEntity::getFirmwareVersion, version))));
+                        .eq(DeviceFirmwareEntity::getWorkspaceId, workspaceId)
+                        .eq(DeviceFirmwareEntity::getFirmwareVersion, version)
+                        .eq(DeviceFirmwareEntity::getStatus, true),
+                deviceName)));
     }
 
     @Override
     public Optional<DeviceFirmwareNoteDTO> getLatestFirmwareReleaseNote(String deviceName) {
         return Optional.ofNullable(entity2NoteDto(mapper.selectOne(
-                new LambdaQueryWrapper<DeviceFirmwareEntity>()
-                        .eq(DeviceFirmwareEntity::getDeviceName, deviceName)
+                Wrappers.lambdaQuery(DeviceFirmwareEntity.class)
                         .eq(DeviceFirmwareEntity::getStatus, true)
-                        .orderByDesc(DeviceFirmwareEntity::getReleaseDate)
-                        .last(" limit 1 "))));
+                        .orderByDesc(DeviceFirmwareEntity::getReleaseDate, DeviceFirmwareEntity::getFirmwareVersion),
+                deviceName)));
     }
 
     @Override
-    public List<DeviceOtaCreateParam> getDeviceOtaFirmware(List<DeviceFirmwareUpgradeDTO> upgradeDTOS) {
+    public List<DeviceOtaCreateParam> getDeviceOtaFirmware(String workspaceId, List<DeviceFirmwareUpgradeDTO> upgradeDTOS) {
         List<DeviceOtaCreateParam> deviceOtaList = new ArrayList<>();
         upgradeDTOS.forEach(upgradeDevice -> {
-            boolean exist = deviceService.checkDeviceOnline(upgradeDevice.getSn());
+            boolean exist = deviceRedisService.checkDeviceOnline(upgradeDevice.getSn());
             if (!exist) {
-                throw new IllegalArgumentException("Device is offline.");
+                throw new IllegalArgumentException("设备离线");
             }
             Optional<DeviceFirmwareDTO> firmwareOpt = this.getFirmware(
-                    upgradeDevice.getDeviceName(), upgradeDevice.getProductVersion());
+                    workspaceId, upgradeDevice.getDeviceName(), upgradeDevice.getProductVersion());
             if (firmwareOpt.isEmpty()) {
-                throw new IllegalArgumentException("This firmware version does not exist.");
-            }
-            if (!firmwareOpt.get().getFirmwareStatus()) {
-                throw new IllegalArgumentException("This firmware version is not available.");
+                throw new IllegalArgumentException("此固件版本不存在或不可用");
             }
             DeviceOtaCreateParam ota = dto2OtaCreateDto(firmwareOpt.get());
             ota.setSn(upgradeDevice.getSn());
@@ -107,19 +124,15 @@
         return deviceOtaList;
     }
 
-    @Override
-    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_OTA_PROGRESS, outputChannel = ChannelName.OUTBOUND)
-    public void handleOtaProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
-        String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString();
-        String sn  = topic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
-                topic.indexOf(TopicConst.EVENTS_SUF));
+    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_OTA_PROGRESS, outputChannel = ChannelName.OUTBOUND_EVENTS)
+    public CommonTopicReceiver handleOtaProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
+        String sn  = receiver.getGateway();
 
-        EventsReceiver<EventsOutputReceiver> eventsReceiver = objectMapper.convertValue(receiver.getData(),
-                new TypeReference<EventsReceiver<EventsOutputReceiver>>(){});
+        EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>> eventsReceiver = objectMapper.convertValue(receiver.getData(),
+                new TypeReference<EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>>>(){});
         eventsReceiver.setBid(receiver.getBid());
-        eventsReceiver.setSn(sn);
 
-        EventsOutputReceiver output = eventsReceiver.getOutput();
+        EventsOutputProgressReceiver<FirmwareProgressExtReceiver> output = eventsReceiver.getOutput();
         log.info("SN: {}, {} ===> Upgrading progress: {}",
                 sn, receiver.getMethod(), output.getProgress().toString());
 
@@ -127,55 +140,175 @@
             log.error("SN: {}, {} ===> Error code: {}", sn, receiver.getMethod(), eventsReceiver.getResult());
         }
 
-        DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + sn);
-        String childDeviceSn = device.getChildDeviceSn();
-        boolean upgrade = redisOps.getExpire(RedisConst.FIRMWARE_UPGRADING_PREFIX + sn) > 0;
-        boolean childUpgrade = redisOps.getExpire(RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn) > 0;
+        Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
+        if (deviceOpt.isEmpty()) {
+            return null;
+        }
 
-        // Determine whether it is the ending state, delete the update state key in redis after the job ends.
         EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
-        if (upgrade) {
-            if (statusEnum.getEnd()) {
-                // Delete the cache after the update is complete.
-                redisOps.del(RedisConst.FIRMWARE_UPGRADING_PREFIX + sn);
-            } else {
-                // Update the update progress of the dock in redis.
-                redisOps.setWithExpire(
-                        RedisConst.FIRMWARE_UPGRADING_PREFIX + sn, output.getProgress().getPercent(),
-                        RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
-            }
-        }
-        if (childUpgrade) {
-            if (statusEnum.getEnd()) {
-                redisOps.del(RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn);
-            } else {
-                // Update the update progress of the drone in redis.
-                redisOps.setWithExpire(
-                        RedisConst.FIRMWARE_UPGRADING_PREFIX + childDeviceSn, output.getProgress().getPercent(),
-                        RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
-            }
-        }
+        DeviceDTO device = deviceOpt.get();
+        handleProgress(device.getWorkspaceId(), sn, eventsReceiver, statusEnum.getEnd());
+        handleProgress(device.getWorkspaceId(), device.getChildDeviceSn(), eventsReceiver, statusEnum.getEnd());
 
-        webSocketMessageService.sendBatch(
-                webSocketManageService.getValueWithWorkspaceAndUserType(
-                        device.getWorkspaceId(), UserTypeEnum.WEB.getVal()),
-                CustomWebSocketMessage.builder()
-                        .data(eventsReceiver)
-                        .timestamp(System.currentTimeMillis())
-                        .bizCode(receiver.getMethod())
-                        .build());
+        return receiver;
+    }
 
-        if (receiver.getNeedReply() != null && receiver.getNeedReply() == 1) {
-            String replyTopic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
-            messageSenderService.publish(replyTopic,
-                    CommonTopicResponse.builder()
-                            .tid(receiver.getTid())
-                            .bid(receiver.getBid())
-                            .method(receiver.getMethod())
-                            .timestamp(System.currentTimeMillis())
-                            .data(RequestsReply.success())
+    private void handleProgress(String workspaceId, String sn,
+                    EventsReceiver<EventsOutputProgressReceiver<FirmwareProgressExtReceiver>> events, boolean isEnd) {
+        boolean upgrade = deviceRedisService.getFirmwareUpgradingProgress(sn).isPresent();
+        if (!upgrade) {
+            return;
+        }
+        if (isEnd) {
+            // Delete the cache after the update is complete.
+            deviceRedisService.delFirmwareUpgrading(sn);
+        } else {
+            // Update the update progress of the dock in redis.
+            deviceRedisService.setFirmwareUpgrading(sn, events);
+        }
+        events.setSn(sn);
+        webSocketMessageService.sendBatch(workspaceId, UserTypeEnum.WEB.getVal(), BizCodeEnum.OTA_PROGRESS.getCode(), events);
+    }
+
+    @Override
+    public Boolean checkFileExist(String workspaceId, String fileMd5) {
+        return RedisOpsUtils.checkExist(RedisConst.FILE_UPLOADING_PREFIX + workspaceId + fileMd5) ||
+                mapper.selectCount(new LambdaQueryWrapper<DeviceFirmwareEntity>()
+                    .eq(DeviceFirmwareEntity::getWorkspaceId, workspaceId)
+                    .eq(DeviceFirmwareEntity::getFileMd5, fileMd5))
+                > 0;
+    }
+
+    @Override
+    public PaginationData<DeviceFirmwareDTO> getAllFirmwarePagination(String workspaceId, DeviceFirmwareQueryParam param) {
+        Page<DeviceFirmwareEntity> page = mapper.selectPage(new Page<>(param.getPage(), param.getPageSize()),
+                new LambdaQueryWrapper<DeviceFirmwareEntity>()
+                        .eq(DeviceFirmwareEntity::getWorkspaceId, workspaceId)
+                        .eq(Objects.nonNull(param.getStatus()), DeviceFirmwareEntity::getStatus, param.getStatus())
+                        .like(StringUtils.hasText(param.getProductVersion()), DeviceFirmwareEntity::getFirmwareVersion, param.getProductVersion())
+                        .orderByDesc(DeviceFirmwareEntity::getReleaseDate), param.getDeviceName());
+
+        List<DeviceFirmwareDTO> data = page.getRecords().stream().map(this::entity2Dto).collect(Collectors.toList());
+        return new PaginationData<DeviceFirmwareDTO>(data, new Pagination(page));
+    }
+
+
+    @Override
+    public void importFirmwareFile(String workspaceId, String creator, DeviceFirmwareUploadParam param, MultipartFile file) {
+        String key = RedisConst.FILE_UPLOADING_PREFIX + workspaceId;
+        String existKey = key + file.getOriginalFilename();
+        if (RedisOpsUtils.getExpire(existKey) > 0) {
+            throw new RuntimeException("请稍后再试");
+        }
+        RedisOpsUtils.setWithExpire(existKey, true, RedisConst.DEVICE_ALIVE_SECOND);
+        try (InputStream is = file.getInputStream()) {
+            long size = is.available();
+            String md5 = DigestUtils.md5DigestAsHex(is);
+            key += md5;
+            boolean exist = checkFileExist(workspaceId, md5);
+            if (exist) {
+                throw new RuntimeException("文件已存在");
+            }
+            RedisOpsUtils.set(key, System.currentTimeMillis());
+            Optional<DeviceFirmwareDTO> firmwareOpt = verifyFirmwareFile(file);
+            if (firmwareOpt.isEmpty()) {
+                throw new RuntimeException("文件格式不正确");
+            }
+
+            String firmwareId = UUID.randomUUID().toString();
+            String objectKey = OssConfiguration.objectDirPrefix + File.separator + firmwareId + FirmwareFileProperties.FIRMWARE_FILE_SUFFIX;
+
+            ossServiceContext.putObject(OssConfiguration.bucket, objectKey, file.getInputStream());
+            log.info("upload success. {}", file.getOriginalFilename());
+            DeviceFirmwareDTO firmware = DeviceFirmwareDTO.builder()
+                    .releaseNote(param.getReleaseNote())
+                    .firmwareStatus(param.getStatus())
+                    .fileMd5(md5)
+                    .objectKey(objectKey)
+                    .fileName(file.getOriginalFilename())
+                    .workspaceId(workspaceId)
+                    .username(creator)
+                    .fileSize(size)
+                    .productVersion(firmwareOpt.get().getProductVersion())
+                    .releasedTime(firmwareOpt.get().getReleasedTime())
+                    .firmwareId(firmwareId)
+                    .build();
+
+            saveFirmwareInfo(firmware, param.getDeviceName());
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            RedisOpsUtils.del(key);
+        }
+    }
+
+    @Override
+    public void saveFirmwareInfo(DeviceFirmwareDTO firmware, List<String> deviceNames) {
+        DeviceFirmwareEntity entity = dto2Entity(firmware);
+        mapper.insert(entity);
+        firmwareModelService.saveFirmwareDeviceName(
+                FirmwareModelDTO.builder().firmwareId(entity.getFirmwareId()).deviceNames(deviceNames).build());
+    }
+
+    @Override
+    public void updateFirmwareInfo(DeviceFirmwareDTO firmware) {
+        mapper.update(dto2Entity(firmware),
+                new LambdaUpdateWrapper<DeviceFirmwareEntity>()
+                        .eq(DeviceFirmwareEntity::getFirmwareId, firmware.getFirmwareId()));
+    }
+
+    /**
+     * Parse firmware file information.
+     * @param file
+     * @return
+     */
+    private Optional<DeviceFirmwareDTO> verifyFirmwareFile(MultipartFile file) {
+        try (ZipInputStream unzipFile = new ZipInputStream(file.getInputStream(), StandardCharsets.UTF_8)) {
+            ZipEntry nextEntry = unzipFile.getNextEntry();
+            while (Objects.nonNull(nextEntry)) {
+                String configName = nextEntry.getName();
+                if (!configName.contains(File.separator) && configName.endsWith(FirmwareFileProperties.FIRMWARE_CONFIG_FILE_SUFFIX + FirmwareFileProperties.FIRMWARE_SIG_FILE_SUFFIX)) {
+                    String[] filenameArr = configName.split(FirmwareFileProperties.FIRMWARE_FILE_DELIMITER);
+                    String date = filenameArr[FirmwareFileProperties.FILENAME_RELEASE_DATE_INDEX];
+                    int index = date.indexOf(".");
+                    if (index != -1) {
+                        date = date.substring(0, index);
+                    }
+                    return Optional.of(DeviceFirmwareDTO.builder()
+                            .releasedTime(LocalDate.parse(
+                                    date,
+                                    DateTimeFormatter.ofPattern(FirmwareFileProperties.FILENAME_RELEASE_DATE_FORMAT)))
+                            // delete the string v.
+                            .productVersion(filenameArr[FirmwareFileProperties.FILENAME_VERSION_INDEX].substring(1))
                             .build());
+                }
+                nextEntry = unzipFile.getNextEntry();
+            }
+
+        } catch (IOException e) {
+            e.printStackTrace();
         }
+        return Optional.empty();
+    }
+
+    private DeviceFirmwareEntity dto2Entity(DeviceFirmwareDTO dto) {
+        if (dto == null) {
+            return null;
+        }
+        return DeviceFirmwareEntity.builder()
+                .fileName(dto.getFileName())
+                .fileMd5(dto.getFileMd5())
+                .fileSize(dto.getFileSize())
+                .firmwareId(dto.getFirmwareId())
+                .firmwareVersion(dto.getProductVersion())
+                .objectKey(dto.getObjectKey())
+                .releaseDate(Objects.nonNull(dto.getReleasedTime()) ?
+                        dto.getReleasedTime().atStartOfDay(ZoneId.systemDefault()).toInstant().toEpochMilli() : null)
+                .releaseNote(dto.getReleaseNote())
+                .status(dto.getFirmwareStatus())
+                .workspaceId(dto.getWorkspaceId())
+                .username(dto.getUsername())
+                .build();
     }
 
     private DeviceFirmwareNoteDTO entity2NoteDto (DeviceFirmwareEntity entity) {
@@ -195,15 +328,18 @@
             return null;
         }
         return DeviceFirmwareDTO.builder()
-                .deviceName(entity.getDeviceName())
+                .deviceName(Arrays.asList(entity.getDeviceName().split(",")))
                 .fileMd5(entity.getFileMd5())
                 .fileSize(entity.getFileSize())
-                .fileUrl(entity.getFileUrl())
+                .objectKey(entity.getObjectKey())
                 .firmwareId(entity.getFirmwareId())
                 .fileName(entity.getFileName())
                 .productVersion(entity.getFirmwareVersion())
                 .releasedTime(LocalDate.ofInstant(Instant.ofEpochMilli(entity.getReleaseDate()), ZoneId.systemDefault()))
+                .releaseNote(entity.getReleaseNote())
                 .firmwareStatus(entity.getStatus())
+                .workspaceId(entity.getWorkspaceId())
+                .username(entity.getUsername())
                 .build();
     }
 
@@ -213,7 +349,7 @@
         }
         return DeviceOtaCreateParam.builder()
                 .fileSize(dto.getFileSize())
-                .fileUrl(dto.getFileUrl())
+                .fileUrl(ossServiceContext.getObjectUrl(OssConfiguration.bucket, dto.getObjectKey()).toString())
                 .fileName(dto.getFileName())
                 .md5(dto.getFileMd5())
                 .productVersion(dto.getProductVersion())

--
Gitblit v1.9.3