分享

Netty进阶

 贪挽懒月 2022-06-20 发布于广东

一、Netty核心模块

  • BootStrap:客户端程序的启动引导类
  • ServerBootStrap:服务端程序的启动引导类

它们的常用方法有:

- group:设置线程组
- channel:指定通道的实现类
- option:给channel添加配置
- childOption:给接收到的channel添加配置
- handler:设置bossGroup的handler
- childHandler:设置workGroup的handler
  • Selector:用来接收注册的channel的。当往selector中注册channel后,selector就会自动不断地轮询这些channel是否有就绪的IO事件。
  • ChannelHandler:是一个接口,处理IO事件或者拦截IO事件,也就是说你拿到channel后想干的事情都通过channelHandler来完成。
  • ChannelPipeline:是一个handler的集合,负责处理和拦截入站事件和出站操作。一个channel包含了一个ChannelPipeline,而ChannelPipeline又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联着一个ChannelHandler。入站事件就是从链表头传递到链表尾的handler,出站事件就是从链表尾传到链表头的handler。ChannelPipeline的常用方法:
- addFirst:把handler添加到链表第一个位置
- addLast:把handler添加到链表的最后一个位置
  • ChannelHandlerContext:保存channel相关的上下文信息,同时关联了Channelhandler对象,也绑定了对应的pipeline和channel信息,方便对channelhandler进行调用。

二、用Netty实现聊天室功能

之前说过用NIO实现聊天室,现在来看看用netty如何实现聊天室。这里我将新建两个maven项目,一个服务端,一个客户端,最后可以打成jar包,服务端jar包运行在你电脑上,客户端jar包自己跑一份,还可以发给你的同事,然后就可以愉快的聊天了。

1、服务端:

  • pom.xml:引入netty的依赖,还要配置一下打包插件,不然你运行的jar包就会报“找不到主清单文件”或者没把netty依赖打包上去。
<dependencies>
 <dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.50.Final</version>
 </dependency>
</dependencies>
<build>
 <plugins>
  <plugin>
   <artifactId>maven-assembly-plugin</artifactId>
   <configuration>
    <descriptorRefs>
     <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
    <archive>
     <manifest>
      <mainClass>com.zhusl.study.chatroom.NettyChatRoomClient</mainClass>
     </manifest>
    </archive>
   </configuration>
   <executions>
    <execution>
     <id>make-assembly</id>
     <phase>package</phase>
     <goals>
      <goal>single</goal>
     </goals>
    </execution>
   </executions>
  </plugin>
 </plugins>
</build>

客户端的pom.xml唯一的区别就是换成了客户端启动类。

  • NettyChatRoomServer:
public class NettyChatRoomServer {
 
 public void run () throws Exception {
  // 创建两个线程组
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   // 配置参数
   ServerBootstrap bootstrap = new ServerBootstrap();
   bootstrap.group(bossGroup, workGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 128)
            .childOption(ChannelOption.SO_KEEPALIVE, true)
            .childHandler(new NettyChatRoomServerInitializer());
   // 监听端口
   ChannelFuture cf = bootstrap.bind(6666).sync(); 
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
  }
 }
 
 public static void main(String[] args) throws Exception {
  NettyChatRoomServer ncrs = new NettyChatRoomServer();
  ncrs.run();
 }
}
  • NettyChatRoomServerInitializer:
public class NettyChatRoomServerInitializer extends ChannelInitializer<SocketChannel>{

 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());//编码器
        pipeline.addLast("handler",new NettyChatRoomServerHandler());
  
 }
}
  • NettyChatRoomServerHandler:
public class NettyChatRoomServerHandler extends SimpleChannelInboundHandler<String>{

 private  static ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
 
