学习下Netty

Posted by Vincent on Thursday, July 15, 2021

前言

很久以前跟电信对接短信和彩信,然后要跟移动同步订购关系,移动要访问我们的WEB服务器,当时struct一堆XML看得头大,就自己写了个WEBSERVER,当时是用的 ServerSocket加多线程,比较原始不过自己实现了部分HTTP协议,整个过程自己对网络通讯的技术有了很深刻的认识,还是获益良多.然后写了很多Socket客服端,服务器端,私有协议还有各种其他协议.自己制定规则还挺好玩.现在网络模型大概都脱胎自NGINX吧.Netty算Java比较基础的库.所以学习下.顺便记录备查.空了用Netty写个IM玩玩.

这个文章是参照Netty官方的Quick Start编写的.没有完全翻译,加入了一些自己的备注.

Netty 作为一个高性能高扩展性的异步事件驱动的网络应用框架,被很多著名的JAVA开源组件所采用.使用Netty可以方便的进行高性能SOCKET服务和客户端的开发.Netty的创始人是韩国人Trustin Lee.韩国大佬.

安装

创建个Maven项目,添加相关的依赖

<dependency>
      <groupId>io.netty</groupId>
      <artifactId>netty-all</artifactId>
      <version>4.1.65.Final</version>
      <scope>compile</scope>
    </dependency>

实现最简单的Socket服务器

编写事件处理器

我们先来编写一个最简单的服务器,他的功能只是接受客户端的连接,然后丢弃内容关闭这个连接.不进行任何响应

package com.vnzmi;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
//DiscardServerHandler 继承了 ChannelInboundHandlerAdapter 该类实现了ChannelInboundHandler
//对事件的处理方法.我们可以重载一些方法来实现我们的功能
public class DiscardServerHandler  extends ChannelInboundHandlerAdapter {
    
  	// 重载channelRead事件处理方法.该事件在从客户端获取到消息时会被调用
    // 获取到的内容为 reference-counted 对象.(这个后面再了解)
  	@Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        //因为我们不会做任何操作,所以直接使用release方法释放获取到的消息内容
        ((ByteBuf)msg).release(); 
      
        //通过处理方法应该如下,对收到的数据进行处理
        //处理完成后释放消息
        /**
      	try{
            ByteBuf msg1 = (ByteBuf)  msg;
            while(msg1.isReadable()){
                System.out.print((char)msg1.readByte());
                System.out.flush();
            }
        }finally {
            ReferenceCountUtil.release(msg);
        }
        **/
    }
		
  	// 异常处理,当Netty在进行IO 处理出现异常时会触发该事件
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

启动服务器

创建服务器,进行服务器设置然后启动服务即可.

public class DiscardServer {
    private int port;

    public DiscardServer(int port){
        this.port = port;
    }

    public void run(){
        EventLoopGroup masterGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        ServerBootstrap server = new ServerBootstrap();
        server.group(masterGroup,workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        socketChannel.pipeline().addLast(new DiscardServerHandler());
                    }
                })
                .option(ChannelOption.SO_BACKLOG,128)
                .childOption(ChannelOption.SO_KEEPALIVE,true);

        try {
            ChannelFuture f = server.bind(port).sync();
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }


    }
}
  1. NioEventLoopGroup 是一个多线程的时间循环,用于处理IO操作.Netty为多种类型的数据传输提供了EventLoopGroup的实现,在我们这个服务端应用中,我们有两个NioEventLoopGroup 循环一个是 主进程(master),另外一个是工作进程(worker), 主进程用于接受传入的连接.一旦接收到连接就会将连接注册到工作进程,可以通过构造函数来设置NioEventLoopGroup的线程数量等参数
  2. ServerBootstrap 是一个工具类用于设置服务器
  3. 我们通过server.channel(NioServerSocketChannel.class) 设置使用NioServerSocketChannel来接受传入的连接
  4. server.childHandler() 指定通过ChannelInitializer 来初始化一个新的channel . 在Socket通道上附加上我们上一步编写的处理器程序.随着业务的复杂,我们会附加越来越多的处理程序,我们也可以在这里对Socket连接进行参数设置
  5. option(),childOption()用于设置Socket选项,SO_BACKLOG设置同一时间等待Socket连接的客户端队列的长度. SO_KEEPALIVE 设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文

常用的SOCKET选项

SO_BACKLOG

设置同一时间等待Socket连接的客户端队列的长度

SO_KEEPALIVE

参数对应于套接字选项中的SO_KEEPALIVE,该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

SO_REUSEADDR

