From 2e98b20bea4463e4465e3c19059d0744a09aec06 Mon Sep 17 00:00:00 2001
From: zhongrj <646384940@qq.com>
Date: Tue, 27 Jun 2023 14:34:41 +0800
Subject: [PATCH] gb28181版本升级-补充
---
src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java | 512 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 512 insertions(+), 0 deletions(-)
diff --git a/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
new file mode 100644
index 0000000..00980f9
--- /dev/null
+++ b/src/main/java/com/genersoft/iot/vmp/service/impl/StreamPushServiceImpl.java
@@ -0,0 +1,512 @@
+package com.genersoft.iot.vmp.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.TypeReference;
+import com.genersoft.iot.vmp.conf.MediaConfig;
+import com.genersoft.iot.vmp.conf.UserSetting;
+import com.genersoft.iot.vmp.gb28181.bean.*;
+import com.genersoft.iot.vmp.gb28181.event.EventPublisher;
+import com.genersoft.iot.vmp.gb28181.event.subscribe.catalog.CatalogEvent;
+import com.genersoft.iot.vmp.media.zlm.ZLMRESTfulUtils;
+import com.genersoft.iot.vmp.media.zlm.dto.*;
+import com.genersoft.iot.vmp.service.IGbStreamService;
+import com.genersoft.iot.vmp.service.IMediaServerService;
+import com.genersoft.iot.vmp.service.IStreamPushService;
+import com.genersoft.iot.vmp.service.bean.StreamPushItemFromRedis;
+import com.genersoft.iot.vmp.storager.IRedisCatchStorage;
+import com.genersoft.iot.vmp.storager.dao.*;
+import com.genersoft.iot.vmp.utils.DateUtil;
+import com.github.pagehelper.PageHelper;
+import com.github.pagehelper.PageInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.datasource.DataSourceTransactionManager;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.TransactionDefinition;
+import org.springframework.transaction.TransactionStatus;
+import org.springframework.util.ObjectUtils;
+import org.springframework.util.StringUtils;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+@Service
+public class StreamPushServiceImpl implements IStreamPushService {
+
+ private final static Logger logger = LoggerFactory.getLogger(StreamPushServiceImpl.class);
+
+ @Autowired
+ private GbStreamMapper gbStreamMapper;
+
+ @Autowired
+ private StreamPushMapper streamPushMapper;
+
+ @Autowired
+ private StreamProxyMapper streamProxyMapper;
+
+ @Autowired
+ private ParentPlatformMapper parentPlatformMapper;
+
+ @Autowired
+ private PlatformCatalogMapper platformCatalogMapper;
+
+ @Autowired
+ private PlatformGbStreamMapper platformGbStreamMapper;
+
+ @Autowired
+ private IGbStreamService gbStreamService;
+
+ @Autowired
+ private EventPublisher eventPublisher;
+
+ @Autowired
+ private ZLMRESTfulUtils zlmresTfulUtils;
+
+ @Autowired
+ private IRedisCatchStorage redisCatchStorage;
+
+ @Autowired
+ private UserSetting userSetting;
+
+ @Autowired
+ private IMediaServerService mediaServerService;
+
+ @Autowired
+ DataSourceTransactionManager dataSourceTransactionManager;
+
+ @Autowired
+ TransactionDefinition transactionDefinition;
+
+ @Autowired
+ private MediaConfig mediaConfig;
+
+
+ @Override
+ public List<StreamPushItem> handleJSON(String jsonData, MediaServerItem mediaServerItem) {
+ if (jsonData == null) {
+ return null;
+ }
+
+ Map<String, StreamPushItem> result = new HashMap<>();
+
+ List<MediaItem> mediaItems = JSON.parseObject(jsonData, new TypeReference<List<MediaItem>>() {});
+ for (MediaItem item : mediaItems) {
+
+ // 不保存国标推理以及拉流代理的流
+ if (item.getOriginType() == OriginType.RTSP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTMP_PUSH.ordinal()
+ || item.getOriginType() == OriginType.RTC_PUSH.ordinal() ) {
+ String key = item.getApp() + "_" + item.getStream();
+ StreamPushItem streamPushItem = result.get(key);
+ if (streamPushItem == null) {
+ streamPushItem = transform(item);
+ result.put(key, streamPushItem);
+ }
+ }
+ }
+
+ return new ArrayList<>(result.values());
+ }
+ @Override
+ public StreamPushItem transform(MediaItem item) {
+ StreamPushItem streamPushItem = new StreamPushItem();
+ streamPushItem.setApp(item.getApp());
+ streamPushItem.setMediaServerId(item.getMediaServerId());
+ streamPushItem.setStream(item.getStream());
+ streamPushItem.setAliveSecond(item.getAliveSecond());
+ streamPushItem.setOriginSock(item.getOriginSock());
+ streamPushItem.setTotalReaderCount(item.getTotalReaderCount());
+ streamPushItem.setOriginType(item.getOriginType());
+ streamPushItem.setOriginTypeStr(item.getOriginTypeStr());
+ streamPushItem.setOriginUrl(item.getOriginUrl());
+ streamPushItem.setCreateTime(DateUtil.getNow());
+ streamPushItem.setAliveSecond(item.getAliveSecond());
+ streamPushItem.setStatus(true);
+ streamPushItem.setStreamType("push");
+ streamPushItem.setVhost(item.getVhost());
+ streamPushItem.setServerId(item.getSeverId());
+ return streamPushItem;
+ }
+
+ @Override
+ public PageInfo<StreamPushItem> getPushList(Integer page, Integer count, String query, Boolean pushing, String mediaServerId) {
+ PageHelper.startPage(page, count);
+ List<StreamPushItem> all = streamPushMapper.selectAllForList(query, pushing, mediaServerId);
+ return new PageInfo<>(all);
+ }
+
+ @Override
+ public List<StreamPushItem> getPushList(String mediaServerId) {
+ return streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
+ }
+
+ @Override
+ public boolean saveToGB(GbStream stream) {
+ stream.setStreamType("push");
+ stream.setStatus(true);
+ stream.setCreateTime(DateUtil.getNow());
+ stream.setStreamType("push");
+ stream.setMediaServerId(mediaConfig.getId());
+ int add = gbStreamMapper.add(stream);
+ return add > 0;
+ }
+
+ @Override
+ public boolean removeFromGB(GbStream stream) {
+ // 判断是否需要发送事件
+ gbStreamService.sendCatalogMsg(stream, CatalogEvent.DEL);
+ platformGbStreamMapper.delByAppAndStream(stream.getApp(), stream.getStream());
+ int del = gbStreamMapper.del(stream.getApp(), stream.getStream());
+ MediaServerItem mediaInfo = mediaServerService.getOne(stream.getMediaServerId());
+ JSONObject mediaList = zlmresTfulUtils.getMediaList(mediaInfo, stream.getApp(), stream.getStream());
+ if (mediaList != null) {
+ if (mediaList.getInteger("code") == 0) {
+ JSONArray data = mediaList.getJSONArray("data");
+ if (data == null) {
+ streamPushMapper.del(stream.getApp(), stream.getStream());
+ }
+ }
+ }
+ return del > 0;
+ }
+
+
+ @Override
+ public StreamPushItem getPush(String app, String streamId) {
+ return streamPushMapper.selectOne(app, streamId);
+ }
+
+ @Override
+ public boolean stop(String app, String streamId) {
+ StreamPushItem streamPushItem = streamPushMapper.selectOne(app, streamId);
+ gbStreamService.sendCatalogMsg(streamPushItem, CatalogEvent.DEL);
+
+ platformGbStreamMapper.delByAppAndStream(app, streamId);
+ gbStreamMapper.del(app, streamId);
+ int delStream = streamPushMapper.del(app, streamId);
+ if (delStream > 0) {
+ MediaServerItem mediaServerItem = mediaServerService.getOne(streamPushItem.getMediaServerId());
+ zlmresTfulUtils.closeStreams(mediaServerItem,app, streamId);
+ }
+ return true;
+ }
+
+ @Override
+ public void zlmServerOnline(String mediaServerId) {
+ // 同步zlm推流信息
+ MediaServerItem mediaServerItem = mediaServerService.getOne(mediaServerId);
+ if (mediaServerItem == null) {
+ return;
+ }
+ // 数据库记录
+ List<StreamPushItem> pushList = getPushList(mediaServerId);
+ Map<String, StreamPushItem> pushItemMap = new HashMap<>();
+ // redis记录
+ List<MediaItem> mediaItems = redisCatchStorage.getStreams(mediaServerId, "PUSH");
+ Map<String, MediaItem> streamInfoPushItemMap = new HashMap<>();
+ if (pushList.size() > 0) {
+ for (StreamPushItem streamPushItem : pushList) {
+ if (ObjectUtils.isEmpty(streamPushItem.getGbId())) {
+ pushItemMap.put(streamPushItem.getApp() + streamPushItem.getStream(), streamPushItem);
+ }
+ }
+ }
+ if (mediaItems.size() > 0) {
+ for (MediaItem mediaItem : mediaItems) {
+ streamInfoPushItemMap.put(mediaItem.getApp() + mediaItem.getStream(), mediaItem);
+ }
+ }
+ zlmresTfulUtils.getMediaList(mediaServerItem, (mediaList ->{
+ if (mediaList == null) {
+ return;
+ }
+ String dataStr = mediaList.getString("data");
+
+ Integer code = mediaList.getInteger("code");
+ List<StreamPushItem> streamPushItems = null;
+ if (code == 0 ) {
+ if (dataStr != null) {
+ streamPushItems = handleJSON(dataStr, mediaServerItem);
+ }
+ }
+
+ if (streamPushItems != null) {
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ pushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+ streamInfoPushItemMap.remove(streamPushItem.getApp() + streamPushItem.getStream());
+ }
+ }
+ List<StreamPushItem> offlinePushItems = new ArrayList<>(pushItemMap.values());
+ if (offlinePushItems.size() > 0) {
+ String type = "PUSH";
+ int runLimit = 300;
+ if (offlinePushItems.size() > runLimit) {
+ for (int i = 0; i < offlinePushItems.size(); i += runLimit) {
+ int toIndex = i + runLimit;
+ if (i + runLimit > offlinePushItems.size()) {
+ toIndex = offlinePushItems.size();
+ }
+ List<StreamPushItem> streamPushItemsSub = offlinePushItems.subList(i, toIndex);
+ streamPushMapper.delAll(streamPushItemsSub);
+ }
+ }else {
+ streamPushMapper.delAll(offlinePushItems);
+ }
+
+ }
+ Collection<MediaItem> offlineMediaItemList = streamInfoPushItemMap.values();
+ if (offlineMediaItemList.size() > 0) {
+ String type = "PUSH";
+ for (MediaItem offlineMediaItem : offlineMediaItemList) {
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetting.getServerId());
+ jsonObject.put("app", offlineMediaItem.getApp());
+ jsonObject.put("stream", offlineMediaItem.getStream());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ // 移除redis内流的信息
+ redisCatchStorage.removeStream(mediaServerItem.getId(), "PUSH", offlineMediaItem.getApp(), offlineMediaItem.getStream());
+ }
+ }
+ }));
+ }
+
+ @Override
+ public void zlmServerOffline(String mediaServerId) {
+ List<StreamPushItem> streamPushItems = streamPushMapper.selectAllByMediaServerIdWithOutGbID(mediaServerId);
+ // 移除没有GBId的推流
+ streamPushMapper.deleteWithoutGBId(mediaServerId);
+ gbStreamMapper.deleteWithoutGBId("push", mediaServerId);
+ // 其他的流设置未启用
+ streamPushMapper.updateStatusByMediaServerId(mediaServerId, false);
+ streamProxyMapper.updateStatusByMediaServerId(mediaServerId, false);
+ // 发送流停止消息
+ String type = "PUSH";
+ // 发送redis消息
+ List<MediaItem> streamInfoList = redisCatchStorage.getStreams(mediaServerId, type);
+ if (streamInfoList.size() > 0) {
+ for (MediaItem mediaItem : streamInfoList) {
+ // 移除redis内流的信息
+ redisCatchStorage.removeStream(mediaServerId, type, mediaItem.getApp(), mediaItem.getStream());
+ JSONObject jsonObject = new JSONObject();
+ jsonObject.put("serverId", userSetting.getServerId());
+ jsonObject.put("app", mediaItem.getApp());
+ jsonObject.put("stream", mediaItem.getStream());
+ jsonObject.put("register", false);
+ jsonObject.put("mediaServerId", mediaServerId);
+ redisCatchStorage.sendStreamChangeMsg(type, jsonObject);
+ }
+ }
+ }
+
+ @Override
+ public void clean() {
+
+ }
+
+ @Override
+ public boolean saveToRandomGB() {
+ List<StreamPushItem> streamPushItems = streamPushMapper.selectAll();
+ long gbId = 100001;
+ for (StreamPushItem streamPushItem : streamPushItems) {
+ streamPushItem.setStreamType("push");
+ streamPushItem.setStatus(true);
+ streamPushItem.setGbId("34020000004111" + gbId);
+ streamPushItem.setCreateTime(DateUtil.getNow());
+ gbId ++;
+ }
+ int limitCount = 30;
+
+ if (streamPushItems.size() > limitCount) {
+ for (int i = 0; i < streamPushItems.size(); i += limitCount) {
+ int toIndex = i + limitCount;
+ if (i + limitCount > streamPushItems.size()) {
+ toIndex = streamPushItems.size();
+ }
+ gbStreamMapper.batchAdd(streamPushItems.subList(i, toIndex));
+ }
+ }else {
+ gbStreamMapper.batchAdd(streamPushItems);
+ }
+ return true;
+ }
+
+ @Override
+ public void batchAdd(List<StreamPushItem> streamPushItems) {
+ streamPushMapper.addAll(streamPushItems);
+ gbStreamMapper.batchAdd(streamPushItems);
+ }
+
+
+ @Override
+ public void batchAddForUpload(List<StreamPushItem> streamPushItems, Map<String, List<String[]>> streamPushItemsForAll ) {
+ // 存储数据到stream_push表
+ streamPushMapper.addAll(streamPushItems);
+ List<StreamPushItem> streamPushItemForGbStream = streamPushItems.stream()
+ .filter(streamPushItem-> streamPushItem.getId() != null)
+ .collect(Collectors.toList());
+ // 存储数据到gb_stream表, id会返回到streamPushItemForGbStream里
+ if (streamPushItemForGbStream.size() > 0) {
+ gbStreamMapper.batchAdd(streamPushItemForGbStream);
+ }
+ // 去除没有ID也就是没有存储到数据库的数据
+ List<StreamPushItem> streamPushItemsForPlatform = streamPushItemForGbStream.stream()
+ .filter(streamPushItem-> streamPushItem.getGbStreamId() != null)
+ .collect(Collectors.toList());
+
+ if (streamPushItemsForPlatform.size() > 0) {
+ // 获取所有平台,平台和目录信息一般不会特别大量。
+ List<ParentPlatform> parentPlatformList = parentPlatformMapper.getParentPlatformList();
+ Map<String, Map<String, PlatformCatalog>> platformInfoMap = new HashMap<>();
+ if (parentPlatformList.size() == 0) {
+ return;
+ }
+ for (ParentPlatform platform : parentPlatformList) {
+ Map<String, PlatformCatalog> catalogMap = new HashMap<>();
+
+ // 创建根节点
+ PlatformCatalog platformCatalog = new PlatformCatalog();
+ platformCatalog.setId(platform.getServerGBId());
+ catalogMap.put(platform.getServerGBId(), platformCatalog);
+
+ // 查询所有节点信息
+ List<PlatformCatalog> platformCatalogs = platformCatalogMapper.selectByPlatForm(platform.getServerGBId());
+ if (platformCatalogs.size() > 0) {
+ for (PlatformCatalog catalog : platformCatalogs) {
+ catalogMap.put(catalog.getId(), catalog);
+ }
+ }
+ platformInfoMap.put(platform.getServerGBId(), catalogMap);
+ }
+ List<StreamPushItem> streamPushItemListFroPlatform = new ArrayList<>();
+ Map<String, List<GbStream>> platformForEvent = new HashMap<>();
+ // 遍历存储结果,查找app+Stream->platformId+catalogId的对应关系,然后执行批量写入
+ for (StreamPushItem streamPushItem : streamPushItemsForPlatform) {
+ List<String[]> platFormInfoList = streamPushItemsForAll.get(streamPushItem.getApp() + streamPushItem.getStream());
+ if (platFormInfoList != null && platFormInfoList.size() > 0) {
+ for (String[] platFormInfoArray : platFormInfoList) {
+ StreamPushItem streamPushItemForPlatform = new StreamPushItem();
+ streamPushItemForPlatform.setGbStreamId(streamPushItem.getGbStreamId());
+ if (platFormInfoArray.length > 0) {
+ // 数组 platFormInfoArray 0 为平台ID。 1为目录ID
+ // 不存在这个平台,则忽略导入此关联关系
+ if (platformInfoMap.get(platFormInfoArray[0]) == null
+ || platformInfoMap.get(platFormInfoArray[0]).get(platFormInfoArray[1]) == null) {
+ logger.info("导入数据时不存在平台或目录{}/{},已导入未分配", platFormInfoArray[0], platFormInfoArray[1] );
+ continue;
+ }
+ streamPushItemForPlatform.setPlatformId(platFormInfoArray[0]);
+ List<GbStream> gbStreamList = platformForEvent.get(platFormInfoArray[0]);
+ if (gbStreamList == null) {
+ gbStreamList = new ArrayList<>();
+ platformForEvent.put(platFormInfoArray[0], gbStreamList);
+ }
+ // 为发送通知整理数据
+ streamPushItemForPlatform.setName(streamPushItem.getName());
+ streamPushItemForPlatform.setApp(streamPushItem.getApp());
+ streamPushItemForPlatform.setStream(streamPushItem.getStream());
+ streamPushItemForPlatform.setGbId(streamPushItem.getGbId());
+ gbStreamList.add(streamPushItemForPlatform);
+ }
+ if (platFormInfoArray.length > 1) {
+ streamPushItemForPlatform.setCatalogId(platFormInfoArray[1]);
+ }
+ streamPushItemListFroPlatform.add(streamPushItemForPlatform);
+ }
+
+ }
+ }
+ if (streamPushItemListFroPlatform.size() > 0) {
+ platformGbStreamMapper.batchAdd(streamPushItemListFroPlatform);
+ // 发送通知
+ for (String platformId : platformForEvent.keySet()) {
+ eventPublisher.catalogEventPublishForStream(
+ platformId, platformForEvent.get(platformId), CatalogEvent.ADD);
+ }
+ }
+ }
+ }
+
+ @Override
+ public boolean batchStop(List<GbStream> gbStreams) {
+ if (gbStreams == null || gbStreams.size() == 0) {
+ return false;
+ }
+ gbStreamService.sendCatalogMsgs(gbStreams, CatalogEvent.DEL);
+
+ platformGbStreamMapper.delByGbStreams(gbStreams);
+ gbStreamMapper.batchDelForGbStream(gbStreams);
+ int delStream = streamPushMapper.delAllForGbStream(gbStreams);
+ if (delStream > 0) {
+ for (GbStream gbStream : gbStreams) {
+ MediaServerItem mediaServerItem = mediaServerService.getOne(gbStream.getMediaServerId());
+ zlmresTfulUtils.closeStreams(mediaServerItem, gbStream.getApp(), gbStream.getStream());
+ }
+
+ }
+ return true;
+ }
+
+ @Override
+ public void allStreamOffline() {
+ List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGb();
+ if (onlinePushers.size() == 0) {
+ return;
+ }
+ streamPushMapper.setAllStreamOffline();
+
+ // 发送通知
+ eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
+ }
+
+ @Override
+ public void offline(List<StreamPushItemFromRedis> offlineStreams) {
+ // 更新部分设备离线
+ List<GbStream> onlinePushers = streamPushMapper.getOnlinePusherForGbInList(offlineStreams);
+ streamPushMapper.offline(offlineStreams);
+ // 发送通知
+ eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.OFF);
+ }
+
+ @Override
+ public void online(List<StreamPushItemFromRedis> onlineStreams) {
+ // 更新部分设备上线streamPushService
+ List<GbStream> onlinePushers = streamPushMapper.getOfflinePusherForGbInList(onlineStreams);
+ streamPushMapper.online(onlineStreams);
+ // 发送通知
+ eventPublisher.catalogEventPublishForStream(null, onlinePushers, CatalogEvent.ON);
+ }
+
+ @Override
+ public boolean add(StreamPushItem stream) {
+ stream.setUpdateTime(DateUtil.getNow());
+ stream.setCreateTime(DateUtil.getNow());
+ stream.setServerId(userSetting.getServerId());
+
+ // 放在事务内执行
+ boolean result = false;
+ TransactionStatus transactionStatus = dataSourceTransactionManager.getTransaction(transactionDefinition);
+ try {
+ int addStreamResult = streamPushMapper.add(stream);
+ if (!ObjectUtils.isEmpty(stream.getGbId())) {
+ stream.setStreamType("push");
+ gbStreamMapper.add(stream);
+ }
+ dataSourceTransactionManager.commit(transactionStatus);
+ result = true;
+ }catch (Exception e) {
+ logger.error("批量移除流与平台的关系时错误", e);
+ dataSourceTransactionManager.rollback(transactionStatus);
+ }
+ return result;
+ }
+
+ @Override
+ public List<String> getAllAppAndStream() {
+ return streamPushMapper.getAllAppAndStream();
+ }
+}
--
Gitblit v1.9.3