src/main/java/org/springblade/modules/nettyServer/ChannelMap.java
// @@ -1,36 +1,36 @@
package org.springblade.modules.nettyServer;

import io.netty.channel.Channel;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Static registry that maps a logical name to its Netty {@link Channel}.
 *
 * <p>The backing map is created eagerly, so {@link #getChannelHashMap()} never
 * returns {@code null} (it previously did until the first {@code addChannel} call).
 */
public class ChannelMap {

    /**
     * Number of {@link #addChannel} calls that completed. This class never
     * decrements it, so it is a running total rather than the current map size.
     */
    public static volatile int channelNum = 0;

    /** Concurrent map so that lookups and updates from several threads do not conflict. */
    private static final ConcurrentHashMap<String, Channel> channelHashMap =
            new ConcurrentHashMap<String, Channel>(10);

    /**
     * @return the live backing map (not a copy)
     */
    public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
        return channelHashMap;
    }

    /**
     * Looks up a channel by name.
     *
     * @param name registry key; may be {@code null}
     * @return the registered channel, or {@code null} when {@code name} is
     *         {@code null} or nothing is registered under it
     */
    public static Channel getChannelByName(String name) {
        // ConcurrentHashMap rejects null keys, so answer "not found" instead of throwing.
        if (name == null) {
            return null;
        }
        return channelHashMap.get(name);
    }

    /**
     * Registers (or replaces) the channel stored under {@code name} and bumps
     * {@link #channelNum}.
     *
     * @param name    registry key, must not be {@code null}
     * @param channel channel to store, must not be {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    public static synchronized void addChannel(String name, Channel channel) {
        channelHashMap.put(name, channel);
        // Guarded by the class lock: ++ on an int is not atomic on its own.
        channelNum++;
    }

    /**
     * Removes the channel registered under {@code name}.
     *
     * @param name registry key; may be {@code null}
     * @return {@code 0} if an entry was removed, {@code 1} if none existed
     */
    public static int removeChannelByName(String name) {
        if (name == null) {
            return 1;
        }
        // Single atomic remove instead of containsKey + remove.
        return channelHashMap.remove(name) != null ? 0 : 1;
    }
}
// src/main/java/org/springblade/modules/nettyServer/NettyConfig.java
// @@ -1,45 +1,45 @@
package org.springblade.modules.nettyServer;

import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.concurrent.ConcurrentHashMap;

/**
 * Holder for the process-wide Netty channel collections.
 */
public class NettyConfig {

    /**
     * Group intended to hold the channel of every connected client.
     * Left public and non-final because it is part of the existing public surface.
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Channel group used to manage all user channels. It is backed by
     * {@code GlobalEventExecutor.INSTANCE}, the global singleton event executor.
     */
    private static final ChannelGroup channelGroup =
            new DefaultChannelGroup("用户管理组", GlobalEventExecutor.INSTANCE);

    /**
     * Maps a user id to that user's channel, so a message can be sent to one
     * specific user.
     */
    private static final ConcurrentHashMap<String, Channel> userChannelMap =
            new ConcurrentHashMap<>();

    public NettyConfig() {
    }