 /**
  * 当有channel加入时执行该方法(即当有客户端连接时)
  */
 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + "进入聊天室");
  for (Channel channel : channelGroup) {
   // 给别的客户端提醒:xxx加入群聊
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】" + "进入聊天室");
   }
  }
  // 将当前channel加入到channelGroup中
  channelGroup.add(ctx.channel());
 }
 
 /**
  * 当有channel删除时执行该方法(即客户端断开连接)
  */
 @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + "离开聊天室");
  for (Channel channel : channelGroup) {
   // 给别的客户端提醒:xxx加入群聊
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】" + "离开聊天室");
   }
  }
  // 这里不需要channelGroup.remove,会自动remove
 }
 
 /**
  * 当有数据读取时执行该方法
  */
 @Override
 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  String ip = ctx.channel().remoteAddress().toString().substring(9, 13);
  System.out.println("【" + ip + "】" + ":" + msg);
  for (Channel channel : channelGroup) {
   // 将消息转发给别的客户端
   if (ctx.channel() != channel) {
    channel.writeAndFlush("【" + ip + "】"  + ":" + msg);
   } else {
    channel.writeAndFlush("【我】:" + msg);
   }
  }
 }
 
 /**
  * 异常处理
  */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

2、客户端:

  • NettyChatRoomClient:
public class NettyChatRoomClient {

 @SuppressWarnings("resource")
 public void run() throws Exception {
  EventLoopGroup group = new NioEventLoopGroup();
  try {
   Bootstrap bootstrap = new Bootstrap();
   bootstrap.group(group)
               .channel(NioSocketChannel.class)
               .handler(new NettyChatRoomClientInitializer());
   // 连接服务器
   ChannelFuture channelFuture = bootstrap.connect("192.168.2.36", 7777).sync();
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
            while (scanner.hasNextLine()) {
                String msg = scanner.nextLine();
                channel.writeAndFlush(msg);
            }
            channelFuture.channel().closeFuture().sync();
        } finally {
         group.shutdownGracefully();
        }
 }
 
 public static void main(String[] args) throws Exception {
  NettyChatRoomClient ncrc = new NettyChatRoomClient();
  ncrc.run();
 }
}
  • NettyChatRoomClientInitializer:
public class NettyChatRoomClientInitializer extends ChannelInitializer<SocketChannel>{

 @Override
 protected void initChannel(SocketChannel ch) throws Exception {
  ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast("decode",new StringDecoder());//解码器
        pipeline.addLast("encode",new StringEncoder());
        pipeline.addLast("handler",new NettyChatRoomClientHandler());
 }
}
  • NettyChatRoomClientHandler:
public class NettyChatRoomClientHandler extends SimpleChannelInboundHandler<String>{
 
 /**
  * 异常处理
  */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
  // 将从服务端接收到的消息打印出来
  System.out.println(msg);
 }
}

三、Netty心跳检测机制

客户端与服务端连接是否正常,需要有一个机制来检测,Netty提供了心跳检测机制。1、Netty心跳检测案例:

  • 服务器超过3秒没有读操作,提示读空闲
  • 服务器超过5秒没有写操作,提示写空闲
  • 服务器超过7秒没有读和写,提示读写空闲

客户端和以前一样,没有变换,主要是服务端加了日志handler以及childHandler重写了一个用于检测心跳的方法userEventTriggered,服务端代码如下:

  • HeartBeatServer:
public class HeartBeatServer {

 public static void main(String[] args) throws Exception {
  // 1. 创建boss group (boss group和work group含有的子线程数默认是cpu数 * 2)
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  // 2. 创建work group
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   // 3. 创建服务端启动对象
   ServerBootstrap bootstrap = new ServerBootstrap();
   // 4. 配置启动参数
   bootstrap.group(bossGroup, workGroup) // 设置两个线程组
     .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
     .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
     .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
      @Override
      protected void initChannel(SocketChannel sc) throws Exception {
       sc.pipeline().addLast(new IdleStateHandler(3, 5, 7, TimeUnit.SECONDS));
       sc.pipeline().addLast(new HeartBeatServerHandler());
      }
     });
   // 5. 启动服务器并绑定端口
   ChannelFuture cf = bootstrap.bind(6666).sync();
   // 6. 对关闭通道进行监听
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
   workGroup.shutdownGracefully();
  }
 }
}
  • HeartBeatServerHandler:
