package com.dji.sample.wayline.service.impl; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper; import com.baomidou.mybatisplus.core.toolkit.Wrappers; import com.baomidou.mybatisplus.extension.conditions.update.LambdaUpdateChainWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.dji.sample.common.error.CommonErrorEnum; import com.dji.sample.common.model.CustomClaim; import com.dji.sample.common.model.Pagination; import com.dji.sample.common.model.PaginationData; import com.dji.sample.common.model.ResponseResult; import com.dji.sample.common.util.SpringBeanUtils; import com.dji.sample.component.mqtt.model.*; import com.dji.sample.component.mqtt.service.IMessageSenderService; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.control.model.dto.PointDTO; import com.dji.sample.control.model.enums.CameraModeEnum; import com.dji.sample.control.model.enums.DroneAuthorityEnum; import com.dji.sample.control.model.enums.PayloadCommandsEnum; import com.dji.sample.control.model.param.*; import com.dji.sample.control.service.IControlService; import com.dji.sample.control.service.IDrcService; import com.dji.sample.geo.entity.GeoJson; import com.dji.sample.manage.model.dto.DeviceDTO; import com.dji.sample.manage.model.enums.DeviceModeCodeEnum; import com.dji.sample.manage.model.enums.DockModeCodeEnum; import com.dji.sample.manage.model.enums.DroneRcLostActionEnum; import com.dji.sample.manage.model.enums.WaylineRcLostActionEnum; import com.dji.sample.manage.model.receiver.OsdDockReceiver; import com.dji.sample.manage.model.receiver.OsdSubDeviceReceiver; import com.dji.sample.manage.service.IDeviceRedisService; import com.dji.sample.manage.service.IDeviceService; import 
com.dji.sample.media.model.MediaFileCountDTO; import com.dji.sample.media.model.MediaMethodEnum; import com.dji.sample.media.service.IFileService; import com.dji.sample.geo.utils.GeoUtils; import com.dji.sample.wayline.dao.IWaylineJobMapper; import com.dji.sample.wayline.model.dto.*; import com.dji.sample.wayline.model.entity.WaylineJobEntity; import com.dji.sample.wayline.model.enums.*; import com.dji.sample.wayline.model.param.*; import com.dji.sample.wayline.service.IWaylineFileService; import com.dji.sample.wayline.service.IWaylineJobService; import com.dji.sample.wayline.service.IWaylineRedisService; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.integration.annotation.ServiceActivator; import org.springframework.integration.mqtt.support.MqttHeaders; import org.springframework.messaging.MessageHeaders; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Isolation; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.io.File; import java.net.URL; import java.sql.SQLException; import java.time.*; import java.util.*; import java.util.stream.Collectors; /** * @author sean * @version 1.1 * @date 2022/6/1 */ @Service @Transactional @Slf4j public class WaylineJobServiceImpl implements IWaylineJobService { @Autowired private IWaylineJobMapper mapper; @Autowired private IWaylineFileService waylineFileService; @Autowired private IDeviceService deviceService; @Autowired private IMessageSenderService messageSender; @Autowired private ObjectMapper objectMapper; @Autowired private IFileService fileService; @Autowired private IDrcService drcService; @Autowired private IDeviceRedisService deviceRedisService; @Autowired private 
IWaylineRedisService waylineRedisService;

    @Autowired
    private IControlService controlService;

    /**
     * Inserts the entity through the mapper and returns it converted to a DTO.
     *
     * @param jobEntity the job entity to persist
     * @return the DTO of the persisted entity, or empty when {@code mapper.insert} returns a non-positive value
     */
    private Optional insertWaylineJob(WaylineJobEntity jobEntity) {
        int insertResult = mapper.insert(jobEntity);
        return insertResult > 0
                ? Optional.ofNullable(this.entity2Dto(jobEntity))
                : Optional.empty();
    }

    /**
     * Builds a new PENDING job (fresh random job id, media count 0) from the request and stores it.
     *
     * @param param       creation request; a {@code null} request yields an empty result
     * @param workspaceId workspace the job belongs to
     * @param username    name of the user creating the job
     * @param beginTime   begin time stored on the job
     * @param endTime     end time stored on the job
     * @return the stored job, or empty when {@code param} is null or the insert did not succeed
     */
    @Override
    public Optional createWaylineJob(CreateJobParam param, String workspaceId, String username, Long beginTime, Long endTime) {
        if (param == null) {
            return Optional.empty();
        }

        // Immediate tasks, allocating time on the backend.
        WaylineJobEntity pendingJob = WaylineJobEntity.builder()
                .name(param.getName())
                .dockSn(param.getDockSn())
                .fileId(param.getFileId())
                .username(username)
                .workspaceId(workspaceId)
                .jobId(UUID.randomUUID().toString())
                .beginTime(beginTime)
                .endTime(endTime)
                .status(WaylineJobStatusEnum.PENDING.getVal())
                .taskType(param.getTaskType().getVal())
                .waylineType(param.getWaylineType().getVal())
                .outOfControlAction(param.getOutOfControlAction())
                .rthAltitude(param.getRthAltitude())
                .mediaCount(0)
                .build();
        return insertWaylineJob(pendingJob);
    }

    /**
     * Clones an existing job into a new PENDING child job that points back at its parent.
     *
     * @param workspaceId workspace in which the parent job is looked up
     * @param parentId    job id of the parent job
     * @return the stored child job, or empty when the parent is not found or the insert did not succeed
     */
    @Override
    public Optional createWaylineJobByParent(String workspaceId, String parentId) {
        Optional parentJobOpt = this.getJobByJobId(workspaceId, parentId);
        if (!parentJobOpt.isPresent()) {
            return Optional.empty();
        }

        WaylineJobEntity childJob = this.dto2Entity(parentJobOpt.get());
        // New identity, linked to the parent.
        childJob.setJobId(UUID.randomUUID().toString());
        childJob.setParentId(parentId);
        // Reset everything that describes the parent's own execution.
        childJob.setStatus(WaylineJobStatusEnum.PENDING.getVal());
        childJob.setErrorCode(null);
        childJob.setExecuteTime(null);
        childJob.setCompletedTime(null);
        return this.insertWaylineJob(childJob);
    }

    /**
     * For immediate tasks the server time is authoritative: the task day and task period of the
     * request are overwritten with a server-side timestamp. Other task types are left untouched.
     *
     * @param param the creation request, modified in place for immediate tasks
     */
    private void fillImmediateTime(CreateJobParam param) {
        if (WaylineTaskTypeEnum.IMMEDIATE != param.getTaskType()) {
            return;
        }
        // Current time in epoch seconds, shifted 10 seconds into the past.
        long now = System.currentTimeMillis() / 1000 - 10;
        // param.setTaskDays(Collections.singletonList(now));
        // param.setTaskPeriods(Collections.singletonList(Collections.singletonList(now)));
// The single task day and the single task period both carry the same server-side timestamp.
        param.setTaskDays(List.of(now));
        param.setTaskPeriods(List.of(List.of(now)));
    }

    /**
     * Creates and publishes one job for every (task day, task period) combination of the request.
     *
     * <p>For each combination the begin time is the task day's date combined with the time-of-day of
     * the period's first element; the end time uses the period's second element when present and
     * otherwise equals the begin time. All conversions use the system default time zone.
     *
     * <p>A non-immediate combination whose end time is already in the past makes the method return
     * an error response right away; this method does not undo jobs handled in earlier iterations.
     *
     * @param param       creation request (immediate tasks get their times replaced by server time first)
     * @param customClaim caller identity supplying the workspace id and username
     * @return the first non-success response of a publish attempt, an "expired" error, or success
     * @throws SQLException when a job could not be created
     */
    @Override
    public ResponseResult publishFlightTask(CreateJobParam param, CustomClaim customClaim) throws SQLException {
        fillImmediateTime(param);

        // param.getTaskDays().sort((a, b) -> (int) (a - b));
        // param.getTaskPeriods().sort((a, b) -> (int) (a.get(0) - b.get(0)));
        for (Long taskDay : param.getTaskDays()) {
            // Only the calendar date of the task day is used; the time-of-day comes from the period.
            LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(taskDay), ZoneId.systemDefault());
            for (List taskPeriod : param.getTaskPeriods()) {
                long beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(0)), ZoneId.systemDefault()))
                        .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
                // A period without a second element ends at its begin time.
                long endTime = taskPeriod.size() > 1 && Objects.nonNull(taskPeriod.get(1)) ?
                        LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(taskPeriod.get(1)), ZoneId.systemDefault()))
                                .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() : beginTime;
                // Immediate tasks are never treated as expired.
                if (WaylineTaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime < System.currentTimeMillis()) {
                    return ResponseResult.error("任务已过期");
                }

                Optional waylineJobOpt = this.createWaylineJob(param, customClaim.getWorkspaceId(), customClaim.getUsername(), beginTime, endTime);
                if (waylineJobOpt.isEmpty()) {
                    throw new SQLException("任务创建失败");
                }

                WaylineJobDTO waylineJob = waylineJobOpt.get();
                // If it is a conditional task type, add conditions to the job parameters.