    /**
     * @return the shared user channel group
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }

    /**
     * @return the live user-id to channel map (not a copy)
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return userChannelMap;
    }
}
// src/main/java/org/springblade/modules/nettyServer/Server.java
@@ -1,86 +1,86 @@ package org.springblade.modules.nettyServer; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.ServerSocketChannel; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; public class Server { private int port; private ServerSocketChannel serverSocketChannel; public Server(int port){ this.port = port; bind(); } private void bind() { Thread thread = new Thread(new Runnable() { @Override public void run() { //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输 //连接处理group EventLoopGroup boss = new NioEventLoopGroup(); //事件处理group EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); // 绑定处理group bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) //保持连接数 .option(ChannelOption.SO_BACKLOG, 1024) //有数据立即发送 .option(ChannelOption.TCP_NODELAY, true) //保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //处理新连接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 增加任务处理 ChannelPipeline p = sc.pipeline(); p.addLast( // //使用了netty自带的编码器和解码器 // new StringDecoder(), // new StringEncoder(), //心跳检测,读超时,写超时,读写超时 //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS), //自定义的处理器 new ServerHandler()); } }); //绑定端口,同步等待成功 ChannelFuture future; try { future = bootstrap.bind(port).sync(); if (future.isSuccess()) { serverSocketChannel = (ServerSocketChannel) future.channel(); System.out.println("服务端启动成功,端口:"+port); } else { System.out.println("服务端启动失败!"); } //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //优雅地退出,释放线程池资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }); thread.start(); } public void sendMessage(Object msg){ if(serverSocketChannel != null){ serverSocketChannel.writeAndFlush(msg); } } 
} //package org.springblade.modules.nettyServer; // //import io.netty.bootstrap.ServerBootstrap; //import io.netty.channel.*; //import io.netty.channel.nio.NioEventLoopGroup; //import io.netty.channel.socket.ServerSocketChannel; //import io.netty.channel.socket.SocketChannel; //import io.netty.channel.socket.nio.NioServerSocketChannel; // // //public class Server { // private int port; // private ServerSocketChannel serverSocketChannel; // // public Server(int port){ // this.port = port; // bind(); // } // // private void bind() { // Thread thread = new Thread(new Runnable() { // @Override // public void run() { // //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输 // //连接处理group // EventLoopGroup boss = new NioEventLoopGroup(); // //事件处理group // EventLoopGroup worker = new NioEventLoopGroup(); // ServerBootstrap bootstrap = new ServerBootstrap(); // // 绑定处理group // bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) // //保持连接数 // .option(ChannelOption.SO_BACKLOG, 1024) // //有数据立即发送 // .option(ChannelOption.TCP_NODELAY, true) // //保持连接 // .childOption(ChannelOption.SO_KEEPALIVE, true) // //处理新连接 // .childHandler(new ChannelInitializer<SocketChannel>() { // @Override // protected void initChannel(SocketChannel sc) throws Exception { // // 增加任务处理 // ChannelPipeline p = sc.pipeline(); // p.addLast( //// //使用了netty自带的编码器和解码器 //// new StringDecoder(), //// new StringEncoder(), // //心跳检测,读超时,写超时,读写超时 // //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS), // //自定义的处理器 // new ServerHandler()); // } // }); // // //绑定端口,同步等待成功 // ChannelFuture future; // try { // future = bootstrap.bind(port).sync(); // if (future.isSuccess()) { // serverSocketChannel = (ServerSocketChannel) future.channel(); // System.out.println("服务端启动成功,端口:"+port); // } else { // System.out.println("服务端启动失败!"); // } // // //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程 // future.channel().closeFuture().sync(); // } catch (Exception e) { // e.printStackTrace(); // } // finally { // //优雅地退出,释放线程池资源 // 
boss.shutdownGracefully(); // worker.shutdownGracefully(); // } // } // }); // thread.start(); // } // // public void sendMessage(Object msg){ // if(serverSocketChannel != null){ // serverSocketChannel.writeAndFlush(msg); // } // } //} src/main/java/org/springblade/modules/nettyServer/ServerHandler.java
// @@ -1,89 +1,89 @@
package org.springblade.modules.nettyServer;

import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.springblade.modules.webscoket.service.IPushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Inbound handler for the raw TCP server: logs connection events and prints
 * every received payload decoded as UTF-8.
 */
@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {

    // Frame patterns; none of them is referenced anywhere in this class.
    // reg_LA and reg_LD previously read "[d]" / "[x2A]" (literal letters); the
    // backslashes are restored to match the escaping used by reg_LB / reg_LB2.
    // NOTE(review): '|' inside a character class is a literal pipe, not
    // alternation -- confirm that is intended.
    private String reg_LA = "LA[\\d]{8}[\\d|A-F]{12}[\\d|A-F]{8}[\\d|A-Z]{2}[\\d|A-F]{4}[\\x2A][\\d|A-F]{6}[#@]";
    private String reg_LB = "LB[\\d|A-F]{12}[\\x2A].*[#@]";
    private String reg_LB2 = "LB[\\d|A-F]{6}[\\x2A].*[#@]";
    private String reg_LD = "LD[\\d]{8}[\\d|A-F]{12}:[A-Z]{4}[\\x2A].*[#@]";

    private ConcurrentHashMap<String, Channel> sessionChannelMap = new ConcurrentHashMap<String, Channel>();

    private static ServerHandler serverHandler;

    /** Publishes the Spring-managed instance through the static field. */
    @PostConstruct
    public void init() {
        serverHandler = this;
    }

    /**
     * Called when a client connection becomes active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CTX:" + ctx.channel());
        System.out.println("客户端与服务端连接开始...");
    }

    /**
     * Called when a client connection is closed; drops the channel from
     * {@code NettyConfig.group}.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端与服务端连接关闭...");
        NettyConfig.group.remove(ctx.channel());
    }

    /**
     * Called after the current batch of inbound data has been read; flushes
     * any pending writes.
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("信息接收完毕...");
    }

    /**
     * Called when an exception reaches this handler; prints it and closes the
     * connection.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Receives one inbound message from the client and prints its content
     * decoded as UTF-8.
     *
     * <p>A {@link ByteBuf} is consumed here and therefore released here.
     * Any other message type is passed to the next handler in the pipeline.
     *
     * @param channelHandlerContext context of the receiving channel
     * @param info                  the inbound message
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
        System.out.println("接收到了:" + info);
        if (!(info instanceof ByteBuf)) {
            channelHandlerContext.fireChannelRead(info);
            return;
        }
        ByteBuf buf = (ByteBuf) info;
        try {
            byte[] req = new byte[buf.readableBytes()];
            buf.readBytes(req);
            String body = new String(req, CharsetUtil.UTF_8);
            System.out.println("接收客户端数据:" + body);
        } finally {
            // This handler is the last consumer of the buffer, so it must
            // release it to avoid leaking the reference-counted memory.
            buf.release();
        }
    }
}
// src/main/java/org/springblade/modules/runner/MyRunner.java
@@ -1,24 +1,24 @@ package org.springblade.modules.runner; import okhttp3.WebSocket; import org.springblade.common.config.CommonConfig; import org.springblade.common.config.FtpConfig; import org.springblade.modules.webscoket.WebSocketServer; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; /** * 自定义启动(项目启动即启动) * @author zhongrj * @since 2022-01-06 */ @Component public class MyRunner implements CommandLineRunner { @Override public void run(String... args) throws Exception { System.out.println("websocketServer 开始启动!"); //启动即创建webSocketServer WebSocketServer socketServer = new WebSocketServer(CommonConfig.socketPort); } } //package org.springblade.modules.runner; // //import okhttp3.WebSocket; //import org.springblade.common.config.CommonConfig; //import org.springblade.common.config.FtpConfig; //import org.springblade.modules.webscoket.WebSocketServer; //import org.springframework.boot.CommandLineRunner; //import org.springframework.stereotype.Component; // ///** // * 自定义启动(项目启动即启动) // * @author zhongrj // * @since 2022-01-06 // */ //@Component //public class MyRunner implements CommandLineRunner { // // @Override // public void run(String... args) throws Exception { // System.out.println("websocketServer 开始启动!"); // //启动即创建webSocketServer // WebSocketServer socketServer = new WebSocketServer(CommonConfig.socketPort); // } //} src/main/java/org/springblade/modules/webscoket/ChannelSupervise.java
// @@ -1,38 +1,38 @@
package org.springblade.modules.webscoket;

import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Static registry of WebSocket channels, keyed by the channel's short id
 * text, together with the name associated with each channel.
 */
public class ChannelSupervise {

    private static final ChannelGroup GLOBAL_GROUP = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /** Short id text -> full channel id. */
    private static final ConcurrentMap<String, ChannelId> CHANNEL_IDS = new ConcurrentHashMap<String, ChannelId>();

    /**
     * Short id text -> name. Concurrent because the static methods can be
     * called from several threads at once; a plain HashMap is unsafe there.
     */
    private static final ConcurrentMap<String, String> NAMES = new ConcurrentHashMap<String, String>();

    /**
     * Registers a channel and the name associated with it.
     *
     * @param channel channel to register
     * @param name    name to associate; {@code null} clears any stored name
     */
    public static void addChannel(Channel channel, String name) {
        String shortId = channel.id().asShortText();
        GLOBAL_GROUP.add(channel);
        CHANNEL_IDS.put(shortId, channel.id());
        if (name != null) {
            NAMES.put(shortId, name);
        } else {
            // Concurrent maps reject null values; absence gives the same
            // findName(...) == null result a stored null used to give.
            NAMES.remove(shortId);
        }
    }

    /**
     * Removes a channel and its name from the registry.
     *
     * @param channel channel to remove
     */
    public static void removeChannel(Channel channel) {
        String shortId = channel.id().asShortText();
        GLOBAL_GROUP.remove(channel);
        CHANNEL_IDS.remove(shortId);
        NAMES.remove(shortId);
    }

    /**
     * Finds a registered channel by its short id text.
     *
     * @param id short id text; may be {@code null}
     * @return the channel, or {@code null} if the id is unknown
     */
    public static Channel findChannel(String id) {
        if (id == null) {
            return null;
        }
        ChannelId channelId = CHANNEL_IDS.get(id);
        // Avoid handing a null id to the group lookup.
        return channelId == null ? null : GLOBAL_GROUP.find(channelId);
    }

    /**
     * Finds the name registered for a channel's short id text.
     *
     * @param id short id text; may be {@code null}
     * @return the name, or {@code null} if none is registered
     */
    public static String findName(String id) {
        return id == null ? null : NAMES.get(id);
    }

    /**
     * Writes a text frame to every channel in the group.
     *
     * @param tws frame to broadcast
     */
    public static void send2All(TextWebSocketFrame tws) {
        GLOBAL_GROUP.writeAndFlush(tws);
    }
}
// src/main/java/org/springblade/modules/webscoket/WebSocketHandler.java
@@ -1,193 +1,193 @@ package org.springblade.modules.webscoket; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import org.springblade.modules.nettyServer.NettyConfig; import org.springblade.modules.system.service.IUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class WebSocketHandler extends SimpleChannelInboundHandler<Object> { private WebSocketServerHandshaker handshaker; private String on=null; @Autowired private IUserService userService; private static WebSocketHandler webSocketHandler; @PostConstruct public void init() { webSocketHandler = this; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { //以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { //处理websocket客户端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } /** * 客户端加入连接 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加连接 System.out.println("客户端加入连接:" + ctx.channel()); //ChannelSupervise.addChannel(ctx.channel()); } /** * 客户端离开连接 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //断开连接 System.out.println("客户端断开连接:" + ctx.channel()); //用户离线状态 String name = 
ChannelSupervise.findName(ctx.channel().id().asShortText()); if ( name != null &&!name.equals("ping") ){ String num="0"; //工作状态(0闲置,1工作中) String workSt = "0"; // webSocketHandler.userService.updateUser(num,name,workSt); //ChannelSupervise.removeChannel(ctx.channel()); NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws NumberFormatException, Exception { System.out.println("ctx = " + ctx); System.out.println("frame = " + frame); // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { System.out.println("本例程仅支持文本消息,不支持二进制消息"); throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (!request.equals("ping")){ NettyConfig.getUserChannelMap().put(request,ctx.channel()); //将用户id作为自定义属性加入到channel 中,方便随时channel中获取用户id AttributeKey<String> key = AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(request); //把用户信息添加到通道里 ChannelSupervise.addChannel(ctx.channel(),request); //用户在线状态 this.on=request; //在线状态(0掉线,1在线) String num="1"; //工作状态(0闲置,1工作中) String workSt = "0"; // webSocketHandler.userService.updateUser(num,request,workSt); } } /** * 唯一的一次http请求,用于创建websocket * * @throws InterruptedException */ private void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req) throws InterruptedException { //要求Upgrade为websocket,过滤掉get/Post if (!req.decoderResult().isSuccess() || 
(!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //握手 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:9034/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒绝不合法的请求,并返回错误信息 */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 // if (!isKeepAlive(req) || res.status().code() != 200) { // f.addListener(ChannelFutureListener.CLOSE); //package org.springblade.modules.webscoket; // //import io.netty.buffer.ByteBuf; //import io.netty.buffer.Unpooled; //import io.netty.channel.ChannelFuture; //import io.netty.channel.ChannelHandlerContext; //import io.netty.channel.SimpleChannelInboundHandler; //import io.netty.handler.codec.http.DefaultFullHttpResponse; //import io.netty.handler.codec.http.FullHttpRequest; //import io.netty.handler.codec.http.HttpResponseStatus; //import io.netty.handler.codec.http.HttpVersion; //import io.netty.handler.codec.http.websocketx.*; //import io.netty.util.AttributeKey; //import io.netty.util.CharsetUtil; //import org.springblade.modules.nettyServer.NettyConfig; //import org.springblade.modules.system.service.IUserService; //import org.springframework.beans.factory.annotation.Autowired; //import org.springframework.stereotype.Component; // //import javax.annotation.PostConstruct; // //@Component //public class WebSocketHandler 
extends SimpleChannelInboundHandler<Object> { // // private WebSocketServerHandshaker handshaker; // // private String on=null; // // @Autowired // private IUserService userService; // // private static WebSocketHandler webSocketHandler; // // @PostConstruct // public void init() { // webSocketHandler = this; // } // // @Override // protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // if (msg instanceof FullHttpRequest) { // //以http请求形式接入,但是走的是websocket // handleHttpRequest(ctx, (FullHttpRequest) msg); // } else if (msg instanceof WebSocketFrame) { // //处理websocket客户端的消息 // handlerWebSocketFrame(ctx, (WebSocketFrame) msg); // } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 删除用户与channel 对应关系 * @param ctx */ private void removeUserId(ChannelHandlerContext ctx){ AttributeKey<String> key = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(key).get(); NettyConfig.getUserChannelMap().remove(userId); } } // } // // /** // * 客户端加入连接 // * @param ctx // * @throws Exception // */ // @Override // public void channelActive(ChannelHandlerContext ctx) throws Exception { // //添加连接 // System.out.println("客户端加入连接:" + ctx.channel()); // //ChannelSupervise.addChannel(ctx.channel()); // } // // /** // * 客户端离开连接 // * @param ctx // * @throws Exception // */ // @Override // public void channelInactive(ChannelHandlerContext ctx) throws Exception { // //断开连接 // System.out.println("客户端断开连接:" + ctx.channel()); // //用户离线状态 // String name = ChannelSupervise.findName(ctx.channel().id().asShortText()); // if ( name != null &&!name.equals("ping") ){ // String num="0"; // //工作状态(0闲置,1工作中) // String workSt = "0"; //// webSocketHandler.userService.updateUser(num,name,workSt); // //ChannelSupervise.removeChannel(ctx.channel()); // // NettyConfig.getChannelGroup().remove(ctx.channel()); // removeUserId(ctx); // } // } // // @Override // public void 
channelReadComplete(ChannelHandlerContext ctx) throws Exception { // ctx.flush(); // } // // private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws NumberFormatException, Exception { // System.out.println("ctx = " + ctx); // System.out.println("frame = " + frame); // // 判断是否关闭链路的指令 // if (frame instanceof CloseWebSocketFrame) { // handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); // return; // } // // 判断是否ping消息 // if (frame instanceof PingWebSocketFrame) { // ctx.channel().write( // new PongWebSocketFrame(frame.content().retain())); // return; // } // // 本例程仅支持文本消息,不支持二进制消息 // if (!(frame instanceof TextWebSocketFrame)) { // System.out.println("本例程仅支持文本消息,不支持二进制消息"); // throw new UnsupportedOperationException(String.format( // "%s frame types not supported", frame.getClass().getName())); // } // // 返回应答消息 // String request = ((TextWebSocketFrame) frame).text(); // // if (!request.equals("ping")){ // // NettyConfig.getUserChannelMap().put(request,ctx.channel()); // // //将用户id作为自定义属性加入到channel 中,方便随时channel中获取用户id // AttributeKey<String> key = AttributeKey.valueOf("userId"); // ctx.channel().attr(key).setIfAbsent(request); // // //把用户信息添加到通道里 // ChannelSupervise.addChannel(ctx.channel(),request); // //用户在线状态 // this.on=request; // //在线状态(0掉线,1在线) // String num="1"; // //工作状态(0闲置,1工作中) // String workSt = "0"; //// webSocketHandler.userService.updateUser(num,request,workSt); // } // // } // // /** // * 唯一的一次http请求,用于创建websocket // * // * @throws InterruptedException // */ // private void handleHttpRequest(final ChannelHandlerContext ctx, // FullHttpRequest req) throws InterruptedException { // //要求Upgrade为websocket,过滤掉get/Post // if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { // //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 // sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); // return; // } // //握手 // 
WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( // "ws://localhost:9034/websocket", null, false); // handshaker = wsFactory.newHandshaker(req); // if (handshaker == null) { // WebSocketServerHandshakerFactory // .sendUnsupportedVersionResponse(ctx.channel()); // } else { // handshaker.handshake(ctx.channel(), req); // } // } // // /** // * 拒绝不合法的请求,并返回错误信息 // */ // private static void sendHttpResponse(ChannelHandlerContext ctx, // FullHttpRequest req, DefaultFullHttpResponse res) { // // 返回应答给客户端 // if (res.status().code() != 200) { // ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), // CharsetUtil.UTF_8); // res.content().writeBytes(buf); // buf.release(); // } // ChannelFuture f = ctx.channel().writeAndFlush(res); // // 如果是非Keep-Alive,关闭连接 //// if (!isKeepAlive(req) || res.status().code() != 200) { //// f.addListener(ChannelFutureListener.CLOSE); //// } // } // @Override // public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // cause.printStackTrace(); // ctx.close(); // } // // /** // * 删除用户与channel 对应关系 // * @param ctx // */ // private void removeUserId(ChannelHandlerContext ctx){ // AttributeKey<String> key = AttributeKey.valueOf("userId"); // String userId = ctx.channel().attr(key).get(); // NettyConfig.getUserChannelMap().remove(userId); // } // //} src/main/java/org/springblade/modules/webscoket/WebSocketServer.java
@@ -1,59 +1,59 @@ package org.springblade.modules.webscoket; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.stream.ChunkedWriteHandler; public class WebSocketServer { private int port = 9034; public WebSocketServer(int port) { bind(port); } public void bind(int port) { Thread thread = new Thread(new Runnable() { @Override public void run() { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) //保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) { // ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程 ch.pipeline().addLast("http-codec", new HttpServerCodec());//设置解码器 ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));//聚合器,使用websocket会用到 ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//用于大数据的分区传输 ch.pipeline().addLast("handler", new WebSocketHandler());//自定义的业务handler } }); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); System.out.println("WebSocketServer启动成功"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); 
workerGroup.shutdownGracefully(); } } }); thread.start(); } } //package org.springblade.modules.webscoket; //import io.netty.bootstrap.ServerBootstrap; //import io.netty.channel.ChannelFuture; //import io.netty.channel.ChannelInitializer; //import io.netty.channel.ChannelOption; //import io.netty.channel.EventLoopGroup; //import io.netty.channel.nio.NioEventLoopGroup; //import io.netty.channel.socket.SocketChannel; //import io.netty.channel.socket.nio.NioServerSocketChannel; //import io.netty.handler.codec.http.HttpObjectAggregator; //import io.netty.handler.codec.http.HttpServerCodec; //import io.netty.handler.logging.LogLevel; //import io.netty.handler.logging.LoggingHandler; //import io.netty.handler.stream.ChunkedWriteHandler; // //public class WebSocketServer { // private int port = 9034; // // public WebSocketServer(int port) { // bind(port); // } // // public void bind(int port) { // Thread thread = new Thread(new Runnable() { // @Override // public void run() { // EventLoopGroup bossGroup = new NioEventLoopGroup(); // EventLoopGroup workerGroup = new NioEventLoopGroup(); // try { // ServerBootstrap serverBootstrap = new ServerBootstrap(); // serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) // .handler(new LoggingHandler(LogLevel.INFO)) // //保持连接 // .childOption(ChannelOption.SO_KEEPALIVE, true) // .childHandler(new ChannelInitializer<SocketChannel>() { // @Override // protected void initChannel(SocketChannel ch) { //// ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程 // ch.pipeline().addLast("http-codec", new HttpServerCodec());//设置解码器 // ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));//聚合器,使用websocket会用到 // ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//用于大数据的分区传输 // ch.pipeline().addLast("handler", new WebSocketHandler());//自定义的业务handler // } // }); // // ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); // 
System.out.println("WebSocketServer启动成功"); // channelFuture.channel().closeFuture().sync(); // } catch (Exception e) { // e.printStackTrace(); // } finally { // bossGroup.shutdownGracefully(); // workerGroup.shutdownGracefully(); // } // } // }); // thread.start(); // } //} src/main/java/org/springblade/modules/webscoket/controller/PushMsgController.java
@@ -1,30 +1,30 @@
package org.springblade.modules.webscoket.controller;

import org.springblade.modules.webscoket.service.IPushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * REST controller exposing two POST endpoints that hand a message to
 * {@link IPushMsgService} and echo it back in the reply text.
 *
 * @author lq
 * @date 2020/4/1 11:22
 */
@RestController
public class PushMsgController {

    /** Text placed in front of the echoed message in every reply. */
    private static final String REPLY_PREFIX = "消息发送成功:";

    @Autowired
    private IPushMsgService pushMsgService;

    /**
     * Handles {@code POST /pushUser}: passes the message and user id to the service.
     *
     * <p>The service call returns nothing, so the reply is the same whatever
     * the service did with the message.
     *
     * @param userId user id forwarded to the service
     * @param msg    message text forwarded to the service
     * @return the reply prefix followed by {@code msg}
     */
    @PostMapping("/pushUser")
    public String pushUser(String userId, String msg) {
        pushMsgService.pushMsg(userId, msg);
        return reply(msg);
    }

    /**
     * Handles {@code POST /pushAll}: passes the message to the service's
     * single-argument overload.
     *
     * @param msg message text forwarded to the service
     * @return the reply prefix followed by {@code msg}
     */
    @PostMapping("/pushAll")
    public String pushAll(String msg) {
        pushMsgService.pushMsg(msg);
        return reply(msg);
    }

    /** Builds the reply text shared by both endpoints. */
    private static String reply(String msg) {
        return REPLY_PREFIX + msg;
    }
}
//package org.springblade.modules.webscoket.controller;
//
//import org.springblade.modules.webscoket.service.IPushMsgService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.web.bind.annotation.PostMapping;
//import org.springframework.web.bind.annotation.RestController;
//
///**
// * @author lq
// * @date 2020/4/1 11:22
// */
//@RestController
//public class PushMsgController {
//
//    @Autowired
//    private IPushMsgService pushMsgService;
//
//    @PostMapping("/pushUser")
//    public String pushUser(String userId,String msg){
//        pushMsgService.pushMsg(userId, msg);
//        return "消息发送成功:"+msg;
//    }
//
//    @PostMapping("/pushAll")
//    public String pushAll(String msg){
//        pushMsgService.pushMsg(msg);
//        return "消息发送成功:"+msg;
//    }
//
//} src/main/java/org/springblade/modules/webscoket/index.html
@@ -1,57 +1,57 @@ <!-- Manual WebSocket test page. When the script runs it connects to ws://localhost:9034/websocket, appends each received message to the responseContent textarea, writes a status line there when the connection opens or closes, and shows an alert when the browser has no WebSocket support. The form button passes the text field value to send(). --> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/> <title>WebSocket客户端</title> <script type="text/javascript"> var socket; if(!window.WebSocket){ window.WebSocket = window.MozWebSocket; } <!--<html>--> <!--<head>--> <!-- <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>--> <!-- <title>WebSocket客户端</title>--> <!-- <script type="text/javascript">--> <!-- var socket;--> <!-- if(!window.WebSocket){--> <!-- window.WebSocket = window.MozWebSocket;--> <!-- }--> if(window.WebSocket){ socket = new WebSocket("ws://localhost:9034/websocket"); socket.onmessage = function(event){ var ta = document.getElementById('responseContent'); ta.value += event.data + "\r\n"; }; <!-- if(window.WebSocket){--> <!-- socket = new WebSocket("ws://localhost:9034/websocket");--> <!-- socket.onmessage = function(event){--> <!-- var ta = document.getElementById('responseContent');--> <!-- ta.value += event.data + "\r\n";--> <!-- };--> socket.onopen = function(event){ var ta = document.getElementById('responseContent'); ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n"; }; <!-- socket.onopen = function(event){--> <!-- var ta = document.getElementById('responseContent');--> <!-- ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";--> <!-- };--> socket.onclose = function(event){ var ta = document.getElementById('responseContent'); ta.value = ""; <!-- socket.onclose = function(event){--> <!-- var ta = document.getElementById('responseContent');--> <!-- ta.value = "";--> ta.value = "WebSocket连接已经关闭\r\n"; }; }else{ alert("您的浏览器不支持WebSocket"); } <!-- ta.value = "WebSocket连接已经关闭\r\n";--> <!-- };--> <!-- }else{--> <!-- alert("您的浏览器不支持WebSocket");--> <!-- }--> /* Sends message over the socket when its readyState is OPEN; otherwise shows an alert. Returns without doing anything when window.WebSocket is missing. */ function send(message){ if(!window.WebSocket){ return; } if(socket.readyState == WebSocket.OPEN){ socket.send(message); }else{ alert("WebSocket连接没有建立成功!!"); } } </script> </head> <body> <form onSubmit="return false;"> <input type = "text" name = "message" value = ""/> <br/><br/> <input type = 
"button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/> <hr color="red"/> <h2>客户端接收到服务端返回的应答消息</h2> <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea> </form> </body> </html> <!-- function send(message){--> <!-- if(!window.WebSocket){--> <!-- return;--> <!-- }--> <!-- if(socket.readyState == WebSocket.OPEN){--> <!-- socket.send(message);--> <!-- }else{--> <!-- alert("WebSocket连接没有建立成功!!");--> <!-- }--> <!-- }--> <!-- </script>--> <!--</head>--> <!--<body>--> <!--<form onSubmit="return false;">--> <!-- <input type = "text" name = "message" value = ""/>--> <!-- <br/><br/>--> <!-- <input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>--> <!-- <hr color="red"/>--> <!-- <h2>客户端接收到服务端返回的应答消息</h2>--> <!-- <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>--> <!--</form>--> <!--</body>--> <!--</html>--> src/main/java/org/springblade/modules/webscoket/service/IPushMsgService.java
@@ -1,21 +1,21 @@
package org.springblade.modules.webscoket.service;

/**
 * Service contract for pushing text messages to users.
 *
 * <p>Two overloads are declared: one addressed by a user id and one with no
 * addressee. How a message is delivered is left to the implementing class.
 *
 * @author 123456
 */
public interface IPushMsgService {

    /**
     * Sends a message to the specified user.
     *
     * @param userId identifier of the user to send to
     * @param msg    message text to send
     */
    void pushMsg(String userId,String msg);

    /**
     * Sends a message to all users.
     *
     * @param msg message text to send
     */
    void pushMsg(String msg);

}
//package org.springblade.modules.webscoket.service;
//
///**
// * @author 123456
// */
//public interface IPushMsgService {
//
//    /**
//     * Send a message to the specified user
//     * @param userId
//     * @param msg
//     */
//    void pushMsg(String userId,String msg);
//
//    /**
//     * Send a message to all users
//     * @param msg
//     */
//    void pushMsg(String msg);
//
//} src/main/java/org/springblade/modules/webscoket/service/impl/PushMsgServiceImpl.java
@@ -1,32 +1,32 @@ package org.springblade.modules.webscoket.service.impl; import io.netty.channel.Channel; import io.netty.channel.group.ChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.springblade.modules.nettyServer.NettyConfig; import org.springblade.modules.webscoket.service.IPushMsgService; import org.springframework.stereotype.Service; /** * @author lq * @date 2020/4/1 11:20 */ @Service public class PushMsgServiceImpl implements IPushMsgService { @Override public void pushMsg(String userId, String msg) { Channel channel = NettyConfig.getUserChannelMap().get(userId); if (channel != null){ channel.writeAndFlush(new TextWebSocketFrame(msg)); } } @Override public void pushMsg(String msg) { ChannelGroup group = NettyConfig.getChannelGroup(); String name = group.name(); System.out.println("空间大小:"+group.size()+",名字:"+name); group.writeAndFlush(new TextWebSocketFrame(msg)); } } //package org.springblade.modules.webscoket.service.impl; // //import io.netty.channel.Channel; //import io.netty.channel.group.ChannelGroup; //import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; //import org.springblade.modules.nettyServer.NettyConfig; //import org.springblade.modules.webscoket.service.IPushMsgService; //import org.springframework.stereotype.Service; // ///** // * @author lq // * @date 2020/4/1 11:20 // */ //@Service //public class PushMsgServiceImpl implements IPushMsgService { // @Override // public void pushMsg(String userId, String msg) { // Channel channel = NettyConfig.getUserChannelMap().get(userId); // if (channel != null){ // channel.writeAndFlush(new TextWebSocketFrame(msg)); // } // // } // // @Override // public void pushMsg(String msg) { // ChannelGroup group = NettyConfig.getChannelGroup(); // String name = group.name(); // System.out.println("空间大小:"+group.size()+",名字:"+name); // group.writeAndFlush(new TextWebSocketFrame(msg)); // } //}