public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter{
 @Override
 public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  if (evt instanceof IdleStateEvent) {
   IdleStateEvent event = (IdleStateEvent) evt;
   String info = null;
   switch (event.state()) {
   case READER_IDLE:
    info = "读空闲";
    break;
            case WRITER_IDLE:
             info = "写空闲";
             break;
            case ALL_IDLE:
             info = "读写空闲";
             break;
   }
   System.out.println(ctx.channel().remoteAddress() + ":" + info);
  }
 }
}

四、WebSocket长连接开发

1、http协议和websocket协议的区别:

  • http协议:无状态、短连接、被动型。即只能由客户端发起请求,服务端给客户端响应,当服务端响应完,本次请求的生命周期就结束了。客户端没办法主动感知服务端的变化,服务端也没办法主动推送数据给客户端。比如你请求秒杀接口,秒杀接口给你返回排队中,那到底什么时候排上号了呢?客户端就得不断地循环请求获取秒杀结果的接口。

  • websocket:是基于http协议开发的,握手过程和htpp一样。所谓长连接,就是服务端和客户端可以相互感知,浏览器关闭了服务端可以感知,服务端关闭了浏览器可以感知。比如还是秒杀,如果是用websocket长连接开发的接口,你请求秒杀返回排队中,然后你不用再循环请求获取订单状态的接口,服务端和客户端会保持一个长连接,服务端可以主动把订单状态推给客户端。

2、案例代码:

  • WebSocketServer:
public class WebSocketServer {
 public static void main(String[] args) throws Exception {
  EventLoopGroup bossGroup = new NioEventLoopGroup();
  EventLoopGroup workGroup = new NioEventLoopGroup();
  try {
   ServerBootstrap bootstrap = new ServerBootstrap();
   bootstrap.group(bossGroup, workGroup) // 设置两个线程组
     .channel(NioServerSocketChannel.class) // 使用NioSocketChannel 作为服务器的通道
     .handler(new LoggingHandler(LogLevel.INFO)) // 日志处理器
     .childHandler(new ChannelInitializer<SocketChannel>() { // 创建通道初始化对象
      @Override
      protected void initChannel(SocketChannel sc) throws Exception {
       sc.pipeline().addLast(new HttpServerCodec()); // 使用http的编码解码器
       sc.pipeline().addLast(new ChunkedWriteHandler()); // 是以块方式写,添加ChunkedWriteHandler处理器
       sc.pipeline().addLast(new HttpObjectAggregator(8192)); // http数据在传输的时候是分段的,用这个处理器就可聚集分段
       // 请求的url就是:ws://localhost:6666/hello
       sc.pipeline().addLast(new WebSocketServerProtocolHandler("/hello"));
       sc.pipeline().addLast(new WebSocketServerHandler());
      }
     });
   ChannelFuture cf = bootstrap.bind(80).sync();
   System.out.println("服务端准备好了");
   cf.channel().closeFuture().sync();
  } finally {
   bossGroup.shutdownGracefully();
   workGroup.shutdownGracefully();
  }
 }
}
  • WebSocketServerHandler:
public class WebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame>{

 @Override
 protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {
  System.out.println("服务器收到消息:" + msg.text());
  ctx.channel().writeAndFlush(new TextWebSocketFrame("服务器时间:" + LocalDateTime.now()) + ",msg:" + msg.text());
 }

 @Override
 public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
  System.out.println("handlerAdded被调用:" + ctx.channel().id().asLongText());
 }

 @Override
 public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
  System.out.println("handlerRemoved被调用:" + ctx.channel().id().asLongText());
 }

 @Override
 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  System.out.println("exceptionCaught被调用:" + ctx.channel().id().asLongText());
  ctx.close();
 }
}

然后编写一个页面,用来发送websocket请求:


