rain
2024-08-21 2db1aa88e8ab53096a936163d686b90d8e056a99
src/main/java/com/dji/sample/manage/service/impl/DeviceHmsServiceImpl.java
@@ -9,11 +9,7 @@
import com.dji.sample.component.mqtt.model.CommonTopicReceiver;
import com.dji.sample.component.mqtt.model.MapKeyConst;
import com.dji.sample.component.mqtt.model.TopicConst;
import com.dji.sample.component.redis.RedisConst;
import com.dji.sample.component.redis.RedisOpsUtils;
import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession;
import com.dji.sample.component.websocket.model.BizCodeEnum;
import com.dji.sample.component.websocket.model.CustomWebSocketMessage;
import com.dji.sample.component.websocket.service.impl.SendMessageServiceImpl;
import com.dji.sample.component.websocket.service.impl.WebSocketManageServiceImpl;
import com.dji.sample.manage.dao.IDeviceHmsMapper;
@@ -23,14 +19,17 @@
import com.dji.sample.manage.model.dto.DeviceHmsDTO;
import com.dji.sample.manage.model.dto.TelemetryDTO;
import com.dji.sample.manage.model.entity.DeviceHmsEntity;
import com.dji.sample.manage.model.enums.DeviceDomainEnum;
import com.dji.sample.manage.model.enums.HmsEnum;
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.manage.model.param.DeviceHmsQueryParam;
import com.dji.sample.manage.model.receiver.DeviceHmsReceiver;
import com.dji.sample.manage.model.receiver.HmsArgsReceiver;
import com.dji.sample.manage.service.IDeviceHmsService;
import com.dji.sample.manage.service.IDeviceRedisService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
@@ -54,6 +53,7 @@
 */
@Service
@Transactional
@Slf4j
public class DeviceHmsServiceImpl implements IDeviceHmsService {
    @Autowired
@@ -63,13 +63,13 @@
    private ObjectMapper objectMapper;
    @Autowired
    private RedisOpsUtils redisOps;
    @Autowired
    private SendMessageServiceImpl sendMessageService;
    @Autowired
    private WebSocketManageServiceImpl webSocketManageService;
    @Autowired
    private IDeviceRedisService deviceRedisService;
    private static final Pattern PATTERN_KEY = Pattern.compile(
            HmsEnum.FormatKeyEnum.KEY_START +
@@ -93,11 +93,8 @@
                .updateTime(0L)
                .sn(sn)
                .build();
        String key = RedisConst.HMS_PREFIX + sn;
        // Query all unread hms messages of the device in redis.
        Set<String> hmsMap = redisOps.listGetAll(key).stream().map(String::valueOf).collect(Collectors.toSet());
        DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + sn);
        Set<String> hmsMap = deviceRedisService.getAllHmsKeys(sn);
        List<DeviceHmsDTO> unReadList = new ArrayList<>();
        objectMapper.convertValue(((Map) (receiver.getData())).get(MapKeyConst.LIST),
@@ -117,15 +114,14 @@
        if (unReadList.isEmpty()) {
            return;
        }
        redisOps.listRPush(key, unReadList.stream().map(DeviceHmsDTO::getKey).toArray(String[]::new));
        deviceRedisService.addEndHmsKeys(sn, unReadList.stream().map(DeviceHmsDTO::getKey).toArray(String[]::new));
        // push to the web
        Collection<ConcurrentWebSocketSession> sessions = webSocketManageService.getValueWithWorkspaceAndUserType(
                device.getWorkspaceId(), UserTypeEnum.WEB.getVal());
        sendMessageService.sendBatch(sessions, CustomWebSocketMessage.builder()
                .bizCode(BizCodeEnum.DEVICE_HMS.getCode())
                .data(TelemetryDTO.<List<DeviceHmsDTO>>builder().sn(sn).host(unReadList).build())
                .timestamp(System.currentTimeMillis())
                .build());
        Optional<DeviceDTO> deviceOpt = deviceRedisService.getDeviceOnline(sn);
        if (deviceOpt.isEmpty()) {
            return;
        }
        sendMessageService.sendBatch(deviceOpt.get().getWorkspaceId(), UserTypeEnum.WEB.getVal(),
                BizCodeEnum.DEVICE_HMS.getCode(), TelemetryDTO.<List<DeviceHmsDTO>>builder().sn(sn).host(unReadList).build());
    }
    @Override
@@ -162,7 +158,7 @@
                        .eq(DeviceHmsEntity::getSn, deviceSn)
                        .eq(DeviceHmsEntity::getUpdateTime, 0L));
        // Delete unread messages cached in redis.
        redisOps.del(RedisConst.HMS_PREFIX + deviceSn);
        deviceRedisService.delHmsKeysBySn(deviceSn);
    }
    private DeviceHmsDTO entity2Dto(DeviceHmsEntity entity) {
@@ -194,8 +190,12 @@
        dto.setLevel(receiver.getLevel());
        dto.setModule(receiver.getModule());
        dto.setHmsId(UUID.randomUUID().toString());
        if (HmsEnum.DomainType.DRONE_NEST.getDomain().equals(receiver.getDomainType())) {
        Optional<DeviceDomainEnum> domainEnumOpt = Optional.ofNullable(receiver.getDeviceType())
                .map(type -> type.split("-")).map(type -> type[0]).map(Integer::parseInt).map(DeviceDomainEnum::find);
        if (domainEnumOpt.isEmpty()) {
            throw new RuntimeException("设备类型不匹配,请检查数据");
        }
        if (DeviceDomainEnum.DOCK == domainEnumOpt.get()) {
            dto.setHmsKey(HmsEnum.HmsFaqIdEnum.DOCK_TIP.getText() + receiver.getCode());
            return;
        }