package com.dji.sample.component.websocket.model; import com.dji.sample.component.websocket.config.ConcurrentWebSocketSession; import com.dji.sample.manage.model.enums.UserTypeEnum; import lombok.extern.slf4j.Slf4j; import java.util.Collection; import java.util.HashSet; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * Manage all WebSocket connection objects. * @author sean.zhou * @date 2021/11/16 * @version 0.1 */ @Slf4j public class WebSocketManager { private static final ConcurrentHashMap>> MANAGER = new ConcurrentHashMap<>(16); /** * WebSocket connection from the pilot. */ private static final Set PILOT_SESSION = ConcurrentHashMap.newKeySet(16); /** * WebSocket connection from the web. */ private static final Set WEB_SESSION = ConcurrentHashMap.newKeySet(16); public static void put(String key, ConcurrentWebSocketSession val) { String[] name = key.split("/"); if (name.length != 3) { log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); return; } ConcurrentHashMap> workspaceSessions = MANAGER.getOrDefault(name[0], new ConcurrentHashMap<>(16)); ConcurrentHashMap userSessions = workspaceSessions.getOrDefault( name[2], new ConcurrentHashMap<>(16)); userSessions.put(val.getId(), val); workspaceSessions.put(name[2], userSessions); MANAGER.put(name[0], workspaceSessions); getSetByUserType(Integer.valueOf(name[1])).add(val); } public static void remove(String key, String sessionId) { String[] name = key.split("/"); if (name.length != 3) { log.debug("The key is out of format. [{workspaceId}/{userType}/{userId}]"); return; } ConcurrentHashMap userSession = MANAGER.get(name[0]).get(name[2]); Set typeSession = getSetByUserType(Integer.valueOf(name[1])); ConcurrentWebSocketSession session = userSession.get(sessionId); typeSession.remove(session); userSession.remove(sessionId); } public static int getConnectedCount() { return PILOT_SESSION.size() + WEB_SESSION.size(); } public static Collection getValueWithWorkspace(String workspaceId) { Set sessions = ConcurrentHashMap.newKeySet(); MANAGER.get(workspaceId) .forEach((userId, userSessions) -> { sessions.addAll(userSessions.values()); }); return sessions; } public static Collection getValueWithWorkspaceAndUserType(String workspaceId, Integer userType) { Set sessions = ConcurrentHashMap.newKeySet(); Set typeSessions = getSetByUserType(userType); MANAGER.getOrDefault(workspaceId, new ConcurrentHashMap<>()) .forEach((userId, userSessions) -> { Collection sessionList = userSessions.values(); if (!sessionList.isEmpty()) { ConcurrentWebSocketSession session = sessionList.iterator().next(); if (typeSessions.contains(session)) { sessions.addAll(sessionList); } } }); return sessions; } private static Set getSetByUserType(Integer userType) { if (UserTypeEnum.PILOT.getVal() == userType) { return PILOT_SESSION; } if (UserTypeEnum.WEB.getVal() == userType) { return WEB_SESSION; } return new HashSet<>(); } }