Netty 服务端开发
上一篇文章 BIO、伪异步 IO、AIO和NIO 我们使用 JDK 的 NIO 原生类库进行异步 IO 的开发. 现在我们使用 Netty 来进行开发.
示例代码
public class TimeServer {
public void bind(int port) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childHandler(new ChildChannelHandler());
// 绑定端口, 同步等待成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
}
private class TimeServerHandler extends ChannelHandlerAdapter {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buffer = (ByteBuf) msg;
byte[] req = new byte[buffer.readableBytes()];
buffer.readBytes(req);
String body = new String(req, "UTF-8");
System.out.println(body);
ByteBuf resp = Unpooled.copiedBuffer("6666".getBytes());
ctx.write(resp);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
}创建两个 NioEventLoopGroup 实例. NioEventLoopGroup 是个线程组, 它包含了一组 NIO 线程, 专门用于网络事件的处理, 实际上它们就是 Reactor 线程组. 这里创建两个的原因是一个用于服务端接收客户端的连接, 另一个用于进行 SocketChannel 的网络读写.
创建 ServerBootstrap 对象, 它是 Netty 用于启动 NIO 服务端的辅助启动类, 目的是降低服务端的开发复杂度.
调用 ServerBootstrap 的 group 方法, 将两个 NIO 线程组当作参数传递到 ServerBootstrap 中. 并且设置 Channel 为 NioServerSocketChannel, 它的功能对于 JDK NIO 类库中的 ServerSocketChannel 类. 然后配置 NioServerSocketChannel 的 TCP 参数, 此处将它的 backlog 设置为 1024. 最后绑定 IO 事件的处理类 ChildChannelHandler, 它的作用类似于 Reactor 模式中的 Handler 类, 主要用于处理网络 IO 事件, 例如记录日志, 对消息进行编解码等.
ChannelOption.SO_BACKLOG 对应的是 tcp/ip 协议 listen 函数中的 backlog 参数, 函数 listen(int socketfd,int backlog) 用来初始化服务端可连接队列, 服务端处理客户端连接请求是顺序处理的, 所以同一时间只能处理一个客户端连接, 多个客户端来的时候, 服务端将不能处理的客户端连接请求放在队列中等待处理, backlog参数指定了队列的大小. ChannelOption 连接服务端启动辅助类配置完成之后, 调用它的 bind 方法绑定监听端口, 随后, 调用同步阻塞方法 sync 等待绑定操作完成. 完成之后 Netty 会返回一个 ChannelFuture, 它的功能类似于 JDK 的 java.util.concurrent.Future, 主要用于异步操作的通知回调.
f.channel().closeFuture().sync(); 方法进行阻塞, 等待服务端链路关闭之后 main 函数才退出.
调用 NIO 线程组的 shutdownGracefully 进行优雅退出, 它会释放跟 shutdownGracefully 相关联的资源.
TimeServerHandler 继承自 ChannelHandlerAdapter, 它用于对网络事件进行读写操作, 通常我们只需要关注 channelRead 和 exceptionCaught 方法.
在 channelRead 方法中, 将 msg 转换成 Netty 的 ByteBuf 对象. ByteBuf 类似与 JDK 中的 java.nio.ByteBuffer 对象, 不过它提供了更加强大和灵活的功能. 通过 ByteBuf 的 readableBytes 方法可以获取缓冲区可读的字节数. 然后使用 readBytes 方法将缓冲区中的字节数组复制到新建的 byte 数组中.
我们还调用了 ChannelHandlerContext 的 flush 方法, 它的作用是将消息发送队列中的消息写入到 SocketChannel 中发送给对方. 从性能角度考虑, 为了防止频繁的唤醒 Selector 进行消息发送, Netty 的 write 方法并不直接将消息写入 SocketChannel 中.
注意: write 方法只是将要发送的消息放到消息发送队列中, 再通过调用 flush 方法, 将发送缓冲区中的消息全部写到 SocketChannel 中.
在 exceptionCaught 方法中, 如果出现异常, 就关闭 ChannelHandlerContext, 释放和 ChannelHandlerContext 相关联的句柄等资源.
总结
EventLoopGroup: 线程组, 专门用于网络事件的处理, 实际上它们就是 Reactor 线程组.ServerBootstrap 是 Netty 用于启动 NIO 服务端的辅助启动类.group方法: 第一个参数 parent (acceptor), 第二个参数 child (client).NioServerSocketChannel: 它的功能对于 JDK NIO 类库中的 ServerSocketChannel 类.ServerSocketChannel: 允许我们监听TCP链接请求, 每个请求会创建会一个 SocketChannel.ChannelHandlerAdapter类: 它用于对网络事件进行读写操作.