钟日健
2022-05-19 de31e91aa2f38a717f2d4640a3197580ec1807a7
关闭websckot
12 files modified
1404 ■■■■ changed files
src/main/java/org/springblade/modules/nettyServer/ChannelMap.java 72 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/nettyServer/NettyConfig.java 90 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/nettyServer/Server.java 172 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/nettyServer/ServerHandler.java 178 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/runner/MyRunner.java 48 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/ChannelSupervise.java 76 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/WebSocketHandler.java 384 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/WebSocketServer.java 118 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/controller/PushMsgController.java 60 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/index.html 100 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/service/IPushMsgService.java 42 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/webscoket/service/impl/PushMsgServiceImpl.java 64 ●●●● patch | view | raw | blame | history
src/main/java/org/springblade/modules/nettyServer/ChannelMap.java
@@ -1,36 +1,36 @@
package org.springblade.modules.nettyServer;
import io.netty.channel.Channel;
import java.util.concurrent.ConcurrentHashMap;
public class ChannelMap {
    public static int channelNum=0;
    private static ConcurrentHashMap<String, Channel> channelHashMap=null;//concurrentHashmap以解决多线程冲突
    public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
        return channelHashMap;
    }
    public static Channel getChannelByName(String name){
        if(channelHashMap==null||channelHashMap.isEmpty()){
            return null;
        }
        return channelHashMap.get(name);
    }
    public static void addChannel(String name, Channel channel){
        if(channelHashMap==null){
            channelHashMap=new ConcurrentHashMap<String, Channel>(10);
        }
        channelHashMap.put(name,channel);
        channelNum++;
    }
    public static int removeChannelByName(String name){
        if(channelHashMap.containsKey(name)){
            channelHashMap.remove(name);
            return 0;
        }else{
            return 1;
        }
    }
}
//package org.springblade.modules.nettyServer;
//
//import io.netty.channel.Channel;
//
//import java.util.concurrent.ConcurrentHashMap;
//
//public class ChannelMap {
//    public static int channelNum=0;
//    private static ConcurrentHashMap<String, Channel> channelHashMap=null;//concurrentHashmap以解决多线程冲突
//
//    public static ConcurrentHashMap<String, Channel> getChannelHashMap() {
//        return channelHashMap;
//    }
//
//    public static Channel getChannelByName(String name){
//        if(channelHashMap==null||channelHashMap.isEmpty()){
//            return null;
//        }
//        return channelHashMap.get(name);
//    }
//    public static void addChannel(String name, Channel channel){
//        if(channelHashMap==null){
//            channelHashMap=new ConcurrentHashMap<String, Channel>(10);
//        }
//        channelHashMap.put(name,channel);
//        channelNum++;
//    }
//    public static int removeChannelByName(String name){
//        if(channelHashMap.containsKey(name)){
//            channelHashMap.remove(name);
//            return 0;
//        }else{
//            return 1;
//        }
//    }
//}
src/main/java/org/springblade/modules/nettyServer/NettyConfig.java
@@ -1,45 +1,45 @@
package org.springblade.modules.nettyServer;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.concurrent.ConcurrentHashMap;
public class NettyConfig {
    /**
     * 存储每一个客户端接入进来时的channel对象
     */
    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    /**
     * 定义一个channel组,管理所有channel
     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
     */
    private static ChannelGroup channelGroup = new DefaultChannelGroup("用户管理组", GlobalEventExecutor.INSTANCE);
    /**
     * 存放用户与chanel 的对应的信息,用于给指定用户发送信息
     */
    private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
    public NettyConfig() {
    }
    /**
     * 获取用户channel 组
     * @return
     */
    public static ChannelGroup getChannelGroup() {
        return channelGroup;
    }
    /**
     * 获取用户channel map
     * @return
     */
    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
        return userChannelMap;
    }
}
//package org.springblade.modules.nettyServer;
//import io.netty.channel.Channel;
//import io.netty.channel.group.ChannelGroup;
//import io.netty.channel.group.DefaultChannelGroup;
//import io.netty.util.concurrent.GlobalEventExecutor;
//
//import java.util.concurrent.ConcurrentHashMap;
//
//
//public class NettyConfig {
//    /**
//     * 存储每一个客户端接入进来时的channel对象
//     */
//    public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//
//    /**
//     * 定义一个channel组,管理所有channel
//     * GlobalEventExecutor.INSTANCE 是全局的事件执行器,是一个单例
//     */
//    private static ChannelGroup channelGroup = new DefaultChannelGroup("用户管理组", GlobalEventExecutor.INSTANCE);
//
//    /**
//     * 存放用户与chanel 的对应的信息,用于给指定用户发送信息
//     */
//    private static ConcurrentHashMap<String, Channel> userChannelMap = new ConcurrentHashMap<>();
//
//    public NettyConfig() {
//    }
//
//    /**
//     * 获取用户channel 组
//     * @return
//     */
//    public static ChannelGroup getChannelGroup() {
//        return channelGroup;
//    }
//
//    /**
//     * 获取用户channel map
//     * @return
//     */
//    public static ConcurrentHashMap<String, Channel> getUserChannelMap() {
//        return userChannelMap;
//    }
//}
src/main/java/org/springblade/modules/nettyServer/Server.java
@@ -1,86 +1,86 @@
package org.springblade.modules.nettyServer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class Server {
    private int port;
    private ServerSocketChannel serverSocketChannel;
    public Server(int port){
        this.port = port;
        bind();
    }
    private void bind() {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输
                //连接处理group
                EventLoopGroup boss = new NioEventLoopGroup();
                //事件处理group
                EventLoopGroup worker = new NioEventLoopGroup();
                ServerBootstrap bootstrap = new ServerBootstrap();
                // 绑定处理group
                bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
                    //保持连接数
                    .option(ChannelOption.SO_BACKLOG, 1024)
                    //有数据立即发送
                    .option(ChannelOption.TCP_NODELAY, true)
                    //保持连接
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    //处理新连接
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel sc) throws Exception {
                            // 增加任务处理
                            ChannelPipeline p = sc.pipeline();
                            p.addLast(
//                                        //使用了netty自带的编码器和解码器
//                                        new StringDecoder(),
//                                        new StringEncoder(),
                                //心跳检测,读超时,写超时,读写超时
                                //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS),
                                //自定义的处理器
                                new ServerHandler());
                        }
                    });
                //绑定端口,同步等待成功
                ChannelFuture future;
                try {
                    future = bootstrap.bind(port).sync();
                    if (future.isSuccess()) {
                        serverSocketChannel = (ServerSocketChannel) future.channel();
                        System.out.println("服务端启动成功,端口:"+port);
                    } else {
                        System.out.println("服务端启动失败!");
                    }
                    //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程
                    future.channel().closeFuture().sync();
                } catch (Exception e) {
                    e.printStackTrace();
                }
                finally {
                    //优雅地退出,释放线程池资源
                    boss.shutdownGracefully();
                    worker.shutdownGracefully();
                }
            }
        });
        thread.start();
    }
    public void sendMessage(Object msg){
        if(serverSocketChannel != null){
            serverSocketChannel.writeAndFlush(msg);
        }
    }
}
//package org.springblade.modules.nettyServer;
//
//import io.netty.bootstrap.ServerBootstrap;
//import io.netty.channel.*;
//import io.netty.channel.nio.NioEventLoopGroup;
//import io.netty.channel.socket.ServerSocketChannel;
//import io.netty.channel.socket.SocketChannel;
//import io.netty.channel.socket.nio.NioServerSocketChannel;
//
//
//public class Server {
//    private int port;
//    private ServerSocketChannel serverSocketChannel;
//
//    public Server(int port){
//        this.port = port;
//        bind();
//    }
//
//    private void bind() {
//        Thread thread = new Thread(new Runnable() {
//            @Override
//            public void run() {
//                //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输
//                //连接处理group
//                EventLoopGroup boss = new NioEventLoopGroup();
//                //事件处理group
//                EventLoopGroup worker = new NioEventLoopGroup();
//                ServerBootstrap bootstrap = new ServerBootstrap();
//                // 绑定处理group
//                bootstrap.group(boss, worker).channel(NioServerSocketChannel.class)
//                    //保持连接数
//                    .option(ChannelOption.SO_BACKLOG, 1024)
//                    //有数据立即发送
//                    .option(ChannelOption.TCP_NODELAY, true)
//                    //保持连接
//                    .childOption(ChannelOption.SO_KEEPALIVE, true)
//                    //处理新连接
//                    .childHandler(new ChannelInitializer<SocketChannel>() {
//                        @Override
//                        protected void initChannel(SocketChannel sc) throws Exception {
//                            // 增加任务处理
//                            ChannelPipeline p = sc.pipeline();
//                            p.addLast(
////                                        //使用了netty自带的编码器和解码器
////                                        new StringDecoder(),
////                                        new StringEncoder(),
//                                //心跳检测,读超时,写超时,读写超时
//                                //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS),
//                                //自定义的处理器
//                                new ServerHandler());
//                        }
//                    });
//
//                //绑定端口,同步等待成功
//                ChannelFuture future;
//                try {
//                    future = bootstrap.bind(port).sync();
//                    if (future.isSuccess()) {
//                        serverSocketChannel = (ServerSocketChannel) future.channel();
//                        System.out.println("服务端启动成功,端口:"+port);
//                    } else {
//                        System.out.println("服务端启动失败!");
//                    }
//
//                    //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程
//                    future.channel().closeFuture().sync();
//                } catch (Exception e) {
//                    e.printStackTrace();
//                }
//                finally {
//                    //优雅地退出,释放线程池资源
//                    boss.shutdownGracefully();
//                    worker.shutdownGracefully();
//                }
//            }
//        });
//        thread.start();
//    }
//
//    public void sendMessage(Object msg){
//        if(serverSocketChannel != null){
//            serverSocketChannel.writeAndFlush(msg);
//        }
//    }
//}
src/main/java/org/springblade/modules/nettyServer/ServerHandler.java
@@ -1,89 +1,89 @@
package org.springblade.modules.nettyServer;
import com.alibaba.fastjson.JSONObject;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import org.springblade.modules.webscoket.service.IPushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class ServerHandler extends ChannelInboundHandlerAdapter {
    private String reg_LA = "LA[d]{8}[d|A-F]{12}[d|A-F]{8}[d|A-Z]{2}[d|A-F]{4}[x2A][d|A-F]{6}[#@]";
    private String reg_LB = "LB[\\d|A-F]{12}[\\x2A].*[#@]";
    private String reg_LB2 = "LB[\\d|A-F]{6}[\\x2A].*[#@]";
    private String reg_LD = "LD[d]{8}[d|A-F]{12}:[A-Z]{4}[\\x2A].*[#@]";
    private ConcurrentHashMap<String, Channel> sessionChannelMap = new ConcurrentHashMap<String, Channel>();
    private static ServerHandler serverHandler;
    @PostConstruct
    public void init() {
        serverHandler = this;
    }
    /**
     * 客户端与服务端创建连接的时候调用
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("CTX:" + ctx.channel());
        System.out.println("客户端与服务端连接开始...");
    }
    /**
     * 客户端与服务端断开连接时调用
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("客户端与服务端连接关闭...");
        NettyConfig.group.remove(ctx.channel());
    }
    /**
     * 服务端接收客户端发送过来的数据结束之后调用
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
        System.out.println("信息接收完毕...");
    }
    /**
     * 工程出现异常的时候调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    /**
     * 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
     */
    @Override
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
        System.out.println("接收到了:" + info);
        ByteBuf buf = (ByteBuf) info;
        byte[] req = new byte[buf.readableBytes()];
        buf.readBytes(req);
        String body = new String(req, "UTF-8");
        String content = body;
        System.out.println("接收客户端数据:" + body);
    }
}
//package org.springblade.modules.nettyServer;
//
//import com.alibaba.fastjson.JSONObject;
//import io.netty.buffer.ByteBuf;
//import io.netty.buffer.Unpooled;
//import io.netty.channel.Channel;
//import io.netty.channel.ChannelHandlerContext;
//import io.netty.channel.ChannelInboundHandlerAdapter;
//import io.netty.util.CharsetUtil;
//import org.springblade.modules.webscoket.service.IPushMsgService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//import java.text.SimpleDateFormat;
//import java.util.*;
//import java.util.concurrent.ConcurrentHashMap;
//
//@Component
//public class ServerHandler extends ChannelInboundHandlerAdapter {
//
//    private String reg_LA = "LA[d]{8}[d|A-F]{12}[d|A-F]{8}[d|A-Z]{2}[d|A-F]{4}[x2A][d|A-F]{6}[#@]";
//    private String reg_LB = "LB[\\d|A-F]{12}[\\x2A].*[#@]";
//    private String reg_LB2 = "LB[\\d|A-F]{6}[\\x2A].*[#@]";
//    private String reg_LD = "LD[d]{8}[d|A-F]{12}:[A-Z]{4}[\\x2A].*[#@]";
//
//
//    private ConcurrentHashMap<String, Channel> sessionChannelMap = new ConcurrentHashMap<String, Channel>();
//
//    private static ServerHandler serverHandler;
//
//    @PostConstruct
//    public void init() {
//        serverHandler = this;
//    }
//
//    /**
//     * 客户端与服务端创建连接的时候调用
//     */
//    @Override
//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        System.out.println("CTX:" + ctx.channel());
//        System.out.println("客户端与服务端连接开始...");
//    }
//
//    /**
//     * 客户端与服务端断开连接时调用
//     */
//    @Override
//    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//        System.out.println("客户端与服务端连接关闭...");
//        NettyConfig.group.remove(ctx.channel());
//    }
//
//    /**
//     * 服务端接收客户端发送过来的数据结束之后调用
//     */
//    @Override
//    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        ctx.flush();
//        System.out.println("信息接收完毕...");
//    }
//
//    /**
//     * 工程出现异常的时候调用
//     */
//    @Override
//    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        cause.printStackTrace();
//        ctx.close();
//    }
//
//    /**
//     * 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息
//     */
//    @Override
//    public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
//        System.out.println("接收到了:" + info);
//        ByteBuf buf = (ByteBuf) info;
//        byte[] req = new byte[buf.readableBytes()];
//        buf.readBytes(req);
//        String body = new String(req, "UTF-8");
//        String content = body;
//        System.out.println("接收客户端数据:" + body);
//
//    }
//
//
//}
src/main/java/org/springblade/modules/runner/MyRunner.java
@@ -1,24 +1,24 @@
package org.springblade.modules.runner;
import okhttp3.WebSocket;
import org.springblade.common.config.CommonConfig;
import org.springblade.common.config.FtpConfig;
import org.springblade.modules.webscoket.WebSocketServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
/**
 * 自定义启动(项目启动即启动)
 * @author zhongrj
 * @since 2022-01-06
 */
