| pom.xml | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/Application.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/nettyServer/ChannelMap.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/nettyServer/NettyConfig.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/webscoket/WebSocketHandler.java | ●●●●● patch | view | raw | blame | history | |
| src/main/java/org/springblade/modules/webscoket/WebSocketServer.java | ●●●●● patch | view | raw | blame | history |
pom.xml
@@ -207,6 +207,10 @@ <version>2.5.1</version> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> </dependency> <dependency> <groupId>commons-net</groupId> <artifactId>commons-net</artifactId> <version>1.4.1</version> src/main/java/org/springblade/Application.java
@@ -18,6 +18,8 @@ import org.springblade.common.constant.CommonConstant; import org.springblade.core.launch.BladeApplication; import org.springblade.modules.webscoket.WebSocketServer; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @@ -28,11 +30,16 @@ */ @EnableScheduling @SpringBootApplication public class Application { public class Application implements CommandLineRunner { public static void main(String[] args) { BladeApplication.run(CommonConstant.APPLICATION_NAME, Application.class, args); } @Override public void run(String... args) throws Exception { WebSocketServer webSocketServer= new WebSocketServer(2086); } } src/main/java/org/springblade/modules/nettyServer/ChannelMap.java
New file @@ -0,0 +1,36 @@ package org.springblade.modules.nettyServer; import io.netty.channel.Channel; import java.util.concurrent.ConcurrentHashMap; public class ChannelMap { public static int channelNum=0; private static ConcurrentHashMap<String, Channel> channelHashMap=null;//concurrentHashmap以解决多线程冲突 public static ConcurrentHashMap<String, Channel> getChannelHashMap() { return channelHashMap; } public static Channel getChannelByName(String name){ if(channelHashMap==null||channelHashMap.isEmpty()){ return null; } return channelHashMap.get(name); } public static void addChannel(String name, Channel channel){ if(channelHashMap==null){ channelHashMap=new ConcurrentHashMap<String, Channel>(10); } channelHashMap.put(name,channel); channelNum++; } public static int removeChannelByName(String name){ if(channelHashMap.containsKey(name)){ channelHashMap.remove(name); return 0; }else{ return 1; } } } src/main/java/org/springblade/modules/nettyServer/NettyConfig.java
New file @@ -0,0 +1,45 @@ package org.springblade.modules.nettyServer; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; import java.util.concurrent.ConcurrentHashMap; public class NettyConfig { /** * 存储每一个客户端接入进来时的channel对象 */ public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); /** * 定义一个channel组,管理所有channel * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例 */ private static ChannelGroup channelGroup = new DefaultChannelGroup("用户管理组",GlobalEventExecutor.INSTANCE); /** * 存放用户与chanel 的对应的信息,用于给指定用户发送信息 */ private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>(); public NettyConfig() { } /** * 获取用户channel 组 * @return */ public static ChannelGroup getChannelGroup() { return channelGroup; } /** * 获取用户channel map * @return */ public static ConcurrentHashMap<String, Channel> getUserChannelMap() { return userChannelMap; } } src/main/java/org/springblade/modules/webscoket/WebSocketHandler.java
@@ -15,7 +15,6 @@ import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import org.springblade.modules.nettyServer.NettyConfig; import org.springblade.modules.suser.service.ISuserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @@ -26,10 +25,7 @@ private WebSocketServerHandshaker handshaker; private String on=null; @Autowired private ISuserService suserService; private String on = null; private static WebSocketHandler webSocketHandler; @@ -63,7 +59,7 @@ //用户离线状态 String name = ChannelSupervise.findName(ctx.channel().id().asShortText()); System.out.println(name); if ( name != null &&!name.equals("ping") ){ if (name != null && !name.equals("ping")) { NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } @@ -95,15 +91,15 @@ // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (!request.equals("ping")){ if (!request.equals("ping")) { JSONObject jsonObj = JSON.parseObject(request); String type = jsonObj.get("type").toString(); if (type != null && type.equals("login")){ if (type != null && type.equals("login")) { //登录链接 String id = jsonObj.get("id").toString(); NettyConfig.getUserChannelMap().put(id,ctx.channel()); NettyConfig.getUserChannelMap().put(id, ctx.channel()); System.out.println(jsonObj.get("type")); System.out.println(jsonObj.get("id")); @@ -112,12 +108,8 @@ AttributeKey<String> key = AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(id); //把用户信息添加到通道里 ChannelSupervise.addChannel(ctx.channel(),id); ChannelSupervise.addChannel(ctx.channel(), id); } } @@ -174,6 +166,7 @@ // f.addListener(ChannelFutureListener.CLOSE); // } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); @@ -182,9 +175,10 @@ /** * 删除用户与channel 对应关系 * * @param ctx */ private void removeUserId(ChannelHandlerContext ctx){ private void removeUserId(ChannelHandlerContext ctx) { AttributeKey<String> key = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(key).get(); NettyConfig.getUserChannelMap().remove(userId); src/main/java/org/springblade/modules/webscoket/WebSocketServer.java
@@ -14,7 +14,7 @@ import io.netty.handler.stream.ChunkedWriteHandler; public class WebSocketServer { private int port = 9034; private int port = 2086; public WebSocketServer(int port) { bind(port);