package org.springblade.modules.webscoket; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.DefaultFullHttpResponse; import io.netty.handler.codec.http.FullHttpRequest; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; import io.netty.handler.codec.http.websocketx.*; import io.netty.util.AttributeKey; import io.netty.util.CharsetUtil; import org.springblade.modules.nettyServer.NettyConfig; import org.springblade.modules.system.service.IUserService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; @Component public class WebSocketHandler extends SimpleChannelInboundHandler { private WebSocketServerHandshaker handshaker; private String on=null; @Autowired private IUserService userService; private static WebSocketHandler webSocketHandler; @PostConstruct public void init() { webSocketHandler = this; } @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { //以http请求形式接入,但是走的是websocket handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { //处理websocket客户端的消息 handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } /** * 客户端加入连接 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //添加连接 System.out.println("客户端加入连接:" + ctx.channel()); //ChannelSupervise.addChannel(ctx.channel()); } /** * 客户端离开连接 * @param ctx * @throws Exception */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { //断开连接 System.out.println("客户端断开连接:" + ctx.channel()); //用户离线状态 String name = ChannelSupervise.findName(ctx.channel().id().asShortText()); if ( name != null &&!name.equals("ping") ){ String num="0"; //工作状态(0闲置,1工作中) String workSt = "0"; // webSocketHandler.userService.updateUser(num,name,workSt); //ChannelSupervise.removeChannel(ctx.channel()); NettyConfig.getChannelGroup().remove(ctx.channel()); removeUserId(ctx); } } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws NumberFormatException, Exception { System.out.println("ctx = " + ctx); System.out.println("frame = " + frame); // 判断是否关闭链路的指令 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // 判断是否ping消息 if (frame instanceof PingWebSocketFrame) { ctx.channel().write( new PongWebSocketFrame(frame.content().retain())); return; } // 本例程仅支持文本消息,不支持二进制消息 if (!(frame instanceof TextWebSocketFrame)) { System.out.println("本例程仅支持文本消息,不支持二进制消息"); throw new UnsupportedOperationException(String.format( "%s frame types not supported", frame.getClass().getName())); } // 返回应答消息 String request = ((TextWebSocketFrame) frame).text(); if (!request.equals("ping")){ NettyConfig.getUserChannelMap().put(request,ctx.channel()); //将用户id作为自定义属性加入到channel 中,方便随时channel中获取用户id AttributeKey key = AttributeKey.valueOf("userId"); ctx.channel().attr(key).setIfAbsent(request); //把用户信息添加到通道里 ChannelSupervise.addChannel(ctx.channel(),request); //用户在线状态 this.on=request; //在线状态(0掉线,1在线) String num="1"; //工作状态(0闲置,1工作中) String workSt = "0"; // webSocketHandler.userService.updateUser(num,request,workSt); } } /** * 唯一的一次http请求,用于创建websocket * * @throws InterruptedException */ private void handleHttpRequest(final ChannelHandlerContext ctx, FullHttpRequest req) throws InterruptedException { //要求Upgrade为websocket,过滤掉get/Post if (!req.decoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) { //若不是websocket方式,则创建BAD_REQUEST的req,返回给客户端 sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST)); return; } //握手 WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( "ws://localhost:9034/websocket", null, false); handshaker = wsFactory.newHandshaker(req); if (handshaker == null) { WebSocketServerHandshakerFactory .sendUnsupportedVersionResponse(ctx.channel()); } else { handshaker.handshake(ctx.channel(), req); } } /** * 拒绝不合法的请求,并返回错误信息 */ private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) { // 返回应答给客户端 if (res.status().code() != 200) { ByteBuf buf = Unpooled.copiedBuffer(res.status().toString(), CharsetUtil.UTF_8); res.content().writeBytes(buf); buf.release(); } ChannelFuture f = ctx.channel().writeAndFlush(res); // 如果是非Keep-Alive,关闭连接 // if (!isKeepAlive(req) || res.status().code() != 200) { // f.addListener(ChannelFutureListener.CLOSE); // } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } /** * 删除用户与channel 对应关系 * @param ctx */ private void removeUserId(ChannelHandlerContext ctx){ AttributeKey key = AttributeKey.valueOf("userId"); String userId = ctx.channel().attr(key).get(); NettyConfig.getUserChannelMap().remove(userId); } }