@Component
public class MyRunner implements CommandLineRunner {
    @Override
    public void run(String... args) throws Exception {
        System.out.println("websocketServer 开始启动!");
        //启动即创建webSocketServer
        WebSocketServer socketServer = new WebSocketServer(CommonConfig.socketPort);
    }
}
//package org.springblade.modules.runner;
//
//import okhttp3.WebSocket;
//import org.springblade.common.config.CommonConfig;
//import org.springblade.common.config.FtpConfig;
//import org.springblade.modules.webscoket.WebSocketServer;
//import org.springframework.boot.CommandLineRunner;
//import org.springframework.stereotype.Component;
//
///**
// * 自定义启动(项目启动即启动)
// * @author zhongrj
// * @since 2022-01-06
// */
//@Component
//public class MyRunner implements CommandLineRunner {
//
//    @Override
//    public void run(String... args) throws Exception {
//        System.out.println("websocketServer 开始启动!");
//        //启动即创建webSocketServer
//        WebSocketServer socketServer = new WebSocketServer(CommonConfig.socketPort);
//    }
//}
src/main/java/org/springblade/modules/webscoket/ChannelSupervise.java
@@ -1,38 +1,38 @@
package org.springblade.modules.webscoket;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
public class ChannelSupervise {
    private   static ChannelGroup GlobalGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
    private  static ConcurrentMap<String, ChannelId> ChannelMap=new ConcurrentHashMap();
    private  static Map<String, String> map = new HashMap<String, String>();;
    public  static void addChannel(Channel channel, String name){
        GlobalGroup.add(channel);
        ChannelMap.put(channel.id().asShortText(),channel.id());
        map.put(channel.id().asShortText(),name);
    }
    public static void removeChannel(Channel channel){
        GlobalGroup.remove(channel);
        ChannelMap.remove(channel.id().asShortText());
        map.remove(channel.id().asShortText());
    }
    public static Channel findChannel(String id){
        return GlobalGroup.find(ChannelMap.get(id));
    }
    public static String findName(String id){
    return map.get(id);
    }
    public static void send2All(TextWebSocketFrame tws){
        GlobalGroup.writeAndFlush(tws);
    }
}
//package org.springblade.modules.webscoket;
//import io.netty.channel.Channel;
//import io.netty.channel.ChannelId;
//import io.netty.channel.group.ChannelGroup;
//import io.netty.channel.group.DefaultChannelGroup;
//import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
//import io.netty.util.concurrent.GlobalEventExecutor;
//
//import java.util.HashMap;
//import java.util.Map;
//import java.util.concurrent.ConcurrentHashMap;
//import java.util.concurrent.ConcurrentMap;
//
//public class ChannelSupervise {
//    private   static ChannelGroup GlobalGroup=new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
//    private  static ConcurrentMap<String, ChannelId> ChannelMap=new ConcurrentHashMap();
//    private  static Map<String, String> map = new HashMap<String, String>();;
//    public  static void addChannel(Channel channel, String name){
//        GlobalGroup.add(channel);
//        ChannelMap.put(channel.id().asShortText(),channel.id());
//        map.put(channel.id().asShortText(),name);
//    }
//    public static void removeChannel(Channel channel){
//        GlobalGroup.remove(channel);
//        ChannelMap.remove(channel.id().asShortText());
//        map.remove(channel.id().asShortText());
//    }
//    public static Channel findChannel(String id){
//        return GlobalGroup.find(ChannelMap.get(id));
//    }
//    public static String findName(String id){
//    return map.get(id);
//    }
//    public static void send2All(TextWebSocketFrame tws){
//        GlobalGroup.writeAndFlush(tws);
//    }
//
//}
src/main/java/org/springblade/modules/webscoket/WebSocketHandler.java
@@ -1,193 +1,193 @@
package org.springblade.modules.webscoket;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.*;
import io.netty.util.AttributeKey;
import io.netty.util.CharsetUtil;
import org.springblade.modules.nettyServer.NettyConfig;
import org.springblade.modules.system.service.IUserService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@Component
public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
    private WebSocketServerHandshaker handshaker;
    private  String on=null;
    @Autowired
    private IUserService userService;
    private static WebSocketHandler webSocketHandler;
    @PostConstruct
    public void init() {
        webSocketHandler = this;
    }
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            //以http请求形式接入,但是走的是websocket
            handleHttpRequest(ctx, (FullHttpRequest) msg);
        } else if (msg instanceof WebSocketFrame) {
            //处理websocket客户端的消息
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }
    /**
     * 客户端加入连接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        //添加连接
        System.out.println("客户端加入连接:" + ctx.channel());
        //ChannelSupervise.addChannel(ctx.channel());
    }
    /**
     * 客户端离开连接
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //断开连接
        System.out.println("客户端断开连接:" + ctx.channel());
        //用户离线状态
        String name = ChannelSupervise.findName(ctx.channel().id().asShortText());
        if ( name != null &&!name.equals("ping") ){
            String num="0";
            //工作状态(0闲置,1工作中)
            String workSt = "0";
//            webSocketHandler.userService.updateUser(num,name,workSt);
            //ChannelSupervise.removeChannel(ctx.channel());
            NettyConfig.getChannelGroup().remove(ctx.channel());
            removeUserId(ctx);
        }
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws NumberFormatException, Exception {
        System.out.println("ctx = " + ctx);
        System.out.println("frame = " + frame);
        // 判断是否关闭链路的指令
        if (frame instanceof CloseWebSocketFrame) {
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判断是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(
                new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程仅支持文本消息,不支持二进制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            System.out.println("本例程仅支持文本消息,不支持二进制消息");
            throw new UnsupportedOperationException(String.format(
                "%s frame types not supported", frame.getClass().getName()));
        }
        // 返回应答消息
        String request = ((TextWebSocketFrame) frame).text();
        if (!request.equals("ping")){
            NettyConfig.getUserChannelMap().put(request,ctx.channel());
            //将用户id作为自定义属性加入到channel 中,方便随时channel中获取用户id
            AttributeKey<String> key = AttributeKey.valueOf("userId");
            ctx.channel().attr(key).setIfAbsent(request);
            //把用户信息添加到通道里
            ChannelSupervise.addChannel(ctx.channel(),request);
            //用户在线状态
            this.on=request;
            //在线状态(0掉线,1在线)
            String num="1";
            //工作状态(0闲置,1工作中)
            String workSt = "0";
//            webSocketHandler.userService.updateUser(num,request,workSt);
        }
    }
    /**
     * 唯一的一次http请求,用于创建websocket
     *
     * @throws InterruptedException
     */
    private void handleHttpRequest(final ChannelHandlerContext ctx,
                                   FullHttpRequest req) throws InterruptedException {
        //要求Upgrade为websocket,过滤掉get/Post
        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        //握手
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
            "ws://localhost:9034/websocket", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory
                .sendUnsupportedVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }
    /**
     * 拒绝不合法的请求,并返回错误信息
     */
    private static void sendHttpResponse(ChannelHandlerContext ctx,
                                         FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回应答给客户端
        if (res.status().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
                CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        // 如果是非Keep-Alive,关闭连接
//        if (!isKeepAlive(req) || res.status().code() != 200) {
//            f.addListener(ChannelFutureListener.CLOSE);
//package org.springblade.modules.webscoket;
//
//import io.netty.buffer.ByteBuf;
//import io.netty.buffer.Unpooled;
//import io.netty.channel.ChannelFuture;
//import io.netty.channel.ChannelHandlerContext;
//import io.netty.channel.SimpleChannelInboundHandler;
//import io.netty.handler.codec.http.DefaultFullHttpResponse;
//import io.netty.handler.codec.http.FullHttpRequest;
//import io.netty.handler.codec.http.HttpResponseStatus;
//import io.netty.handler.codec.http.HttpVersion;
//import io.netty.handler.codec.http.websocketx.*;
//import io.netty.util.AttributeKey;
//import io.netty.util.CharsetUtil;
//import org.springblade.modules.nettyServer.NettyConfig;
//import org.springblade.modules.system.service.IUserService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.stereotype.Component;
//
//import javax.annotation.PostConstruct;
//
//@Component
//public class WebSocketHandler extends SimpleChannelInboundHandler<Object> {
//
//    private WebSocketServerHandshaker handshaker;
//
//    private  String on=null;
//
//    @Autowired
//    private IUserService userService;
//
//    private static WebSocketHandler webSocketHandler;
//
//    @PostConstruct
//    public void init() {
//        webSocketHandler = this;
//    }
//
//    @Override
//    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
//        if (msg instanceof FullHttpRequest) {
//            //以http请求形式接入,但是走的是websocket
//            handleHttpRequest(ctx, (FullHttpRequest) msg);
//        } else if (msg instanceof WebSocketFrame) {
//            //处理websocket客户端的消息
//            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
//        }
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
    /**
     * 删除用户与channel 对应关系
     * @param ctx
     */
    private void removeUserId(ChannelHandlerContext ctx){
        AttributeKey<String> key = AttributeKey.valueOf("userId");
        String userId = ctx.channel().attr(key).get();
        NettyConfig.getUserChannelMap().remove(userId);
    }
}
//    }
//
//    /**
//     * 客户端加入连接
//     * @param ctx
//     * @throws Exception
//     */
//    @Override
//    public void channelActive(ChannelHandlerContext ctx) throws Exception {
//        //添加连接
//        System.out.println("客户端加入连接:" + ctx.channel());
//        //ChannelSupervise.addChannel(ctx.channel());
//    }
//
//    /**
//     * 客户端离开连接
//     * @param ctx
//     * @throws Exception
//     */
//    @Override
//    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//        //断开连接
//        System.out.println("客户端断开连接:" + ctx.channel());
//        //用户离线状态
//        String name = ChannelSupervise.findName(ctx.channel().id().asShortText());
//        if ( name != null &&!name.equals("ping") ){
//            String num="0";
//            //工作状态(0闲置,1工作中)
//            String workSt = "0";
////            webSocketHandler.userService.updateUser(num,name,workSt);
//            //ChannelSupervise.removeChannel(ctx.channel());
//
//            NettyConfig.getChannelGroup().remove(ctx.channel());
//            removeUserId(ctx);
//        }
//    }
//
//    @Override
//    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//        ctx.flush();
//    }
//
//    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws NumberFormatException, Exception {
//        System.out.println("ctx = " + ctx);
//        System.out.println("frame = " + frame);
//        // 判断是否关闭链路的指令
//        if (frame instanceof CloseWebSocketFrame) {
//            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
//            return;
//        }
//        // 判断是否ping消息
//        if (frame instanceof PingWebSocketFrame) {
//            ctx.channel().write(
//                new PongWebSocketFrame(frame.content().retain()));
//            return;
//        }
//        // 本例程仅支持文本消息,不支持二进制消息
//        if (!(frame instanceof TextWebSocketFrame)) {
//            System.out.println("本例程仅支持文本消息,不支持二进制消息");
//            throw new UnsupportedOperationException(String.format(
//                "%s frame types not supported", frame.getClass().getName()));
//        }
//        // 返回应答消息
//        String request = ((TextWebSocketFrame) frame).text();
//
//        if (!request.equals("ping")){
//
//            NettyConfig.getUserChannelMap().put(request,ctx.channel());
//
//            //将用户id作为自定义属性加入到channel 中,方便随时channel中获取用户id
//            AttributeKey<String> key = AttributeKey.valueOf("userId");
//            ctx.channel().attr(key).setIfAbsent(request);
//
//            //把用户信息添加到通道里
//            ChannelSupervise.addChannel(ctx.channel(),request);
//            //用户在线状态
//            this.on=request;
//            //在线状态(0掉线,1在线)
//            String num="1";
//            //工作状态(0闲置,1工作中)
//            String workSt = "0";
////            webSocketHandler.userService.updateUser(num,request,workSt);
//        }
//
//    }
//
//    /**
//     * 唯一的一次http请求,用于创建websocket
//     *
//     * @throws InterruptedException
//     */
//    private void handleHttpRequest(final ChannelHandlerContext ctx,
//                                   FullHttpRequest req) throws InterruptedException {
//        //要求Upgrade为websocket,过滤掉get/Post
//        if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
//            //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端
//            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
//            return;
//        }
//        //握手
//        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(
//            "ws://localhost:9034/websocket", null, false);
//        handshaker = wsFactory.newHandshaker(req);
//        if (handshaker == null) {
//            WebSocketServerHandshakerFactory
//                .sendUnsupportedVersionResponse(ctx.channel());
//        } else {
//            handshaker.handshake(ctx.channel(), req);
//        }
//    }
//
//    /**
//     * 拒绝不合法的请求,并返回错误信息
//     */
//    private static void sendHttpResponse(ChannelHandlerContext ctx,
//                                         FullHttpRequest req, DefaultFullHttpResponse res) {
//        // 返回应答给客户端
//        if (res.status().code() != 200) {
//            ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(),
//                CharsetUtil.UTF_8);
//            res.content().writeBytes(buf);
//            buf.release();
//        }
//        ChannelFuture f = ctx.channel().writeAndFlush(res);
//        // 如果是非Keep-Alive,关闭连接
////        if (!isKeepAlive(req) || res.status().code() != 200) {
////            f.addListener(ChannelFutureListener.CLOSE);
////        }
//    }
//    @Override
//    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        cause.printStackTrace();
//        ctx.close();
//    }
//
//    /**
//     * 删除用户与channel 对应关系
//     * @param ctx
//     */
//    private void removeUserId(ChannelHandlerContext ctx){
//        AttributeKey<String> key = AttributeKey.valueOf("userId");
//        String userId = ctx.channel().attr(key).get();
//        NettyConfig.getUserChannelMap().remove(userId);
//    }
//
//}
src/main/java/org/springblade/modules/webscoket/WebSocketServer.java
@@ -1,59 +1,59 @@
package org.springblade.modules.webscoket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketServer {
    private int port = 9034;
    public WebSocketServer(int port) {
        bind(port);
    }
    public void bind(int port) {
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                EventLoopGroup bossGroup = new NioEventLoopGroup();
                EventLoopGroup workerGroup = new NioEventLoopGroup();
                try {
                    ServerBootstrap serverBootstrap = new ServerBootstrap();
                    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                        .handler(new LoggingHandler(LogLevel.INFO))
                        //保持连接
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) {
//                                ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程
                                ch.pipeline().addLast("http-codec", new HttpServerCodec());//设置解码器
                                ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));//聚合器,使用websocket会用到
                                ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//用于大数据的分区传输
                                ch.pipeline().addLast("handler", new WebSocketHandler());//自定义的业务handler
                            }
                        });
                    ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
                    System.out.println("WebSocketServer启动成功");
                    channelFuture.channel().closeFuture().sync();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    bossGroup.shutdownGracefully();
                    workerGroup.shutdownGracefully();
                }
            }
        });
        thread.start();
    }
}
//package org.springblade.modules.webscoket;
//import io.netty.bootstrap.ServerBootstrap;
//import io.netty.channel.ChannelFuture;
//import io.netty.channel.ChannelInitializer;
//import io.netty.channel.ChannelOption;
//import io.netty.channel.EventLoopGroup;
//import io.netty.channel.nio.NioEventLoopGroup;
//import io.netty.channel.socket.SocketChannel;
//import io.netty.channel.socket.nio.NioServerSocketChannel;
//import io.netty.handler.codec.http.HttpObjectAggregator;
//import io.netty.handler.codec.http.HttpServerCodec;
//import io.netty.handler.logging.LogLevel;
//import io.netty.handler.logging.LoggingHandler;
//import io.netty.handler.stream.ChunkedWriteHandler;
//
//public class WebSocketServer {
//    private int port = 9034;
//
//    public WebSocketServer(int port) {
//        bind(port);
//    }
//
//    public void bind(int port) {
//        Thread thread = new Thread(new Runnable() {
//            @Override
//            public void run() {
//                EventLoopGroup bossGroup = new NioEventLoopGroup();
//                EventLoopGroup workerGroup = new NioEventLoopGroup();
//                try {
//                    ServerBootstrap serverBootstrap = new ServerBootstrap();
//                    serverBootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
//                        .handler(new LoggingHandler(LogLevel.INFO))
//                        //保持连接
//                        .childOption(ChannelOption.SO_KEEPALIVE, true)
//                        .childHandler(new ChannelInitializer<SocketChannel>() {
//                            @Override
//                            protected void initChannel(SocketChannel ch) {
////                                ch.pipeline().addLast("logging",new LoggingHandler("DEBUG"));//设置log监听器,并且日志级别为debug,方便观察运行流程
//                                ch.pipeline().addLast("http-codec", new HttpServerCodec());//设置解码器
//                                ch.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));//聚合器,使用websocket会用到
//                                ch.pipeline().addLast("http-chunked", new ChunkedWriteHandler());//用于大数据的分区传输
//                                ch.pipeline().addLast("handler", new WebSocketHandler());//自定义的业务handler
//                            }
//                        });
//
//                    ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
//                    System.out.println("WebSocketServer启动成功");
//                    channelFuture.channel().closeFuture().sync();
//                } catch (Exception e) {
//                    e.printStackTrace();
//                } finally {
//                    bossGroup.shutdownGracefully();
//                    workerGroup.shutdownGracefully();
//                }
//            }
//        });
//        thread.start();
//    }
//}
src/main/java/org/springblade/modules/webscoket/controller/PushMsgController.java
@@ -1,30 +1,30 @@
package org.springblade.modules.webscoket.controller;
import org.springblade.modules.webscoket.service.IPushMsgService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RestController;
/**
 * @author lq
 * @date 2020/4/1 11:22
 */