这个参数表示允许重复使用本地地址和端口比如,某个服务器进程占用了TCP的80端口进行监听,此时再次监听该端口就会返回错误,使用该参数就可以解决问题,该参数允许共用该端口,这个在服务器程序中比较常使用,比如某个进程非正常退出,该程序占用的端口可能要被占用一段时间才能允许其他进程使用,而且程序死掉以后,内核一需要一定的时间才能够释放此端口,不设置SO_REUSEADDR

SO_SNDBUFSO_RCVBUF

这两个参数用于操作接收缓冲区和发送缓冲区的大小,接收缓冲区用于保存网络协议站内收到的数据,直到应用程序读取成功,发送缓冲区用于保存发送数据,直到发送成功。

SO_LINGER

Linux内核默认的处理方式是当用户调用close()方法的时候,函数返回,在可能的情况下,尽量发送数据,不一定保证会发生剩余的数据,造成了数据的不确定性,使用SO_LINGER可以阻塞close()的调用时间,直到数据完全发送

TCP_NODELAY

该参数的使用与Nagle算法有关,Nagle算法是将小的数据包组装为更大的帧然后进行发送,而不是输入一次发送一次,因此在数据包不足的时候会等待其他数据的到了,组装成大的数据包进行发送,虽然该方式有效提高网络的有效负载,但是却造成了延时,而该参数的作用就是禁止使用Nagle算法,使用于小数据即时传输,TCP_NODELAY相对应的是TCP_CORK,该选项是需要等到发送的数据量最大的时候,一次性发送数据,适用于文件传输。

IP_TOS

IP参数,设置IP头部的Type-of-Service字段,用于描述IP包的优先级和QoS选项。

ALLOW_HALF_CLOSURE

一个连接的远端关闭时本地端是否关闭,默认值为False。值为False时,连接自动关闭;为True时,触发ChannelInboundHandler.userEventTriggered()方法,事件为ChannelInputShutdownEvent

使用TELNET连接服务器

我们可以使用Telnet客户端来访问我们编写的服务器.为了查看到我们发送给服务器的内容,可以将channelRead

方法修改,直接打印出客户端输入的内,如下:

下面的程序将收到的字节转换为字符直接打印出来

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            ByteBuf msg1 = (ByteBuf)  msg;
            while(msg1.isReadable()){
                System.out.print((char)msg1.readByte());
                System.out.flush();
            }
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

使用TELNET 进行连接 telnet 127.0.0.1 7010

$ telnet 127.0.0.1 7010
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Hello Worker !!!^]
telnet>

服务器输出

java -classpath /Users/vincentmi/work/test/imserv/target/classes:/Users/vincentmi/.m2/repository/io/netty/netty-all/4.1.65.Final/netty-all-4.1.65.Final.jar com.vnzmi.App
Hello Worker !!!

实现服务器输出

改造channelRead

刚才的服务器只是丢弃了请求,通常服务器需要响应客户端的请求,我们需要输出一些内容到客户端,这样才是一个完整的服务器.我们实现一个回显功能,只需要简单的将客户端输入的内容返回回去就可以.通过修改channelRead函数来实现.

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try{
            ByteBuf msg1 = (ByteBuf)  msg;
            while(msg1.isReadable()){
                System.out.print((char)msg1.readByte());
                System.out.flush();
            }
            msg1.retain();
            ctx.writeAndFlush(msg);
        }finally {
            ReferenceCountUtil.release(msg);
        }
    }

IllegalReferenceCountException 异常

ctx.writeAndFlush(msg);之前我们调用msg1.retain();来进行计数器加一.因为Netty使用引用计数进行垃圾回收,将msg 写入后对象计数会减一.之后进行release因为对象的引用已经为0 了所以会报 io.netty.util.IllegalReferenceCountException 这个错误. 参考 [Netty.docs: Reference counted objects](https://netty.io/wiki/reference-counted-objects.html)

实现时间服务器

修改channelActive事件

我们计划实现一个时间服务器,客户端连接后我们直接以32位int返回UNIX时间戳,消息发送完成后我们可以直接关闭连接,因为我们不需要读取数据,因此我们可以在channelActive事件中进行处理.

public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ByteBuf time = ctx.alloc().buffer(4);//(1)
        time.writeInt((int)(System.currentTimeMillis() / 1000L + 2208988800L));//(2)
        final ChannelFuture f = ctx.writeAndFlush(time);//(3)
        f.addListener(future -> {
            assert f == future;
            ctx.close();//(4)
        });
    }
}
  1. 我们要写入32位的整形数据,因此我们需要申请4字节用于保存我们的数据

  2. NTP服务器的时间戳是从1900/1/1而Unix时间戳是从1970/1/1为起点,因此我们需要加上2208988800秒

  3. 写入数据到连接,writeAndFlush返回一个ChannelFuture对象.因为在Netty中所有操作都是异步的,如果我们直接调用close会导致连接被关闭.这时候信息可能并没有写入完成.因此我们需要在ChannelFuture完成后再进行关闭连接的操作

  4. 通过添加一个ChannelFutureListenerd来在操作完成时关闭连接.也可以用内置的监听器来达到这个目的,代码如下