//如果是条件任务类型,需要在任务参数中添加条件。 addPreparedJob(waylineJob, param, beginTime, endTime); ResponseResult response = this.publishOneFlightTask(waylineJob); if (ResponseResult.CODE_SUCCESS != response.getCode()) { return response; } } } return ResponseResult.success(); } @Override public ResponseResult publishFlightTaskCondition(CreateJobParam param, CustomClaim customClaim) throws SQLException { //开始日期 LocalDate startDate = LocalDate.ofInstant(Instant.ofEpochSecond(param.getTaskDays().get(0)), ZoneId.systemDefault()); long start = LocalDateTime.of(startDate, LocalTime.ofInstant(Instant.ofEpochSecond(param.getExecuteStartTimeArr().get(0).get(0)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); //结束日期 LocalDate endDate = LocalDate.ofInstant(Instant.ofEpochSecond(param.getTaskDays().get(1)), ZoneId.systemDefault()); long end = LocalDateTime.of(endDate, LocalTime.ofInstant(Instant.ofEpochSecond(param.getExecuteStartTimeArr().get(param.getExecuteStartTimeArr().size() - 1).get(0)), ZoneId.systemDefault())) .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); //保存数据 WaylineJobEntity waylineJobEntity = WaylineJobEntity.builder() .jobId(UUID.randomUUID().toString()) .name(param.getName()) .dockSn(param.getDockSn()) .fileId(param.getFileId()) .username(customClaim.getUsername()) .workspaceId(customClaim.getWorkspaceId()) .beginTime(start) .endTime(end) .status(WaylineJobStatusEnum.PENDING.getVal()) .taskType(param.getTaskType().getVal()) .waylineType(param.getWaylineType().getVal()) .outOfControlAction(param.getOutOfControlAction()) .batteryCapacity(param.getMinBatteryCapacity()) .rthAltitude(param.getRthAltitude()) .mediaCount(0) .repFreVal(param.getRepFreVal()) .repFreType(param.getRepFreType()) .repRuleType(param.getRepRuleType()) .repRuleVal(param.getRepRuleVal()) .executeTimeArr(param.getTaskPeriods()) .executeStartTimeArr(param.getExecuteStartTimeArr()) .build(); Optional waylineJobOpt = insertWaylineJob(waylineJobEntity); if 
(waylineJobOpt.isEmpty()) { throw new SQLException("任务创建失败"); } WaylineJobDTO waylineJob = waylineJobOpt.get(); //存一条记录, List timeArr = param.getExecuteStartTimeArr().get(0); LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(timeArr.get(0)), ZoneId.systemDefault()); LocalDateTime beginTime = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(0)), ZoneId.systemDefault())); LocalDateTime endTime = timeArr.size() > 1 && Objects.nonNull(timeArr.get(1)) ? LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(1)), ZoneId.systemDefault())) : beginTime; if (WaylineTaskTypeEnum.IMMEDIATE != param.getTaskType() && endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli() < System.currentTimeMillis()) { return ResponseResult.error("任务已过期"); } //条件任务 if (param.getTaskType() == WaylineTaskTypeEnum.CONDITION) { //如果是条件任务类型,需要在任务参数中添加条件。 waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ? WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null) .readyConditions(WaylineTaskReadyConditionDTO.builder() .batteryCapacity(param.getMinBatteryCapacity()) .beginTime(beginTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) .endTime(endTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()) .build()) .build()); } else { waylineJob.setBeginTime(beginTime); } ResponseResult response = this.publishOneFlightTask(waylineJob); if (ResponseResult.CODE_SUCCESS != response.getCode()) { return response; } return ResponseResult.success(); } private void addPreparedJob(WaylineJobDTO waylineJob, CreateJobParam param, Long beginTime, Long endTime) { if (WaylineTaskTypeEnum.CONDITION == param.getTaskType()) { waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(Objects.nonNull(param.getMinStorageCapacity()) ? 
WaylineTaskExecutableConditionDTO.builder().storageCapacity(param.getMinStorageCapacity()).build() : null)
                            .readyConditions(WaylineTaskReadyConditionDTO.builder()
                                    .batteryCapacity(param.getMinBatteryCapacity())
                                    .beginTime(beginTime)
                                    .endTime(endTime)
                                    .build())
                            .build());

            waylineRedisService.setConditionalWaylineJob(waylineJob);
        }
        // value: {workspace_id}:{dock_sn}:{job_id}
        boolean isAdd = waylineRedisService.addPreparedWaylineJob(waylineJob);
        if (!isAdd) {
            throw new RuntimeException("创建任务失败。");
        }
    }

    /**
     * Sends the prepare command for a job and, for immediate tasks, the execute command right after.
     *
     * @param waylineJob the job to publish
     * @return an error response when preparing or executing failed, otherwise success
     * @throws SQLException when the wayline file of the job cannot be found
     */
    public ResponseResult publishOneFlightTask(WaylineJobDTO waylineJob) throws SQLException {

        boolean isSuccess = this.prepareFlightTask(waylineJob);
        if (!isSuccess) {
            return ResponseResult.error("任务准备失败");
        }

        // Issue an immediate task execution command.
        if (WaylineTaskTypeEnum.IMMEDIATE == waylineJob.getTaskType()) {
            boolean isExecuted = executeFlightTask(waylineJob.getWorkspaceId(), waylineJob.getJobId());
            if (!isExecuted) {
                return ResponseResult.error("执行任务失败");
            }
        }

        return ResponseResult.success();
    }

    /**
     * Publishes the FLIGHT_TASK_PREPARE service call for a job to its dock.
     *
     * <p>When the dock replies with a non-success result the job is updated to FAILED (with the
     * reply's result as error code) and {@code false} is returned.
     *
     * @param waylineJob the job to prepare; conditional jobs must already carry their conditions
     * @return {@code true} when the dock accepted the prepare command
     * @throws SQLException             when the job's wayline file does not exist
     * @throws RuntimeException         when the dock is offline
     * @throws IllegalArgumentException when a conditional job has no conditions
     */
    private Boolean prepareFlightTask(WaylineJobDTO waylineJob) throws SQLException {

        boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.getDockSn());
        if (!isOnline) {
            throw new RuntimeException("设备离线。");
        }

        // get wayline file
        Optional waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId());
        if (waylineFile.isEmpty()) {
            throw new SQLException("航线文件不存在。");
        }

        // get file url
        URL url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId());

        // The job id doubles as the flight id; the execute time is the job's begin time in epoch millis.
        WaylineTaskCreateDTO flightTask = WaylineTaskCreateDTO.builder()
                .flightId(waylineJob.getJobId())
                .executeTime(waylineJob.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli())
                .taskType(waylineJob.getTaskType())
                .waylineType(waylineJob.getWaylineType())
                .rthAltitude(waylineJob.getRthAltitude())
                .outOfControlAction(waylineJob.getOutOfControlAction())
                .file(WaylineTaskFileDTO.builder()
                        .url(url.toString())
                        .fingerprint(waylineFile.get().getSign())
                        .build())
                .build();

        // Conditional tasks forward their ready/executable conditions to the dock.
        if (WaylineTaskTypeEnum.CONDITION == waylineJob.getTaskType()) {
            if (Objects.isNull(waylineJob.getConditions())) {
                throw new IllegalArgumentException();
            }
            flightTask.setReadyConditions(waylineJob.getConditions().getReadyConditions());
            flightTask.setExecutableConditions(waylineJob.getConditions().getExecutableConditions());
        }

        // Publish the prepare command.
        ServiceReply serviceReply = messageSender.publishServicesTopic(
                waylineJob.getDockSn(), WaylineMethodEnum.FLIGHT_TASK_PREPARE.getMethod(), flightTask, waylineJob.getJobId());
        if (ResponseResult.CODE_SUCCESS != serviceReply.getResult()) {
            log.info("Prepare task ====> Error code: {}", serviceReply.getResult());
            // Preparing failed: record the failure on the job.
            this.updateJob(WaylineJobDTO.builder()
                    .workspaceId(waylineJob.getWorkspaceId())
                    .jobId(waylineJob.getJobId())
                    .executeTime(LocalDateTime.now())
                    .status(WaylineJobStatusEnum.FAILED.getVal())
                    .completedTime(LocalDateTime.now())
                    .code(serviceReply.getResult()).build());
            return false;
        }
        return true;
    }

    /**
     * Publishes the FLIGHT_TASK_EXECUTE service call for a stored job.
     *
     * <p>On a non-success reply the job is updated to FAILED and, for conditional tasks whose
     * error code is flagged as blocking, the job is recorded as blocked in Redis. On success the
     * job is updated to IN_PROGRESS and stored as the dock's running job.
     *
     * @param workspaceId workspace in which the job is looked up
     * @param jobId       id of the job to execute
     * @return {@code true} when the dock accepted the execute command
     * @throws IllegalArgumentException when the job does not exist
     * @throws RuntimeException         when the dock is offline
     */
    @Override
    public Boolean executeFlightTask(String workspaceId, String jobId) {
        // get job
        Optional waylineJob = this.getJobByJobId(workspaceId, jobId);
        if (waylineJob.isEmpty()) {
            throw new IllegalArgumentException("任务不存在");
        }

        boolean isOnline = deviceRedisService.checkDeviceOnline(waylineJob.get().getDockSn());
        if (!isOnline) {
            throw new RuntimeException("设备离线");
        }

        WaylineJobDTO job = waylineJob.get();
        // The execute command only needs the flight id.
        WaylineTaskCreateDTO flightTask = WaylineTaskCreateDTO.builder().flightId(jobId).build();

        ServiceReply serviceReply = messageSender.publishServicesTopic(
                job.getDockSn(), WaylineMethodEnum.FLIGHT_TASK_EXECUTE.getMethod(), flightTask, jobId);
        if (ResponseResult.CODE_SUCCESS != serviceReply.getResult()) {
            log.info("Execute job ====> Error code: {}", serviceReply.getResult());
            this.updateJob(WaylineJobDTO.builder()
                    .jobId(jobId)
                    .executeTime(LocalDateTime.now())
                    .status(WaylineJobStatusEnum.FAILED.getVal())
.completedTime(LocalDateTime.now()) .code(serviceReply.getResult()).build()); // The conditional task fails and enters the blocking status. if (WaylineTaskTypeEnum.CONDITION == job.getTaskType() && WaylineErrorCodeEnum.find(serviceReply.getResult()).isBlock()) { waylineRedisService.setBlockedWaylineJob(job.getDockSn(), jobId); } return false; } this.updateJob(WaylineJobDTO.builder() .jobId(jobId) .executeTime(LocalDateTime.now()) .status(WaylineJobStatusEnum.IN_PROGRESS.getVal()) .build()); waylineRedisService.setRunningWaylineJob(job.getDockSn(), EventsReceiver.builder().bid(jobId).sn(job.getDockSn()).build()); return true; } @Override public void cancelFlightTask(String workspaceId, Collection jobIds) { List waylineJobs = getJobsByConditions(workspaceId, jobIds, WaylineJobStatusEnum.PENDING); Set waylineJobIds = waylineJobs.stream().map(WaylineJobDTO::getJobId).collect(Collectors.toSet()); // Check if the task status is correct. boolean isErr = !jobIds.removeAll(waylineJobIds) || !jobIds.isEmpty(); if (isErr) { throw new IllegalArgumentException("以下任务状态不正确,不能取消" + Arrays.toString(jobIds.toArray())); } // Group job id by dock sn. 
Map> dockJobs = waylineJobs.stream() .collect(Collectors.groupingBy(WaylineJobDTO::getDockSn, Collectors.mapping(WaylineJobDTO::getJobId, Collectors.toList()))); dockJobs.forEach((dockSn, idList) -> this.publishCancelTask(workspaceId, dockSn, idList)); } public void publishCancelTask(String workspaceId, String dockSn, List jobIds) { boolean isOnline = deviceRedisService.checkDeviceOnline(dockSn); if (!isOnline) { throw new RuntimeException("设备离线"); } ServiceReply serviceReply = messageSender.publishServicesTopic( dockSn, WaylineMethodEnum.FLIGHT_TASK_CANCEL.getMethod(), Map.of(MapKeyConst.FLIGHT_IDS, jobIds)); if (ResponseResult.CODE_SUCCESS != serviceReply.getResult()) { log.info("Cancel job ====> Error code: {}", serviceReply.getResult()); throw new RuntimeException("航路作业取消失败 " + dockSn); } for (String jobId : jobIds) { this.updateJob(WaylineJobDTO.builder() .workspaceId(workspaceId) .jobId(jobId) .status(WaylineJobStatusEnum.CANCEL.getVal()) .completedTime(LocalDateTime.now()) .build()); } } public List getJobsByConditions(String workspaceId, Collection jobIds, WaylineJobStatusEnum status) { return mapper.selectList( new LambdaQueryWrapper() .eq(WaylineJobEntity::getWorkspaceId, workspaceId) .eq(Objects.nonNull(status), WaylineJobEntity::getStatus, status.getVal()) .in(!CollectionUtils.isEmpty(jobIds), WaylineJobEntity::getJobId, jobIds)) .stream() .map(this::entity2Dto) .collect(Collectors.toList()); } @Override public Optional getJobByJobId(String workspaceId, String jobId) { WaylineJobEntity jobEntity = mapper.selectOne( new LambdaQueryWrapper() .eq(WaylineJobEntity::getWorkspaceId, workspaceId) .eq(WaylineJobEntity::getJobId, jobId)); return Optional.ofNullable(entity2Dto(jobEntity)); } @Override public Boolean updateJob(WaylineJobDTO dto) { try { if (dto.getStatus() == 3) { this.checkNextJob(dto); } } catch (SQLException e) { throw new RuntimeException(e); } return mapper.update(this.dto2Entity(dto), new LambdaUpdateWrapper() .eq(WaylineJobEntity::getJobId, 
dto.getJobId())) > 0; } @Override public PaginationData getJobsByWorkspaceId(String workspaceId, long page, long pageSize, WaylineJobQueryParam waylineJobQueryParam) { Page pageData = mapper.getPage(new Page(page, pageSize), waylineJobQueryParam, workspaceId); List records = pageData.getRecords() .stream() .map(this::entity2Dto) .collect(Collectors.toList()); return new PaginationData(records, new Pagination(pageData)); } @Override public List getChildrenJobs(String workspaceId, WaylineJobQueryParam waylineJobQueryParam) { List list = mapper.getJobs(workspaceId, waylineJobQueryParam); List records = list .stream() .map(this::entity2Dto) .collect(Collectors.toList()); return records; } @Override @ServiceActivator(inputChannel = ChannelName.INBOUND_REQUESTS_FLIGHT_TASK_RESOURCE_GET, outputChannel = ChannelName.OUTBOUND) @Transactional(isolation = Isolation.READ_UNCOMMITTED) public void flightTaskResourceGet(CommonTopicReceiver receiver, MessageHeaders headers) { Map jobIdMap = objectMapper.convertValue(receiver.getData(), new TypeReference>() { }); String jobId = jobIdMap.get(MapKeyConst.FLIGHT_ID); CommonTopicResponse.CommonTopicResponseBuilder builder = CommonTopicResponse.builder() .tid(receiver.getTid()) .bid(receiver.getBid()) .method(RequestsMethodEnum.FLIGHT_TASK_RESOURCE_GET.getMethod()) .timestamp(System.currentTimeMillis()); String topic = headers.get(MqttHeaders.RECEIVED_TOPIC).toString() + TopicConst._REPLY_SUF; Optional deviceOpt = deviceRedisService.getDeviceOnline(receiver.getGateway()); if (deviceOpt.isEmpty()) { return; } Optional waylineJobOpt = this.getJobByJobId(deviceOpt.get().getWorkspaceId(), jobId); if (waylineJobOpt.isEmpty()) { builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); messageSender.publish(topic, builder.build()); return; } WaylineJobDTO waylineJob = waylineJobOpt.get(); // get wayline file Optional waylineFile = waylineFileService.getWaylineByWaylineId(waylineJob.getWorkspaceId(), waylineJob.getFileId()); if 
(waylineFile.isEmpty()) { builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); messageSender.publish(topic, builder.build()); return; } // get file url URL url = null; try { url = waylineFileService.getObjectUrl(waylineJob.getWorkspaceId(), waylineFile.get().getWaylineId()); builder.data(RequestsReply.success(WaylineTaskCreateDTO.builder() .file(WaylineTaskFileDTO.builder() .url(url.toString()) .fingerprint(waylineFile.get().getSign()) .build()) .build())); } catch (SQLException | NullPointerException e) { e.printStackTrace(); builder.data(RequestsReply.error(CommonErrorEnum.ILLEGAL_ARGUMENT)); messageSender.publish(topic, builder.build()); return; } messageSender.publish(topic, builder.build()); } @Override public void uploadMediaHighestPriority(String workspaceId, String jobId) { Optional jobOpt = getJobByJobId(workspaceId, jobId); if (jobOpt.isEmpty()) { throw new RuntimeException(CommonErrorEnum.ILLEGAL_ARGUMENT.getErrorMsg()); } String dockSn = jobOpt.get().getDockSn(); String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + dockSn; if (RedisOpsUtils.checkExist(key) && jobId.equals(((MediaFileCountDTO) RedisOpsUtils.get(key)).getJobId())) { return; } ServiceReply reply = messageSender.publishServicesTopic( dockSn, MediaMethodEnum.UPLOAD_FLIGHT_TASK_MEDIA_PRIORITIZE.getMethod(), Map.of(MapKeyConst.FLIGHT_ID, jobId)); if (ResponseResult.CODE_SUCCESS != reply.getResult()) { throw new RuntimeException("设置媒体作业上传优先级失败. 
错误码: " + reply.getResult()); } } private WaylineJobEntity dto2Entity(WaylineJobDTO dto) { WaylineJobEntity.WaylineJobEntityBuilder builder = WaylineJobEntity.builder(); if (dto == null) { return builder.build(); } if (Objects.nonNull(dto.getBeginTime())) { builder.beginTime(dto.getBeginTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); } if (Objects.nonNull(dto.getEndTime())) { builder.endTime(dto.getEndTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); } if (Objects.nonNull(dto.getExecuteTime())) { builder.executeTime(dto.getExecuteTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); } if (Objects.nonNull(dto.getCompletedTime())) { builder.completedTime(dto.getCompletedTime().atZone(ZoneId.systemDefault()).toInstant().toEpochMilli()); } return builder.status(dto.getStatus()) .mediaCount(dto.getMediaCount()) .name(dto.getJobName()) .errorCode(dto.getCode()) .jobId(dto.getJobId()) .fileId(dto.getFileId()) .dockSn(dto.getDockSn()) .workspaceId(dto.getWorkspaceId()) .taskType(Optional.ofNullable(dto.getTaskType()).map(WaylineTaskTypeEnum::getVal).orElse(null)) .waylineType(Optional.ofNullable(dto.getWaylineType()).map(WaylineTemplateTypeEnum::getVal).orElse(null)) .username(dto.getUsername()) .rthAltitude(dto.getRthAltitude()) .outOfControlAction(dto.getOutOfControlAction()) .parentId(dto.getParentId()) .build(); } @Override public void updateJobStatus(String workspaceId, String jobId, UpdateJobParam param) { Optional waylineJobOpt = this.getJobByJobId(workspaceId, jobId); if (waylineJobOpt.isEmpty()) { throw new RuntimeException("任务不存在"); } WaylineJobDTO waylineJob = waylineJobOpt.get(); WaylineJobStatusEnum statusEnum = this.getWaylineState(waylineJob.getDockSn()); if (statusEnum.getEnd() || WaylineJobStatusEnum.PENDING == statusEnum) { throw new RuntimeException("航路线作业状态不匹配,无法执行操作."); } switch (param.getStatus()) { case PAUSE: pauseJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum); break; case RESUME: 
resumeJob(workspaceId, waylineJob.getDockSn(), jobId, statusEnum); break; } } public WaylineJobStatusEnum getWaylineState(String dockSn) { Optional dockOpt = deviceRedisService.getDeviceOnline(dockSn); if (dockOpt.isEmpty() || !StringUtils.hasText(dockOpt.get().getChildDeviceSn())) { return WaylineJobStatusEnum.UNKNOWN; } Optional dockOsdOpt = deviceRedisService.getDeviceOsd(dockSn, OsdDockReceiver.class); Optional deviceOsdOpt = deviceRedisService.getDeviceOsd(dockOpt.get().getChildDeviceSn(), OsdSubDeviceReceiver.class); if (dockOsdOpt.isEmpty() || deviceOsdOpt.isEmpty() || DockModeCodeEnum.WORKING != dockOsdOpt.get().getModeCode()) { return WaylineJobStatusEnum.UNKNOWN; } OsdSubDeviceReceiver osdDevice = deviceOsdOpt.get(); if (DeviceModeCodeEnum.WAYLINE == osdDevice.getModeCode() || DeviceModeCodeEnum.MANUAL == osdDevice.getModeCode() || DeviceModeCodeEnum.TAKEOFF_AUTO == osdDevice.getModeCode()) { if (StringUtils.hasText(waylineRedisService.getPausedWaylineJobId(dockSn))) { return WaylineJobStatusEnum.PAUSED; } if (waylineRedisService.getRunningWaylineJob(dockSn).isPresent()) { return WaylineJobStatusEnum.IN_PROGRESS; } } return WaylineJobStatusEnum.UNKNOWN; } @Override public WaylineJobEntity getLatestJob(String workspaceId, WaylineJobQueryParam waylineJobQueryParam) { WaylineJobEntity waylineJobEntity = mapper.getLatest(workspaceId, waylineJobQueryParam); return waylineJobEntity; } @Override public ResponseResult flyByArea(String sn, FlyAreaParam flyAreaParam, String deviceSn) throws Exception { //获取所有中心点 // List targetList = GeoUtils.caculatePointList(flyAreaParam); PointPOJO dockPoint = flyAreaParam.getDockPoint(); GeoJson geoJson = GeoUtils.readJsonFile(flyAreaParam.getJsonPath()); List targetList = GeoUtils.caculatePointList(geoJson, dockPoint, flyAreaParam.getRadius()); //一键起飞 TakeoffToPointParam takeoffToPointParam = buildTakeoffToPointParam(dockPoint); ResponseResult takeoffToPointRes = controlService.takeoffToPoint(sn, takeoffToPointParam); 
//设置飞向第一个点 while (takeoffToPointRes.getCode() == ResponseResult.CODE_SUCCESS) { //获取无人机状态 DeviceModeCodeEnum deviceMode = deviceService.getDeviceMode(deviceSn); //当无人机状态为手动飞行 if (deviceMode == DeviceModeCodeEnum.MANUAL) { //飞向目标点 FlyToPointParam flyToPointParam = new FlyToPointParam(); flyToPointParam.setMaxSpeed(14); List pointDTOS = new ArrayList<>(); PointDTO pointDTO = new PointDTO(); pointDTO.setHeight(150.0); pointDTO.setLongitude(targetList.get(0).getLon()); pointDTO.setLatitude(targetList.get(0).getLat()); pointDTOS.add(pointDTO); flyToPointParam.setPoints(pointDTOS); ResponseResult flyToRes = controlService.flyToPoint(sn, flyToPointParam); if (flyToRes.getCode() == ResponseResult.CODE_SUCCESS) { //第一个点指令飞行成功后,把数组存到redis中 JSONObject jsonObject = new JSONObject(); jsonObject.put("targetList",targetList); jsonObject.put("payloadIndex",flyAreaParam.getPayloadIndex()); jsonObject.put("curIndex",0); RedisOpsUtils.set("tuban:"+sn,jsonObject); } break; } } return ResponseResult.success(targetList); } public TakeoffToPointParam buildTakeoffToPointParam(PointPOJO dockPoint) { TakeoffToPointParam takeoffToPointParam = new TakeoffToPointParam(); takeoffToPointParam.setTargetLatitude(dockPoint.getLat()); takeoffToPointParam.setTargetLongitude(dockPoint.getLon()); //设置飞行高度 takeoffToPointParam.setTargetHeight(120.0); //设置安全起飞高度 takeoffToPointParam.setSecurityTakeoffHeight(100.0); //设置返航高度 takeoffToPointParam.setRthAltitude(100.0); //设置失控操作 takeoffToPointParam.setRcLostAction(DroneRcLostActionEnum.RETURN_HOME); //设置起飞速度 takeoffToPointParam.setMaxSpeed(10.0); takeoffToPointParam.setExitWaylineWhenRcLost(WaylineRcLostActionEnum.EXECUTE_RC_LOST_ACTION); return takeoffToPointParam; } @Override public ResponseResult checkNextJob(WaylineJobDTO job) throws SQLException { WaylineJobEntity params = new WaylineJobEntity(); params.setJobId(job.getJobId()); WaylineJobEntity job1 = mapper.selectOne(Wrappers.query(params)); WaylineJobEntity currentJob = 
JSON.parseObject(JSON.toJSONString(job1), WaylineJobEntity.class); if (currentJob == null) { return ResponseResult.error("该任务不存在"); } //该任务没有多次时间 if (currentJob.getExecuteStartTimeArr() == null) { return ResponseResult.success(); } List> executeStartTimeArr = currentJob.getExecuteStartTimeArr(); int indexTime = -1; long currentTime = System.currentTimeMillis(); for (int i = 0; i < executeStartTimeArr.size(); i++) { Long startTime = executeStartTimeArr.get(i).get(0) * 1000; //如果当前时间小于开始时间 if (currentTime < startTime) { //当前索引就是下一次要执行的时间 indexTime = i; break; } } if (indexTime == -1) { return ResponseResult.success(); } List timeArr = currentJob.getExecuteStartTimeArr().get(indexTime); LocalDate date = LocalDate.ofInstant(Instant.ofEpochSecond(timeArr.get(0)), ZoneId.systemDefault()); LocalDateTime beginDate = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(0)), ZoneId.systemDefault())); LocalDateTime endDate = timeArr.size() > 1 && Objects.nonNull(timeArr.get(1)) ? LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(1)), ZoneId.systemDefault())) : beginDate; long beginTime = beginDate .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); long endTime = endDate .atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); //添加一条新数据 WaylineJobEntity waylineJobEntity = WaylineJobEntity.builder() .jobId(UUID.randomUUID().toString()) //大于1则拿前面的parentId,为1则拿第0个的id .parentId(indexTime > 1 ? 
currentJob.getParentId().toString() : currentJob.getId().toString()) .name(currentJob.getName()) .dockSn(currentJob.getDockSn()) .fileId(currentJob.getFileId()) .username(currentJob.getUsername()) .workspaceId(currentJob.getWorkspaceId()) .beginTime(beginTime) .endTime(endTime) .status(WaylineJobStatusEnum.PENDING.getVal()) .taskType(currentJob.getTaskType()) .waylineType(currentJob.getWaylineType()) .outOfControlAction(currentJob.getOutOfControlAction()) .batteryCapacity(currentJob.getBatteryCapacity()) .rthAltitude(currentJob.getRthAltitude()) .mediaCount(0) .repFreVal(currentJob.getRepFreVal()) .repFreType(currentJob.getRepFreType()) .repRuleType(currentJob.getRepRuleType()) .repRuleVal(currentJob.getRepRuleVal()) .executeTimeArr(currentJob.getExecuteTimeArr()) .executeStartTimeArr(currentJob.getExecuteStartTimeArr()) .build(); Optional waylineJobOpt = insertWaylineJob(waylineJobEntity); if (waylineJobOpt.isEmpty()) { return ResponseResult.error("任务创建失败"); } WaylineJobDTO waylineJob = waylineJobOpt.get(); LocalDateTime beginTimeNext = LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(0)), ZoneId.systemDefault())); LocalDateTime endTimeNext = timeArr.size() > 1 && Objects.nonNull(timeArr.get(1)) ? 
LocalDateTime.of(date, LocalTime.ofInstant(Instant.ofEpochSecond(timeArr.get(1)), ZoneId.systemDefault())) : beginTimeNext; //条件任务 if (currentJob.getTaskType() == 2) { //如果是条件任务类型,需要在任务参数中添加条件。 waylineJob.setConditions( WaylineTaskConditionDTO.builder() .executableConditions(null) .readyConditions(WaylineTaskReadyConditionDTO.builder() .batteryCapacity(currentJob.getBatteryCapacity()) .beginTime(beginTime) .endTime(endTime) .build()) .build()); } else { waylineJob.setBeginTime(beginDate); } ResponseResult response = this.publishOneFlightTask(waylineJob); if (ResponseResult.CODE_SUCCESS != response.getCode()) { return response; } return ResponseResult.success(); } @Override public WaylineJobCountDTO patrolStatistics(String workspaceId, String queryTime) { WaylineJobCountDTO waylineJobCountDTO = new WaylineJobCountDTO(); List list = mapper.patrolStatistics(workspaceId, queryTime); if (!CollectionUtils.isEmpty(list)) { waylineJobCountDTO.setTotalNumber(list.size()); long totalTime = list.stream().mapToLong(s -> s.getEndTime() - s.getBeginTime()).sum() / 1000; StringBuffer buffer = new StringBuffer(); long h = totalTime / 3600; long m = (totalTime % 3600) / 60; buffer.append(h).append(" h "); buffer.append(m).append(" min"); waylineJobCountDTO.setTotalDuration(buffer.toString()); } return waylineJobCountDTO; } @Override public void updateJobCollect(WaylineJobEntity waylineJob) { new LambdaUpdateChainWrapper<>(mapper) .eq(WaylineJobEntity::getJobId, waylineJob.getJobId()) .set(WaylineJobEntity::getCollectStatus, waylineJob.getCollectStatus()) .set(WaylineJobEntity::getUserId, waylineJob.getUserId()) .update(); } private void pauseJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) { if (WaylineJobStatusEnum.PAUSED == statusEnum && jobId.equals(waylineRedisService.getPausedWaylineJobId(dockSn))) { waylineRedisService.setPausedWaylineJob(dockSn, jobId); return; } ServiceReply reply = messageSender.publishServicesTopic( dockSn, 
WaylineMethodEnum.FLIGHT_TASK_PAUSE.getMethod(), "", jobId); if (ResponseResult.CODE_SUCCESS != reply.getResult()) { throw new RuntimeException("未能恢复航路作业。错误码: " + reply.getResult()); } waylineRedisService.delRunningWaylineJob(dockSn); waylineRedisService.setPausedWaylineJob(dockSn, jobId); } private void resumeJob(String workspaceId, String dockSn, String jobId, WaylineJobStatusEnum statusEnum) { Optional> runningDataOpt = waylineRedisService.getRunningWaylineJob(dockSn); if (WaylineJobStatusEnum.IN_PROGRESS == statusEnum && jobId.equals(runningDataOpt.map(EventsReceiver::getSn).get())) { waylineRedisService.setRunningWaylineJob(dockSn, runningDataOpt.get()); return; } ServiceReply reply = messageSender.publishServicesTopic( dockSn, WaylineMethodEnum.FLIGHT_TASK_RESUME.getMethod(), "", jobId); if (ResponseResult.CODE_SUCCESS != reply.getResult()) { throw new RuntimeException("未能恢复航路作业。错误码:: " + reply.getResult()); } runningDataOpt.ifPresent(runningData -> waylineRedisService.setRunningWaylineJob(dockSn, runningData)); waylineRedisService.delPausedWaylineJob(dockSn); if (deviceService.checkDockDrcMode(dockSn)) { drcService.deviceDrcExit(workspaceId, DrcModeParam.builder().dockSn(dockSn) .clientId(drcService.getDrcModeInRedis(dockSn)).build()); } } private WaylineJobDTO entity2Dto(WaylineJobEntity entity) { if (entity == null) { return null; } WaylineJobDTO.WaylineJobDTOBuilder builder = WaylineJobDTO.builder() .jobId(entity.getJobId()) .jobName(entity.getName()) .fileId(entity.getFileId()) .fileName(waylineFileService.getWaylineByWaylineId(entity.getWorkspaceId(), entity.getFileId()) .orElse(WaylineFileDTO.builder().build()).getName()) .dockSn(entity.getDockSn()) .dockName(deviceService.getDeviceBySn(entity.getDockSn()) .orElse(DeviceDTO.builder().build()).getNickname()) .username(entity.getUsername()) .workspaceId(entity.getWorkspaceId()) .status(WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus() && 
entity.getJobId().equals(waylineRedisService.getPausedWaylineJobId(entity.getDockSn())) ? WaylineJobStatusEnum.PAUSED.getVal() : entity.getStatus()) .code(entity.getErrorCode()) .beginTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getBeginTime()), ZoneId.systemDefault())) .endTime(Objects.nonNull(entity.getEndTime()) ? LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault()) : null) .executeTime(Objects.nonNull(entity.getExecuteTime()) ? LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getExecuteTime()), ZoneId.systemDefault()) : null) .completedTime(WaylineJobStatusEnum.find(entity.getStatus()).getEnd() ? LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getUpdateTime()), ZoneId.systemDefault()) : null) .taskType(WaylineTaskTypeEnum.find(entity.getTaskType())) .waylineType(WaylineTemplateTypeEnum.find(entity.getWaylineType())) .rthAltitude(entity.getRthAltitude()) .outOfControlAction(entity.getOutOfControlAction()) .mediaCount(entity.getMediaCount()) .hasChildren(entity.getHasChildren()); if (Objects.nonNull(entity.getEndTime())) { builder.endTime(LocalDateTime.ofInstant(Instant.ofEpochMilli(entity.getEndTime()), ZoneId.systemDefault())); } if (WaylineJobStatusEnum.IN_PROGRESS.getVal() == entity.getStatus()) { builder.progress(waylineRedisService.getRunningWaylineJob(entity.getDockSn()) .map(EventsReceiver::getOutput) .map(WaylineTaskProgressReceiver::getProgress) .map(WaylineTaskProgress::getPercent) .orElse(null)); } if (entity.getMediaCount() == 0) { return builder.build(); } // sync the number of media files String key = RedisConst.MEDIA_HIGHEST_PRIORITY_PREFIX + entity.getDockSn(); String countKey = RedisConst.MEDIA_FILE_PREFIX + entity.getDockSn(); Object mediaFileCount = RedisOpsUtils.hashGet(countKey, entity.getJobId()); if (Objects.nonNull(mediaFileCount)) { builder.uploadedCount(((MediaFileCountDTO) mediaFileCount).getUploadedCount()) .uploading(RedisOpsUtils.checkExist(key) && 
entity.getJobId().equals(((MediaFileCountDTO) RedisOpsUtils.get(key)).getJobId())); return builder.build(); } int uploadedSize = fileService.getFilesByWorkspaceAndJobId(entity.getWorkspaceId(), entity.getJobId()).size(); // All media for this job have been uploaded. if (uploadedSize >= entity.getMediaCount()) { return builder.uploadedCount(uploadedSize).build(); } RedisOpsUtils.hashSet(countKey, entity.getJobId(), MediaFileCountDTO.builder() .jobId(entity.getJobId()) .mediaCount(entity.getMediaCount()) .uploadedCount(uploadedSize).build()); return builder.build(); } }