@RestController
public class PushMsgController {
    @Autowired
    private IPushMsgService pushMsgService;
    @PostMapping("/pushUser")
    public String pushUser(String userId,String msg){
        pushMsgService.pushMsg(userId, msg);
        return "消息发送成功:"+msg;
    }
    @PostMapping("/pushAll")
    public String pushAll(String msg){
        pushMsgService.pushMsg(msg);
        return "消息发送成功:"+msg;
    }
}
//package org.springblade.modules.webscoket.controller;
//
//import org.springblade.modules.webscoket.service.IPushMsgService;
//import org.springframework.beans.factory.annotation.Autowired;
//import org.springframework.web.bind.annotation.PostMapping;
//import org.springframework.web.bind.annotation.RestController;
//
///**
// * @author lq
// * @date 2020/4/1 11:22
// */
//@RestController
//public class PushMsgController {
//
//    @Autowired
//    private IPushMsgService pushMsgService;
//
//    @PostMapping("/pushUser")
//    public String pushUser(String userId,String msg){
//        pushMsgService.pushMsg(userId, msg);
//        return "消息发送成功:"+msg;
//    }
//
//    @PostMapping("/pushAll")
//    public String pushAll(String msg){
//        pushMsgService.pushMsg(msg);
//        return "消息发送成功:"+msg;
//    }
//
//}
src/main/java/org/springblade/modules/webscoket/index.html
@@ -1,57 +1,57 @@
<html>
<head>
    <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>
    <title>WebSocket客户端</title>
    <script type="text/javascript">
        var socket;
        if(!window.WebSocket){
            window.WebSocket = window.MozWebSocket;
        }