f.addListener(ChannelFutureListener.CLOSE)

查看响应

使用rdate -o <port> -p <host> 可以验证我们的服务器是否工作正常

编写客户端

时间服务器返回的是4字节数据,我们使用telnet访问会看到乱码,人类是无法识别成一个时间戳.我们可以使用Netty编写一个客户端来连接我们自己的服务器.服务器与客户端在实现上最大的不同是使用的BootstarpChannel的使用.代码如下:


public class Client {
    public static void main(String[] args){
        int port = 7010;
        String host = "127.0.0.1";
        EventLoopGroup workGroup = new NioEventLoopGroup();

        Bootstrap bootstrap = new Bootstrap(); //(1)
        bootstrap.group(workGroup);//(2)
        bootstrap.channel(NioSocketChannel.class);//3
        bootstrap.option(ChannelOption.SO_KEEPALIVE,true);//4
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new TimeClientHandler() );
            }
        });

        try {
            ChannelFuture f = bootstrap.connect(host,port).sync();//5
            f.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            workGroup.shutdownGracefully();
        }
    }
}
  1. Bootstrap与上面的ServerBootstrap类似,不同点在于这个是用于非客户端的通道或者无连接通道
  2. 如果只指定了一个EventLoopGroup,这个时间循环将会被同时用于master和worker.当然客户端用不到Master循环
  3. NioSocketChannel用于客户端,而NioServerSocketChannel则用于服务端
  4. 我们设置了KEEP_ALIVE选项,只需要设置客户端用到的参数,不需要设置childOption因为当前的连接并没有一个父级
  5. 使用connet方法来建立一个连接.sync() 是将阻塞直到连接操作完成

客户端Handler

客户端连接的处理就是简单的读取从服务端返回的字节数据然后转换成我们熟悉的格式,然后关闭连接.

public class TimeClientHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            long unixTimeMillis = (byteBuf.readUnsignedInt()  - 2208988800L) * 1000L;
            System.out.println(new Date(unixTimeMillis));
            ctx.close();
        } finally {
            byteBuf.release();
        }
    }
}

处理基于流的传输

在类似TCP/IP的协议中,接收到的数据被存储到socket缓存中.但是这个缓存并不是基于数据包的的队列,而是基于字节的队列.如果你发了两条消息,操作系统只会给你一堆字节码.无法保证你读到内容和服务器写入内容是一致的.比如服务端写入了 ABC DEF GHI 客户端从缓存中读取的顺序可能还是 AB CDEF GH i 无论是客户端还是服务端都希望将这些碎片整理成有意义的帧.

粘包: ABC,DEF 读取 ABCD 这种一次读取了两个数据包的情况称为粘包.粘包的原因是发送方每次写入的数据小于Socket缓冲区大小,或者接收方读取缓冲区不够及时.

半包: 发送ABC 读取到 AB , C这种情况称为半包.就是一个数据包没有读完.造成的原因是发送方写入的数据大于缓冲区,或者发送的数据大于协议的MTU大小必须拆包.

解决方案一

以时间服务器为例,第一种解决方案是创建一个内部缓存,将收到的数据存入缓存,直到达到4字节我们再进行处理

代码如下:

public class TimeClientInterBufferHandler extends ChannelInboundHandlerAdapter {

    private ByteBuf buf ;

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        buf = ctx.alloc().buffer(4);//1
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        buf.release();//2
        buf = null;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf byteBuf = (ByteBuf) msg;
        buf.writeBytes(byteBuf);//3
        byteBuf.release();
        try {
            if(buf.readableBytes() >= 4 ){
                long unixTimeMillis = (buf.readUnsignedInt()  - 2208988800L) * 1000L;
                System.out.println(new Date(unixTimeMillis));
                ctx.close();
            }

        } finally {
            byteBuf.release();
        }
    }
}
  1. 当Handler被加入时我们初始化内部缓存给他4字节
  2. 移除时清理分配的空间
  3. 将收到的数据写入到缓存中

