sean.zhou
2022-11-18 56df98ce4952239fbf7d0e99dbeb0e5c71531d6f
src/main/java/com/dji/sample/wayline/service/impl/FlightTaskServiceImpl.java
@@ -12,15 +12,24 @@
import com.dji.sample.manage.model.dto.DeviceDTO;
import com.dji.sample.manage.model.enums.UserTypeEnum;
import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver;
import com.dji.sample.wayline.model.dto.WaylineJobDTO;
import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum;
import com.dji.sample.wayline.service.IFlightTaskService;
import com.dji.sample.wayline.service.IWaylineJobService;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.HttpStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.mqtt.support.MqttHeaders;
import org.springframework.messaging.MessageHeaders;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Service;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
 * @author sean
@@ -46,18 +55,43 @@
    @Autowired
    private RedisOpsUtils redisOps;
    @Autowired
    private IWaylineJobService waylineJobService;
    @Override
    @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND)
    public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) {
        EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(),
                new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){});
        eventsReceiver.setBid(receiver.getBid());
        eventsReceiver.setSn(receiver.getGateway());
        log.info("Task progress: " + eventsReceiver.getOutput().getProgress().toString());
        FlightTaskProgressReceiver output = eventsReceiver.getOutput();
        log.info("Task progress: {}", output.getProgress().toString());
        if (eventsReceiver.getResult() != ResponseResult.CODE_SUCCESS) {
            log.error("Error code: " + eventsReceiver.getResult());
            log.error("Task progress ===> Error code: " + eventsReceiver.getResult());
        }
        EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus());
        if (statusEnum.getEnd()) {
            WaylineJobDTO job = WaylineJobDTO.builder()
                    .jobId(receiver.getBid())
                    .status(WaylineJobStatusEnum.SUCCESS.getVal())
                    .endTime(LocalDateTime.now())
                    .mediaCount(output.getExt().getMediaCount())
                    .build();
            if (EventsResultStatusEnum.OK != statusEnum) {
                job.setCode(eventsReceiver.getResult());
                job.setStatus(WaylineJobStatusEnum.FAILED.getVal());
            }
            waylineJobService.updateJob(job);
            redisOps.del(receiver.getBid());
        }
        redisOps.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND);
        DeviceDTO device = (DeviceDTO) redisOps.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway());
        websocketMessageService.sendBatch(
@@ -77,8 +111,47 @@
                            .bid(receiver.getBid())
                            .method(EventsMethodEnum.FLIGHT_TASK_PROGRESS.getMethod())
                            .timestamp(System.currentTimeMillis())
                            .data(ResponseResult.success())
                            .data(RequestsReply.success())
                            .build());
        }
    }
    @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS)
    private void checkScheduledJob() {
        Object jobIdValue = redisOps.zGetMin(RedisConst.WAYLINE_JOB);
        log.info("Check the timed jobs of the wayline. {}", jobIdValue);
        if (Objects.isNull(jobIdValue)) {
            return;
        }
        String jobId = String.valueOf(jobIdValue);
        double time = redisOps.zScore(RedisConst.WAYLINE_JOB, jobIdValue);
        long now = System.currentTimeMillis();
        int offset = 30_000;
        // Expired tasks are deleted directly.
        if (time < now - offset) {
            redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
            waylineJobService.updateJob(WaylineJobDTO.builder()
                    .jobId(jobId)
                    .status(WaylineJobStatusEnum.FAILED.getVal())
                    .endTime(LocalDateTime.now())
                    .code(HttpStatus.SC_REQUEST_TIMEOUT).build());
            return;
        }
        if (now <= time && time <= now + offset) {
            try {
                waylineJobService.executeFlightTask(jobId);
            } catch (Exception e) {
                log.info("The scheduled task delivery failed.");
                waylineJobService.updateJob(WaylineJobDTO.builder()
                        .jobId(jobId)
                        .status(WaylineJobStatusEnum.FAILED.getVal())
                        .endTime(LocalDateTime.now())
                        .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build());
            } finally {
                redisOps.zRemove(RedisConst.WAYLINE_JOB, jobId);
            }
        }
    }
}