<!--<html>-->
<!--<head>-->
<!--    <meta http-equiv="Content-Type" content="text/html; charset = utf-8"/>-->
<!--    <title>WebSocket客户端</title>-->
<!--    <script type="text/javascript">-->
<!--        var socket;-->
<!--        if(!window.WebSocket){-->
<!--            window.WebSocket = window.MozWebSocket;-->
<!--        }-->
        if(window.WebSocket){
            socket = new WebSocket("ws://localhost:9034/websocket");
            socket.onmessage = function(event){
                var ta = document.getElementById('responseContent');
                ta.value += event.data + "\r\n";
            };
<!--        if(window.WebSocket){-->
<!--            socket = new WebSocket("ws://localhost:9034/websocket");-->
<!--            socket.onmessage = function(event){-->
<!--                var ta = document.getElementById('responseContent');-->
<!--                ta.value += event.data + "\r\n";-->
<!--            };-->
            socket.onopen = function(event){
                var ta = document.getElementById('responseContent');
                ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";
            };
<!--            socket.onopen = function(event){-->
<!--                var ta = document.getElementById('responseContent');-->
<!--                ta.value = "你当前的浏览器支持WebSocket,请进行后续操作\r\n";-->
<!--            };-->
            socket.onclose = function(event){
                var ta = document.getElementById('responseContent');
                ta.value = "";
<!--            socket.onclose = function(event){-->
<!--                var ta = document.getElementById('responseContent');-->
<!--                ta.value = "";-->
                ta.value = "WebSocket连接已经关闭\r\n";
            };
        }else{
            alert("您的浏览器不支持WebSocket");
        }
<!--                ta.value = "WebSocket连接已经关闭\r\n";-->
<!--            };-->
<!--        }else{-->
<!--            alert("您的浏览器不支持WebSocket");-->
<!--        }-->
        function send(message){
            if(!window.WebSocket){
                return;
            }
            if(socket.readyState == WebSocket.OPEN){
                socket.send(message);
            }else{
                alert("WebSocket连接没有建立成功!!");
            }
        }
    </script>