<body>
  <script type="text/javascript">
     var socket;
     if(window.WebSocket) {
      socket = new WebSocket("ws://127.0.0.1/hello");
      // 接收服务器端返回的消息,显示在responseText中
      socket.onmessage = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = rt.value + "\n" + ev.data;
      }
      // 相当于开启连接
      socket.onopen = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = "连接开启了";
      }
      // 连接关闭
      socket.onclose = function(ev){
       var rt = document.getElementById("responseText");
       rt.value = rt.value + "\n" + "连接关闭了";
      }
     } else {
      alert("浏览器不支持websocket");
     }
     
     function send(message){
      if (!window.socket){
       return;
      }
      if (socket.readyState == WebSocket.OPEN){
       socket.send(message);
      } else {
       alert("连接没有开启");
      }
     }
  </script>

  <form onsubmit="return false">
     <textarea name="message" style="height:300px;width: 300px"></textarea>
     <input type="button" value="发送消息" onclick="send(this.form.message.value)">
     <textarea id="responseText" style="height:300px;width: 300px"></textarea>
  </form>
</body>

访问这个页面,服务端启动或者关闭会在框框中显示出来,同样,如果客户端关闭,服务端也会在控制台打印出来。

五、protobuf

1、编解码问题:

数据在网络中是以二进制字节码传输的,发送的数据需要编码,服务端收到后需要解码。Netty提供了StringDecoder、ObjectDecoder,底层采用的是java序列化技术,java序列化本身效率较低,而且无法跨语言,所以就有了protobuf。

2、protobuf简介:它是Google的开源项目,轻便高效的结构化数据存储格式,可用于数据的序列化,且支持跨平台跨语言,很适合做数据存储和RPC数据交换格式。

3、protobuf的使用:

  • 需求:客户端发送一个Student对象到服务器,服务器接收这个对象,并显示它的信息。

下面开始编码:



  • 下载protoc-3.6.1-win32.zip,然后解压。下载地址:https://github.com/protocolbuffers/protobuf/releases/tag/v3.6.1

  • 引入protobuf依赖:

<dependency>
 <groupId>com.google.protobuf</groupId>
 <artifactId>protobuf-java</artifactId>
 <version>3.6.1</version>
</dependency>
  • 编写Student.proto:
syntax = "proto3"; // 版本
option java_outer_classname = "StudentPOJO"; // 外部类名
message Student { // 内部类名
   int32 id = 1; // 1不是值,而是序号
   string name = 2;
}
  • 将这个类Student.proto复制到刚解压出来的bin目录下,也就是和protoc.exe要同一个目录;
  • 在此目录打开cmd,执行如下命令:
protoc.exe --java_out=. Student.proto

执行完后会在protoc.exe所在目录生成一个StudentPOJO.java文件,这就是我们要的文件,复制到项目中。

  • 客户端的修改:NettyClientHandler中的channelActive改成这样:
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
 System.out.println("client:" + ctx);
 // 发送student对象到服务端
 StudentPOJO.Student student = StudentPOJO.Student.newBuilder().setId(666).setName("张三").build();
 ctx.writeAndFlush(student);
}

NettyClient中添加protobuf的编码器:

@Override
protected void initChannel(SocketChannel sc) throws Exception {
 // 加入protobuf的编码器
 sc.pipeline().addLast("encoder", new ProtobufEncoder());
 sc.pipeline().addLast(new NettyClientHandler());
}
  • 服务端的修改:NettyServer中添加解码器:
@Override
protected void initChannel(SocketChannel sc) throws Exception {
 // 加入解码器,指定解码对象
 sc.pipeline().addLast("decoder", new ProtobufDecoder(StudentPOJO.Student.getDefaultInstance()));
 // 传入自定义的handler
 sc.pipeline().addLast(new NettyServerHandler());
 // 在这里,可以将SocketChannel sc保存到集合中,别的线程拿到集合就可以调用channel的方法了

NettyServerHandler中读取student对象:

// 读取数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 // 读取客户端发送的student
 StudentPOJO.Student student = (Student) msg;
 System.out.println("客户端发送的数据是:id=" + student.getId() + ", name=" + student.getName());
}

启动服务端,再启动客户端,就可以看到服务端后台打印出了如下信息:

客户端发送的数据是:id=666, name=张三
转一转
赞一赞
看一看
-java开发那些事-

    转藏 分享 献花(0

    0条评论

    发表

    请遵守用户 评论公约

    类似文章 更多