| | |
| | | import com.dji.sample.manage.model.dto.DeviceDTO; |
| | | import com.dji.sample.manage.model.enums.UserTypeEnum; |
| | | import com.dji.sample.media.model.MediaFileCountDTO; |
| | | import com.dji.sample.wayline.model.dto.FlightTaskProgressReceiver; |
| | | import com.dji.sample.wayline.model.dto.WaylineJobDTO; |
| | | import com.dji.sample.wayline.model.dto.WaylineTaskProgressReceiver; |
| | | import com.dji.sample.wayline.model.enums.WaylineJobStatusEnum; |
| | | import com.dji.sample.wayline.service.IFlightTaskService; |
| | | import com.dji.sample.wayline.service.IWaylineJobService; |
| | |
| | | import org.springframework.stereotype.Service; |
| | | |
| | | import java.time.LocalDateTime; |
| | | import java.util.Objects; |
| | | import java.util.*; |
| | | import java.util.concurrent.TimeUnit; |
| | | |
| | | /** |
| | |
| | | @Override |
| | | @ServiceActivator(inputChannel = ChannelName.INBOUND_EVENTS_FLIGHT_TASK_PROGRESS, outputChannel = ChannelName.OUTBOUND) |
| | | public void handleProgress(CommonTopicReceiver receiver, MessageHeaders headers) { |
| | | EventsReceiver<FlightTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), |
| | | new TypeReference<EventsReceiver<FlightTaskProgressReceiver>>(){}); |
| | | String receivedTopic = String.valueOf(headers.get(MqttHeaders.RECEIVED_TOPIC)); |
| | | String dockSn = receivedTopic.substring((TopicConst.THING_MODEL_PRE + TopicConst.PRODUCT).length(), |
| | | receivedTopic.indexOf(TopicConst.EVENTS_SUF)); |
| | | EventsReceiver<WaylineTaskProgressReceiver> eventsReceiver = mapper.convertValue(receiver.getData(), |
| | | new TypeReference<EventsReceiver<WaylineTaskProgressReceiver>>(){}); |
| | | eventsReceiver.setBid(receiver.getBid()); |
| | | eventsReceiver.setSn(receiver.getGateway()); |
| | | |
| | | FlightTaskProgressReceiver output = eventsReceiver.getOutput(); |
| | | WaylineTaskProgressReceiver output = eventsReceiver.getOutput(); |
| | | |
| | | log.info("Task progress: {}", output.getProgress().toString()); |
| | | |
| | |
| | | } |
| | | |
| | | EventsResultStatusEnum statusEnum = EventsResultStatusEnum.find(output.getStatus()); |
| | | String key = RedisConst.WAYLINE_JOB_RUNNING_PREFIX + dockSn; |
| | | RedisOpsUtils.setWithExpire(key, eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND); |
| | | |
| | | if (statusEnum.getEnd()) { |
| | | WaylineJobDTO job = WaylineJobDTO.builder() |
| | | .jobId(receiver.getBid()) |
| | | .status(WaylineJobStatusEnum.SUCCESS.getVal()) |
| | | .endTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .mediaCount(output.getExt().getMediaCount()) |
| | | .build(); |
| | | |
| | | // record the update of the media count. |
| | | if (Objects.nonNull(job.getMediaCount())) { |
| | | if (Objects.nonNull(job.getMediaCount()) && job.getMediaCount() != 0) { |
| | | RedisOpsUtils.hashSet(RedisConst.MEDIA_FILE_PREFIX + receiver.getGateway(), job.getJobId(), |
| | | MediaFileCountDTO.builder().jobId(receiver.getBid()).mediaCount(job.getMediaCount()).uploadedCount(0).build()); |
| | | } |
| | |
| | | } |
| | | |
| | | waylineJobService.updateJob(job); |
| | | RedisOpsUtils.del(receiver.getBid()); |
| | | RedisOpsUtils.del(key); |
| | | RedisOpsUtils.del(RedisConst.WAYLINE_JOB_PAUSED_PREFIX + receiver.getBid()); |
| | | } |
| | | RedisOpsUtils.setWithExpire(receiver.getBid(), eventsReceiver, RedisConst.DEVICE_ALIVE_SECOND * RedisConst.DEVICE_ALIVE_SECOND); |
| | | |
| | | DeviceDTO device = (DeviceDTO) RedisOpsUtils.get(RedisConst.DEVICE_ONLINE_PREFIX + receiver.getGateway()); |
| | | websocketMessageService.sendBatch( |
| | |
| | | .build()); |
| | | |
| | | if (receiver.getNeedReply() == 1) { |
| | | String topic = headers.get(MqttHeaders.RECEIVED_TOPIC) + TopicConst._REPLY_SUF; |
| | | messageSender.publish(topic, |
| | | messageSender.publish(receivedTopic + TopicConst._REPLY_SUF, |
| | | CommonTopicResponse.builder() |
| | | .tid(receiver.getTid()) |
| | | .bid(receiver.getBid()) |
| | |
| | | |
| | | @Scheduled(initialDelay = 10, fixedRate = 5, timeUnit = TimeUnit.SECONDS) |
| | | private void checkScheduledJob() { |
| | | Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB); |
| | | log.info("Check the timed jobs of the wayline. {}", jobIdValue); |
| | | Object jobIdValue = RedisOpsUtils.zGetMin(RedisConst.WAYLINE_JOB_TIMED_EXECUTE); |
| | | if (Objects.isNull(jobIdValue)) { |
| | | return; |
| | | } |
| | | String jobId = String.valueOf(jobIdValue); |
| | | double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB, jobIdValue); |
| | | log.info("Check the timed tasks of the wayline. {}", jobIdValue); |
| | | // format: {workspace_id}:{dock_sn}:{job_id} |
| | | String[] jobArr = String.valueOf(jobIdValue).split(RedisConst.DELIMITER); |
| | | double time = RedisOpsUtils.zScore(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | long now = System.currentTimeMillis(); |
| | | int offset = 30_000; |
| | | |
| | | // Expired tasks are deleted directly. |
| | | if (time < now - offset) { |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId); |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | waylineJobService.updateJob(WaylineJobDTO.builder() |
| | | .jobId(jobId) |
| | | .jobId(jobArr[2]) |
| | | .status(WaylineJobStatusEnum.FAILED.getVal()) |
| | | .endTime(LocalDateTime.now()) |
| | | .executeTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .code(HttpStatus.SC_REQUEST_TIMEOUT).build()); |
| | | return; |
| | | } |
| | | |
| | | if (now <= time && time <= now + offset) { |
| | | try { |
| | | waylineJobService.executeFlightTask(jobId); |
| | | waylineJobService.executeFlightTask(jobArr[0], jobArr[2]); |
| | | } catch (Exception e) { |
| | | log.info("The scheduled task delivery failed."); |
| | | waylineJobService.updateJob(WaylineJobDTO.builder() |
| | | .jobId(jobId) |
| | | .jobId(jobArr[2]) |
| | | .status(WaylineJobStatusEnum.FAILED.getVal()) |
| | | .endTime(LocalDateTime.now()) |
| | | .executeTime(LocalDateTime.now()) |
| | | .completedTime(LocalDateTime.now()) |
| | | .code(HttpStatus.SC_INTERNAL_SERVER_ERROR).build()); |
| | | } finally { |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB, jobId); |
| | | RedisOpsUtils.zRemove(RedisConst.WAYLINE_JOB_TIMED_EXECUTE, jobIdValue); |
| | | } |
| | | } |
| | | } |