</head>
<body>
<form onSubmit="return false;">
    <input type = "text" name = "message" value = ""/>
    <br/><br/>
    <input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>
    <hr color="red"/>
    <h2>客户端接收到服务端返回的应答消息</h2>
    <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>
</form>
</body>
</html>
<!--        function send(message){-->
<!--            if(!window.WebSocket){-->
<!--                return;-->
<!--            }-->
<!--            if(socket.readyState == WebSocket.OPEN){-->
<!--                socket.send(message);-->
<!--            }else{-->
<!--                alert("WebSocket连接没有建立成功!!");-->
<!--            }-->
<!--        }-->
<!--    </script>-->
<!--</head>-->
<!--<body>-->
<!--<form onSubmit="return false;">-->
<!--    <input type = "text" name = "message" value = ""/>-->
<!--    <br/><br/>-->
<!--    <input type = "button" value = "发送WebSocket请求消息" onClick = "send(this.form.message.value)"/>-->
<!--    <hr color="red"/>-->
<!--    <h2>客户端接收到服务端返回的应答消息</h2>-->
<!--    <textarea id = "responseContent" style = "width:1024px; height:300px"></textarea>-->
<!--</form>-->
<!--</body>-->
<!--</html>-->
src/main/java/org/springblade/modules/webscoket/service/IPushMsgService.java
@@ -1,21 +1,21 @@
package org.springblade.modules.webscoket.service;
/**
 * @author 123456
 */
