sean.zhou
2023-02-24 a7aaeabc7873a0eafb4a7ecad7f65b018b7a9bc9
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,8 +12,8 @@
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.media.model.MediaFileCountDTO;
import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
@@ -29,7 +29,7 @@
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
@@ -59,12 +59,15 @@
    @Override
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
    public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
        EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
        String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC));
        String dockSn  = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(),
                receivedTopic.indexOf(TopicConst.EVENTS_SUF));
        EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){});
        eventsReceiver.setBid(receiver.getBid());
        eventsReceiver.setSn(receiver.getGateway());
        FlightTaskProgressReceiver output = eventsReceiver.getOutput();
        WaylineTaskProgressReceiver output = eventsReceiver.getOutput();
        log.info("Task progress: {}", output.getProgress().toString());
@@ -73,16 +76,19 @@
        }
        EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
        String key = RedisConst.WAYLINE_JOB_RUNNING_PREFIX + dockSn;
        RedisOpsUtils.setWithExpire(key, eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
        if (statusEnum.getEnd()) {
            WaylineJobDTO job = WaylineJobDTO.builder()
                    .jobId(receiver.getBid())
                    .status(WaylineJobStatusEnum.SUCCESS.getVal())
                    .endTime(LocalDateTime.now())
                    .completedTime(LocalDateTime.now())
                    .mediaCount(output.getExt().getMediaCount())
                    .build();
            // record the update of the media count.
            if (Objects.nonNull(job.getMediaCount())) {
            if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) {
                RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(),
                        MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build());
            }
@@ -93,9 +99,9 @@
            }
            waylineJobService.updateJob(job);
            RedisOpsUtils.del(receiver.getBid());
            RedisOpsUtils.del(key);
            RedisOpsUtils.del(RedisConst.WAYLINE_JOB_PAUSED_PREFIX + receiver.getBid());
        }
        RedisOpsUtils.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
        DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
        websocketMessageService.sendBatch(
@@ -108,8 +114,7 @@
                        .build());
        if (receiver.getNeedReply() == 1) {
            String topic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF;
            messageSender.publish(topic,
            messageSender.publish(receivedTopic + TopicConst._REPLY_SUF,
                    CommonTopicResponse.builder()
                            .tid(receiver.getTid())
                            .bid(receiver.getBid())
@@ -122,39 +127,42 @@
    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
    private void checkScheduledJob() {
        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB);
        log.info("Check the timed jobs of the wayline. {}", jobIdValue);
        Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE);
        if (Objects.isNull(jobIdValue)) {
            return;
        }
        String jobId = String.valueOf(jobIdValue);
        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
        log.info("Check the timed tasks of the wayline. {}", jobIdValue);
        // format: {workspace_id}:{dock_sn}:{job_id}
        String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER);
        double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
        long now = System.currentTimeMillis();
        int offset = 30_000;
        // Expired tasks are deleted directly.
        if (time < now - offset) {
            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
            RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
            waylineJobService.updateJob(WaylineJobDTO.builder()
                    .jobId(jobId)
                    .jobId(jobArr[2])
                    .status(WaylineJobStatusEnum.FAILED.getVal())
                    .endTime(LocalDateTime.now())
                    .executeTime(LocalDateTime.now())
                    .completedTime(LocalDateTime.now())
                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
            return;
        }
        if (now <= time && time <= now + offset) {
            try {
                waylineJobService.executeFlightTask(jobId);
                waylineJobService.executeFlightTask(jobArr[0], jobArr[2]);
            } catch (Exception e) {
                log.info("The scheduled task delivery failed.");
                waylineJobService.updateJob(WaylineJobDTO.builder()
                        .jobId(jobId)
                        .jobId(jobArr[2])
                        .status(WaylineJobStatusEnum.FAILED.getVal())
                        .endTime(LocalDateTime.now())
                        .executeTime(LocalDateTime.now())
                        .completedTime(LocalDateTime.now())
                        .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
            } finally {
                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId);
                RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue);
            }
        }
    }