Netty中使用零拷贝方式写大数据

因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。让我们考虑下将一个文件内容写出到网络的情况。

我们知道在传输的过程中,由于NIO的零拷贝特性,这种特性消除了将文件的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在Netty的核心中,所以应用程序所有需要做的就是使用一个FileRegion接口的实现,其在Netty的API文档中的定义是:“通过支持零拷贝的文件传输的Channel来发送的文件区域。”

下面的代码清单展示了如何通过从FileInputStream创建一个DefaultFileRegion,并将其写入Channel,从而利用零拷贝特性来传输一个文件的内容。

使用FileRegion传输文件的内容

FileInputStream in = new FileInputStream(file); // 创建一个FileInputStream
FileRegion region = new DefaultFileRegion(in.getChannel(), 0, // 以该文件的完整长度创建一个新的DefaultFileRegion
        file.length());channel.writeAndFlush(region).addListener(// 发送该DefaultFileRegion,并注册一个ChannelFutureListener

  new ChannelFutureListener(){

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
    if (!future.isSuccess()) {
        Throwable cause = future.cause();// 处理失败
        // Do something
        }
    }

});

这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据从文件系统复制到用户内存中时,可以使用ChunkedWriteHandler,它支持异步写大型数据流,而又不会导致大量的内存消耗。
关键是interfaceChunkedInput<B>,其中类型参数B是readChunk()方法返回的类型。Netty预置了该接口的4个实现,如下表中所列出的。每个都代表了一个将由ChunkedWriteHandler处理的不定长度的数据流。

ChunkedInput 的实现

ChunkedFile 从文件中逐块获取数据,当你的平台不支持零拷贝或者你需要转换数据时使用
ChunkedNioFile 和ChunkedFile 类似,只是它使用了FileChannel
ChunkedStream 从InputStream 中逐块传输内容
ChunkedNioStream 从ReadableByteChannel 中逐块传输内容

下面代码清单说明了ChunkedStream的用法,它是实践中最常用的实现。所示的类使用了一个File以及一个SslContext进行实例化。当initChannel()方法被调用时,它将使用所示的ChannelHandler 链初始化该Channel。
当Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数据作为ChunkedStream 写入。数据在传输之前将会由SslHandler加密。

使用ChunkedStream 传输文件内容

public class ChunkedWriteHandlerInitializer extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;

    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }

    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.newEngine(ch.alloc());// 将SslHandler添加到ChannelPipeline中
        pipeline.addLast(new ChunkedWriteHandler());// 添加ChunkedWriteHandler以处理作为ChunkedInput传入的数据
        pipeline.addLast(new WriteStreamHandler());// 一旦连接建立,WriteStreamHandler就开始写文件数据
    }

    public final class WriteStreamHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {// 当连接建立时,channelActive()方法将使用ChunkedInput写文件数据
            super.channelActive(ctx);
            ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file)));
        }
    }
}

逐块输入要使用你自己的ChunkedInput实现,请在ChannelPipeline中安装一个ChunkedWriteHandler。在本节中,我们讨论了如何通过使用零拷贝特性来高效地传输文件,以及如何通过使用ChunkedWriteHandler来写大型数据而又不必冒着导致OutOfMemoryError的风险。

相关推荐