package com.dji.sample.manage.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.dji.sample.common.error.LiveErrorEnum; import com.dji.sample.common.model.ResponseResult; import com.dji.sample.component.mqtt.model.CommonTopicResponse; import com.dji.sample.component.mqtt.model.ServiceReply; import com.dji.sample.component.mqtt.service.IMessageSenderService; import com.dji.sample.component.redis.RedisConst; import com.dji.sample.component.redis.RedisOpsUtils; import com.dji.sample.manage.dao.IDeviceSetMapper; import com.dji.sample.manage.model.dto.*; import com.dji.sample.manage.model.entity.DeviceSetEntity; import com.dji.sample.manage.model.enums.DeviceDomainEnum; import com.dji.sample.manage.model.enums.LiveStreamMethodEnum; import com.dji.sample.manage.model.enums.LiveUrlTypeEnum; import com.dji.sample.manage.model.enums.LiveVideoQualityEnum; import com.dji.sample.manage.model.param.DeviceQueryParam; import com.dji.sample.manage.model.receiver.CapacityDeviceReceiver; import com.dji.sample.manage.model.receiver.LiveCapacityReceiver; import com.dji.sample.manage.service.*; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import org.springframework.util.StringUtils; import java.lang.reflect.Field; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import static com.dji.sample.component.mqtt.model.TopicConst.*; /** * @author sean.zhou * @date 2021/11/22 * @version 0.1 */ @Service @Transactional public class LiveStreamServiceImpl implements ILiveStreamService { @Autowired private ICapacityCameraService capacityCameraService; @Autowired private IDeviceService deviceService; @Autowired private IDeviceSetMapper deviceSetMapper; @Autowired private IWorkspaceService workspaceService; @Autowired private IMessageSenderService messageSender; @Autowired private IDeviceRedisService deviceRedisService; @Override public List getLiveCapacity(String workspaceId,String sn) { // Query all devices in this workspace. //查询该工作区中的所有设备。 List devicesList = deviceService.getDevicesByParams( DeviceQueryParam.builder() .workspaceId(workspaceId) .deviceSn(sn) .domains(List.of(DeviceDomainEnum.SUB_DEVICE.getVal(), DeviceDomainEnum.DOCK.getVal())) .build()); // Query the live capability of each drone. return devicesList.stream() //过滤出在线设备 .filter(device -> deviceRedisService.checkDeviceOnline(device.getDeviceSn())) .map(device -> CapacityDeviceDTO.builder() .name(Objects.requireNonNullElse(device.getNickname(), device.getDeviceName())) .sn(device.getDeviceSn()) .camerasList(capacityCameraService.getCapacityCameraByDeviceSn(device.getDeviceSn())) .build()) .collect(Collectors.toList()); } @Override public void saveLiveCapacity(LiveCapacityReceiver liveCapacityReceiver, Long timestamp) { // Solve timing problems for (CapacityDeviceReceiver capacityDeviceReceiver : liveCapacityReceiver.getDeviceList()) { long last = (long) Objects.requireNonNullElse( RedisOpsUtils.get(RedisConst.LIVE_CAPACITY + capacityDeviceReceiver.getSn()), 0L); if (last > timestamp) { return; } capacityCameraService.saveCapacityCameraReceiverList( capacityDeviceReceiver.getCameraList(), capacityDeviceReceiver.getSn(), timestamp); } } @Override public ResponseResult liveStart(LiveTypeDTO liveParam) { // String streamId_2 = liveParam.getVideoId().replace("/","_"); // String streamId_1 = liveParam.getVideoId().replace("_","/"); // liveParam.setVideoId(streamId_2); // Check if this lens is available live. //检查镜头是否可用 ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) { return responseResult; } DeviceDTO data = (DeviceDTO)responseResult.getData(); // target topic //thing/product/{gateway_sn}/services 云平台向设备发送的服务 String respTopic = THING_MODEL_PRE + PRODUCT + data.getDeviceSn() + SERVICES_SUF; //获取返回结果 ServiceReply receiveReply = this.publishLiveStart(respTopic, liveParam); //相机已经在直播中,请勿重复开启直播 if(receiveReply.getResult() == 513003) { LiveDTO live = new LiveDTO(); // live.setUrl(liveParam.getUrl().replace("rtmp", "https").replace("735","700") + ".flv"); LiveUrlGB28181DTO gb28181 = urlToGB28181(liveParam.getUrl()); live.setUrl(new StringBuilder() .append("https://wrj.shuixiongit.com/zb/rtp/") .append(gb28181.getAgentID()) .append("_") .append(gb28181.getChannel()) .append(".live.flv") .toString()); return ResponseResult.success(live); } if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) { return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult())); } LiveUrlTypeEnum urlType = LiveUrlTypeEnum.find(liveParam.getUrlType()); LiveDTO live = new LiveDTO(); //对不同的协议类型做处理 switch (urlType) { case RTMP: // live.setUrl(liveParam.getUrl().replace("rtmp", "webrtc")); live.setUrl(liveParam.getUrl().replace("rtmp", "https").replace("735","700") + ".flv"); break; // case GB28181: // LiveUrlGB28181DTO gb28181 = urlToGB28181(liveParam.getUrl()); // live.setUrl(new StringBuilder() // .append("webrtc://") // .append(gb28181.getServerIP()) // .append("/live/") // .append(gb28181.getAgentID()) // .append("@") // .append(gb28181.getChannel()) // .toString()); // break; case GB28181: LiveUrlGB28181DTO gb28181 = urlToGB28181(liveParam.getUrl()); live.setUrl(new StringBuilder() .append("https://wrj.shuixiongit.com/zb/rtp/") .append(gb28181.getAgentID()) .append("_") .append(gb28181.getChannel()) .append(".live.flv") .toString()); break; case RTSP: String url = receiveReply.getOutput().toString(); this.resolveUrlUser(url, live); break; case UNKNOWN: return ResponseResult.error(LiveErrorEnum.URL_TYPE_NOT_SUPPORTED); } return ResponseResult.success(live); } @Override public ResponseResult liveAddress(String deviceSn,String deviceId) { DeviceSetEntity deviceSet= deviceSetMapper.selectOne(new LambdaQueryWrapper() .eq(DeviceSetEntity::getDeviceSn,deviceSn) .eq(DeviceSetEntity::getDeviceId,deviceId) ); String workspaceId=getIdBySn(deviceSn); List dto=getLiveCapacity(workspaceId,deviceSn); String vedioId=deviceSn+"/165-0-7/normal-0"; String url="https://"+deviceSet.getServerIp()+"/zb/rtp/"+deviceSet.getAgentId()+"_"+deviceSet.getChannel()+".live.flv"; LiveTypeDTO liveParam=new LiveTypeDTO(); liveParam.setUrl(url); liveParam.setUrlType(3); liveParam.setVideoId(vedioId); liveParam.setVideoQuality(0); ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) { return responseResult; } DeviceDTO data = (DeviceDTO)responseResult.getData(); String respTopic = THING_MODEL_PRE + PRODUCT + data.getDeviceSn() + SERVICES_SUF; //获取返回结果 ServiceReply receiveReply = this.publishLiveStart(respTopic, liveParam); System.out.println(receiveReply.getResult()); LiveDTO live = new LiveDTO(); live.setUrl(url); //相机已经在直播中,请勿重复开启直播 if(receiveReply.getResult() == 513003) { return ResponseResult.success(live); } return ResponseResult.success(live); } public static String getVedioId(String data,String sn) { return findDeviceBySn(data, sn); } public static String findDeviceBySn(String dtoListString, String snToFind) { String regex = "CapacityDeviceDTO\\(sn=" + snToFind + ", .*? index=([\\w\\-]+).*? index=([\\w\\-]+).*?\\)"; Pattern pattern = Pattern.compile(regex); Matcher matcher = pattern.matcher(dtoListString); if (matcher.find()) { String index1 = matcher.group(1); String index2 = matcher.group(2); return snToFind + "/" + index1 + "/" + index2; } return null; // 如果未找到匹配的sn,则返回null或者适当的默认值 } @Override public ResponseResult liveStop(String videoId) { ResponseResult responseResult = this.checkBeforeLive(videoId); if (responseResult.getCode() != 0) { return responseResult; } //thing/product/{gateway_sn}/services String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF; videoId = videoId.replace("_","/"); ServiceReply receiveReply = this.publishLiveStop(respTopic, videoId); if (receiveReply.getResult() != 0) { return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult())); } return ResponseResult.success(); } @Override public ResponseResult liveSetQuality(LiveTypeDTO liveParam) { if (liveParam.getVideoQuality() == null || LiveVideoQualityEnum.UNKNOWN == LiveVideoQualityEnum.find(liveParam.getVideoQuality())) { return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); } ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); if (responseResult.getCode() != 0) { return responseResult; } String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF; ServiceReply receiveReply = this.publishLiveSetQuality(respTopic, liveParam); if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) { return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult())); } return ResponseResult.success(); } @Override public ResponseResult liveLensChange(LiveTypeDTO liveParam) { if (!StringUtils.hasText(liveParam.getVideoType())) { return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); } ResponseResult responseResult = this.checkBeforeLive(liveParam.getVideoId()); if (ResponseResult.CODE_SUCCESS != responseResult.getCode()) { return responseResult; } if (DeviceDomainEnum.GATEWAY.getVal() == responseResult.getData().getDomain()) { return ResponseResult.error(LiveErrorEnum.FUNCTION_NOT_SUPPORT); } String respTopic = THING_MODEL_PRE + PRODUCT + responseResult.getData().getDeviceSn() + SERVICES_SUF; ServiceReply receiveReply = this.publishLiveLensChange(respTopic, liveParam); if (ResponseResult.CODE_SUCCESS != receiveReply.getResult()) { return ResponseResult.error(LiveErrorEnum.find(receiveReply.getResult())); } return ResponseResult.success(); } private ServiceReply publishLiveLensChange(String respTopic, LiveTypeDTO liveParam) { CommonTopicResponse response = new CommonTopicResponse<>(); response.setTid(UUID.randomUUID().toString()); response.setBid(UUID.randomUUID().toString()); response.setMethod(LiveStreamMethodEnum.LIVE_LENS_CHANGE.getMethod()); response.setData(liveParam); return messageSender.publishWithReply(ServiceReply.class, respTopic, response); } /** * Check if this lens is available live. * 检查镜头是否可用 * @param videoId * @return */ private ResponseResult checkBeforeLive(String videoId) { if (!StringUtils.hasText(videoId)) { return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); } String[] videoIdArr = videoId.split("/"); // drone sn / enumeration value of the location where the payload is mounted / payload lens if (videoIdArr.length != 3) { return ResponseResult.error(LiveErrorEnum.ERROR_PARAMETERS); } Optional deviceOpt = deviceService.getDeviceBySn(videoIdArr[0]); // Check if the gateway device connected to this drone exists if (deviceOpt.isEmpty()) { return ResponseResult.error(LiveErrorEnum.NO_AIRCRAFT); } if (DeviceDomainEnum.DOCK.getVal() == deviceOpt.get().getDomain()) { return ResponseResult.success(deviceOpt.get()); } List gatewayList = deviceService.getDevicesByParams( DeviceQueryParam.builder() .childSn(videoIdArr[0]) .build()); if (gatewayList.isEmpty()) { return ResponseResult.error(LiveErrorEnum.NO_FLIGHT_CONTROL); } return ResponseResult.success(gatewayList.get(0)); } /** * When using rtsp live, the account and password are parsed from the information returned by the pilot. * @param url * @param live */ private void resolveUrlUser(String url, LiveDTO live) { if (!StringUtils.hasText(url)) { return; } int start = url.indexOf("//"); int end = url.lastIndexOf("@"); String user = url.substring(start + 2, end); url = url.replace(user + "@", ""); String[] userArr = user.split(":"); live.setUsername(userArr[0]); live.setPassword(userArr[1]); live.setUrl(url); } /** * When using GB28181 live, url parameters are resolved into objects. * @param url * @return */ private LiveUrlGB28181DTO urlToGB28181(String url) { String[] arr = url.split("\\=|\\&"); LiveUrlGB28181DTO gb28181 = new LiveUrlGB28181DTO(); try { Class clazz = LiveUrlGB28181DTO.class; for (int i = 0; i < arr.length - 1; i += 2) { Field field = clazz.getDeclaredField(arr[i]); field.setAccessible(true); if (field.getType().equals(Integer.class)) { field.set(gb28181, Integer.valueOf(arr[i + 1])); continue; } field.set(gb28181, arr[i + 1]); } } catch (NoSuchFieldException | IllegalAccessException e) { e.printStackTrace(); } return gb28181; } /** * Send a message to the pilot via mqtt to start the live streaming. *通过mqtt向飞行员发送消息以启动直播。 * @param topic * @param liveParam * @return */ private ServiceReply publishLiveStart(String topic, LiveTypeDTO liveParam) { CommonTopicResponse response = new CommonTopicResponse<>(); response.setTid(UUID.randomUUID().toString()); response.setBid(UUID.randomUUID().toString()); response.setData(liveParam); response.setMethod(LiveStreamMethodEnum.LIVE_START_PUSH.getMethod()); return messageSender.publishWithReply(ServiceReply.class, topic, response); } private String getIdBySn(String dockSn){ DeviceSetEntity entity=deviceSetMapper.selectOne(new LambdaQueryWrapper().eq(DeviceSetEntity::getDeviceSn,dockSn)); return entity.getWorkspaceId(); } /** * Send a message to the pilot via mqtt to set quality. * @param respTopic * @param liveParam * @return */ private ServiceReply publishLiveSetQuality(String respTopic, LiveTypeDTO liveParam) { Map data = new ConcurrentHashMap<>(Map.of( "video_id", liveParam.getVideoId(), "video_quality", liveParam.getVideoQuality())); CommonTopicResponse> response = new CommonTopicResponse<>(); response.setTid(UUID.randomUUID().toString()); response.setBid(UUID.randomUUID().toString()); response.setMethod(LiveStreamMethodEnum.LIVE_SET_QUALITY.getMethod()); response.setData(data); return messageSender.publishWithReply(ServiceReply.class, respTopic, response); } /** * Send a message to the pilot via mqtt to stop the live streaming. * @param topic * @param videoId * @return */ private ServiceReply publishLiveStop(String topic, String videoId) { Map data = new ConcurrentHashMap<>(Map.of("video_id", videoId)); CommonTopicResponse> response = new CommonTopicResponse<>(); response.setTid(UUID.randomUUID().toString()); response.setBid(UUID.randomUUID().toString()); response.setData(data); response.setMethod(LiveStreamMethodEnum.LIVE_STOP_PUSH.getMethod()); return messageSender.publishWithReply(ServiceReply.class, topic, response); } }