解决方案二

使用ByteToMessageDecoder

第一个解决方案解决了我们的问题.修改的地方不多,但是随着业务的复杂代码会变得很复杂,因此我们可以添加多个Handler来降低复杂度.不同的Hander处理不同的部分.上面的方案我们可以将他分为两个处理器.

  • TimeDecoder 处理数据解包的问题
  • TimeClientHandler 用来处理我们的核心逻辑

Netty提供了一个开盖即用的扩展类来帮助我们简化开发

public class TimeDecoder extends ByteToMessageDecoder { //1
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {//2
        if(byteBuf.readableBytes() < 4){ 
            return ;//3
        }

        list.add(byteBuf.readBytes(4)); //4
    }
}
  1. ByteToMessageDecoder是ChannleInBoundHandler的一个实现,用于解决解包的问题
  2. 当收到新数据的时候BytesToMessageDecoder会调用decode方法.BytesToMessageDecoder会维护一个内部缓冲区
  3. 如果可用数据没有达到4字节则不进行操作
  4. 当缓冲的数据达到4字节我们将其写入到输出列表中,此时BytesToMessageDecoder会丢弃已经读取的字节.

下一步我们将这个处理器加入到pipeline中即可.


        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new TimeDecoder() , new TimeClientHandler() );
            }
        });

使用ReplylingDecoder

使用ReplylingDecoder可以使我们的处理更加简化,

public class TimeDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
        out.add(in.readBytes(4));
    }
}

ReplayingDecoder.decode方法的 in 参数在读取的字节数不够时会抛出异常,ReplayingDecoder处理异常并退回指针.使我们减少了一些判断更加简单的实现我们的需求.

粘包?

我认为粘包并不是TCP的问题或者缺陷.TCP本来就是流式传输保证顺序.粘包是属于通讯协议设计层面的问题.所以需要通过协议设计来解决.

使用长度包头

在包的开始使用固定长度的包头字段,早期短信的协议就是这样固定包头,在包头指定内容长度

struct header {
	int32_t length
	int32_t command
	int32_t seqId
}

使用分隔符

可以使用分隔符号来拆分数据包比如使用"\r\n".

使用POJO替代ByteBuf

之前我们一直使用了ByteBuf来处理协议的消息,如果能使用POJO来替代他将让我们的代码更好维护.

定义POJO对象

public class TimeDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        long t = byteBuf.readUnsignedInt();
        list.add(new UnixTime(t));

    }
}

修改Decoder

进行decode的时候直接输出一个UnixTime对象

public class TimeDecoder2 extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
        long t = byteBuf.readUnsignedInt();

        list.add(new UnixTime(t));

    }
}

修改Handler

对TimeClientHandler进行修改

public class TimeClientHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        UnixTime t = (UnixTime) msg;
        System.out.println(t);
        ctx.close();
    }
}

修改服务端Handler

我们再ctx中直接写入POJO对象


public class TimeServerHandler2 extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final ChannelFuture f = ctx.writeAndFlush(new UnixTime());
        f.addListener(ChannelFutureListener.CLOSE);
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}

实现服务端Encoder

我们添加一个TimeEncoder来对他进行编码

public class TimeEncoder  extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        UnixTime t = (UnixTime) msg;
        ByteBuf encoded = ctx.alloc().buffer(4);
        encoded.writeInt((int)t.getValue());
        ctx.write(encoded, promise);
    }
}

简化版本,我们使用 MessageToByteEncoder 来更简单的进行处理

public class TimeEncoder2 extends MessageToByteEncoder<UnixTime> {
    @Override
    protected void encode(ChannelHandlerContext channelHandlerContext, UnixTime unixTime, ByteBuf byteBuf) throws Exception {
        byteBuf.writeInt((int)unixTime.getValue());
    }
}

最后我们需要在pipeline中加入我们新设置的处理器

socketChannel.pipeline().addLast(new TimeEncoder2(),new TimeServerHandler2() );

注意TimeEncoder要在前面.

关闭服务

使用 workGroup.shutdownGracefully();关闭完我们的连接关闭所有管道之后我们就可以安全关闭服务了.

更多代码细节 查看

https://github.com/netty/netty/tree/4.1/example/src/main/java/io/netty/example

参考: https://netty.io/wiki/user-guide-for-4.x.html#wiki-h2-0

「真诚赞赏,手留余香」

我的乐与怒

真诚赞赏,手留余香

使用微信扫描二维码完成支付