public interface IPushMsgService {
    /**
     * 给指定用户发送消息
     * @param userId
     * @param msg
     */
    void pushMsg(String userId,String msg);
    /**
     * 给所有用户发送消息
     * @param msg
     */
    void pushMsg(String msg);
}
//package org.springblade.modules.webscoket.service;
//
///**
// * @author 123456
// */
//public interface IPushMsgService {
//
//    /**
//     * 给指定用户发送消息
//     * @param userId
//     * @param msg
//     */
//    void pushMsg(String userId,String msg);
//
//    /**
//     * 给所有用户发送消息
//     * @param msg
//     */
//    void pushMsg(String msg);
//
//}
src/main/java/org/springblade/modules/webscoket/service/impl/PushMsgServiceImpl.java
@@ -1,32 +1,32 @@
package org.springblade.modules.webscoket.service.impl;
import io.netty.channel.Channel;
import io.netty.channel.group.ChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import org.springblade.modules.nettyServer.NettyConfig;
import org.springblade.modules.webscoket.service.IPushMsgService;
import org.springframework.stereotype.Service;
/**
 * @author lq
 * @date 2020/4/1 11:20
 */
@Service
public class PushMsgServiceImpl implements IPushMsgService {
    @Override
    public void pushMsg(String userId, String msg) {
        Channel channel = NettyConfig.getUserChannelMap().get(userId);
        if (channel != null){
            channel.writeAndFlush(new TextWebSocketFrame(msg));
        }
    }
    @Override
    public void pushMsg(String msg) {
        ChannelGroup group = NettyConfig.getChannelGroup();
        String name = group.name();
        System.out.println("空间大小:"+group.size()+",名字:"+name);
        group.writeAndFlush(new TextWebSocketFrame(msg));
    }
}
//package org.springblade.modules.webscoket.service.impl;
//
//import io.netty.channel.Channel;
//import io.netty.channel.group.ChannelGroup;
//import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
//import org.springblade.modules.nettyServer.NettyConfig;
//import org.springblade.modules.webscoket.service.IPushMsgService;
//import org.springframework.stereotype.Service;
//
///**
// * @author lq
// * @date 2020/4/1 11:20
// */
//@Service
//public class PushMsgServiceImpl implements IPushMsgService {
//    @Override
//    public void pushMsg(String userId, String msg) {
//        Channel channel = NettyConfig.getUserChannelMap().get(userId);
//        if (channel != null){
//            channel.writeAndFlush(new TextWebSocketFrame(msg));
//        }
//
//    }
//
//    @Override
//    public void pushMsg(String msg) {
//        ChannelGroup group = NettyConfig.getChannelGroup();
//        String name = group.name();
//        System.out.println("空间大小:"+group.size()+",名字:"+name);
//        group.writeAndFlush(new TextWebSocketFrame(msg));
//    }
//}