Netty深入浅出系列:Netty read

boss线程主要负责监听并处理accept事件,将socketChannel注册到work线程的selector,由worker线程来监听并处理read事件,本节主要分析Netty如何处理read事件。

Netty深入浅出系列:Netty read

accept->read

当work线程的selector检测到OP_READ事件发生时,触发read操作。

//NioEventLoop 
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) { 
 unsafe.read(); 
 if (!ch.isOpen()) { 
 // Connection already closed - no need to handle write. 
 return; 
 } 
}

该read方法定义在类NioByteUnsafe中。

//AbstractNioByteChannel.NioByteUnsafe
public final void read() {
 final ChannelConfig config = config();
 if (!config.isAutoRead() && !isReadPending()) {
 // ChannelConfig.setAutoRead(false) was called in the meantime
 removeReadOp();
 return;
 }
 final ChannelPipeline pipeline = pipeline();
 final ByteBufAllocator allocator = config.getAllocator();
 final int maxMessagesPerRead = config.getMaxMessagesPerRead();
 RecvByteBufAllocator.Handle allocHandle = this.allocHandle;
 if (allocHandle == null) {
 this.allocHandle = allocHandle = config.getRecvByteBufAllocator().newHandle();
 }
 ByteBuf byteBuf = null;
 int messages = 0;
 boolean close = false;
 try {
 int totalReadAmount = 0;
 boolean readPendingReset = false;
 do {
 byteBuf = allocHandle.allocate(allocator);
 int writable = byteBuf.writableBytes();
 int localReadAmount = doReadBytes(byteBuf);
 if (localReadAmount <= 0) {
 // not was read release the buffer
 byteBuf.release();
 byteBuf = null;
 close = localReadAmount < 0;
 break;
 }
 if (!readPendingReset) {
 readPendingReset = true;
 setReadPending(false);
 }
 pipeline.fireChannelRead(byteBuf);
 byteBuf = null;
 if (totalReadAmount >= Integer.MAX_VALUE - localReadAmount) {
 // Avoid overflow.
 totalReadAmount = Integer.MAX_VALUE;
 break;
 }
 totalReadAmount += localReadAmount;
 // stop reading
 if (!config.isAutoRead()) {
 break;
 }
 if (localReadAmount < writable) {
 // Read less than what the buffer can hold,
 // which might mean we drained the recv buffer completely.
 break;
 }
 } while (++ messages < maxMessagesPerRead);
 pipeline.fireChannelReadComplete();
 allocHandle.record(totalReadAmount);
 if (close) {
 closeOnRead(pipeline);
 close = false;
 }
 } catch (Throwable t) {
 handleReadException(pipeline, byteBuf, t, close);
 } finally {
 // Check if there is a readPending which was not processed yet.
 // This could be for two reasons:
 // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
 // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
 //
 // See https://github.com/netty/netty/issues/2254
 if (!config.isAutoRead() && !isReadPending()) {
 removeReadOp();
 }
 }
}

1、allocHandle负责自适应调整当前缓存分配的大小,以防止缓存分配过多或过少,先看看AdaptiveRecvByteBufAllocator内部实现:

public class AdaptiveRecvByteBufAllocator implements RecvByteBufAllocator {
 static final int DEFAULT_MINIMUM = 64;
 static final int DEFAULT_INITIAL = 1024;
 static final int DEFAULT_MAXIMUM = 65536;
 private static final int INDEX_INCREMENT = 4;
 private static final int INDEX_DECREMENT = 1;
 private static final int[] SIZE_TABLE;
}

SIZE_TABLE:按照从小到大的顺序预先存储可以分配的缓存大小。

从16开始,每次累加16,直到496,接着从512开始,每次增大一倍,直到溢出。

DEFAULT_MINIMUM:最小缓存(64),在SIZE_TABLE中对应的下标为3。

**DEFAULT_MAXIMUM **:最大缓存(65536),在SIZE_TABLE中对应的下标为38。

**DEFAULT_INITIAL **:初始化缓存大小,第一次分配缓存时,由于没有上一次实际收到的字节数做参考,需要给一个默认初始值。

INDEX_INCREMENT:上次预估缓存偏小,下次index的递增值。

**INDEX_DECREMENT **:上次预估缓存偏大,下次index的递减值。

Netty深入浅出系列:Netty read

2、allocHandle.allocate(allocator) 申请一块指定大小的内存。

//AdaptiveRecvByteBufAllocator.HandleImpl
public ByteBuf allocate(ByteBufAllocator alloc) {
 return alloc.ioBuffer(nextReceiveBufferSize);
}

通过ByteBufAllocator的ioBuffer方法申请缓存。

//AbstractByteBufAllocator
public ByteBuf ioBuffer(int initialCapacity) {
 if (PlatformDependent.hasUnsafe()) {
 return directBuffer(initialCapacity);
 }
 return heapBuffer(initialCapacity);
}

根据平台是否支持unsafe,选择使用直接物理内存还是堆上内存。

direct buffer方案:

//AbstractByteBufAllocator
public ByteBuf directBuffer(int initialCapacity) {
 return directBuffer(initialCapacity, Integer.MAX_VALUE);
}
public ByteBuf directBuffer(int initialCapacity, int maxCapacity) {
 if (initialCapacity == 0 && maxCapacity == 0) {
 return emptyBuf;
 }
 validate(initialCapacity, maxCapacity);
 return newDirectBuffer(initialCapacity, maxCapacity);
}
//UnpooledByteBufAllocator
protected ByteBuf newDirectBuffer(int initialCapacity, int maxCapacity) {
 ByteBuf buf;
 if (PlatformDependent.hasUnsafe()) {
 buf = new UnpooledUnsafeDirectByteBuf(this, initialCapacity, maxCapacity);
 } else {
 buf = new UnpooledDirectByteBuf(this, initialCapacity, maxCapacity);
 }
 return toLeakAwareBuffer(buf);
}

UnpooledUnsafeDirectByteBuf是如何实现缓存管理的?对Nio的ByteBuffer进行了封装,通过ByteBuffer的allocateDirect方法实现缓存的申请。

protected UnpooledUnsafeDirectByteBuf(ByteBufAllocator alloc, ByteBuffer initialBuffer, int maxCapacity) {
 //判断逻辑已经忽略
 this.alloc = alloc;
 setByteBuffer(allocateDirect(initialCapacity));
}
protected ByteBuffer allocateDirect(int initialCapacity) {
 return ByteBuffer.allocateDirect(initialCapacity);
}
private void setByteBuffer(ByteBuffer buffer) {
 ByteBuffer oldBuffer = this.buffer;
 if (oldBuffer != null) {
 if (doNotFree) {
 doNotFree = false;
 } else {
 freeDirect(oldBuffer);
 }
 }
 this.buffer = buffer;
 memoryAddress = PlatformDependent.directBufferAddress(buffer);
 tmpNioBuf = null;
 capacity = buffer.remaining();
}

memoryAddress = PlatformDependent.directBufferAddress(buffer) 获取buffer的address字段值,指向缓存地址。

capacity = buffer.remaining() 获取缓存容量。

方法toLeakAwareBuffer(buf)对申请的buf又进行了一次包装:

protected static ByteBuf toLeakAwareBuffer(ByteBuf buf) {
 ResourceLeak leak;
 switch (ResourceLeakDetector.getLevel()) {
 case SIMPLE:
 leak = AbstractByteBuf.leakDetector.open(buf);
 if (leak != null) {
 buf = new SimpleLeakAwareByteBuf(buf, leak);
 }
 break;
 case ADVANCED:
 case PARANOID:
 leak = AbstractByteBuf.leakDetector.open(buf);
 if (leak != null) {
 buf = new AdvancedLeakAwareByteBuf(buf, leak);
 }
 break;
 }
 return buf;
}

Netty中使用引用计数机制来管理资源,ByteBuf实现了ReferenceCounted接口,当实例化一个ByteBuf时,引用计数为1, 代码中需要保持一个该对象的引用时需要调用retain方法将计数增1,对象使用完时调用release将计数减1。当引用计数变为0时,对象将释放所持有的底层资源或将资源返回资源池。

3、方法doReadBytes(byteBuf) 将socketChannel数据写入缓存。

//NioSocketChannel
@Override
protected int doReadBytes(ByteBuf byteBuf) throws Exception {
 return byteBuf.writeBytes(javaChannel(), byteBuf.writableBytes());
}
//WrappedByteBuf
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
 return buf.writeBytes(in, length);
}
//AbsractByteBuf
@Override
public int writeBytes(ScatteringByteChannel in, int length) throws IOException {
 ensureAccessible();
 ensureWritable(length);
 int writtenBytes = setBytes(writerIndex, in, length);
 if (writtenBytes > 0) {
 writerIndex += writtenBytes;
 }
 return writtenBytes;
}
//UnpooledUnsafeDirectByteBuf
@Override
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
 ensureAccessible();
 ByteBuffer tmpBuf = internalNioBuffer();
 tmpBuf.clear().position(index).limit(index + length);
 try {
 return in.read(tmpBuf);
 } catch (ClosedChannelException ignored) {
 return -1;
 }
}
private ByteBuffer internalNioBuffer() {
 ByteBuffer tmpNioBuf = this.tmpNioBuf;
 if (tmpNioBuf == null) {
 this.tmpNioBuf = tmpNioBuf = buffer.duplicate();
 }
 return tmpNioBuf;
}

最终底层采用ByteBuffer实现read操作,这里有一块逻辑不清楚,为什么要用tmpNioBuf?

int localReadAmount = doReadBytes(byteBuf);

1、如果返回0,则表示没有读取到数据,则退出循环。

2、如果返回-1,表示对端已经关闭连接,则退出循环。

3、否则,表示读取到了数据,数据读入缓存后,触发pipeline的ChannelRead事件,byteBuf作为参数进行后续处理,这时自定义Inbound类型的handler就可以进行业务处理了。

static class DiscardServerHandler extends ChannelInboundHandlerAdapter {
 @Override
 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
 ByteBuf in = (ByteBuf) msg;
 try {
 while (in.isReadable()) { // (1)
 System.out.print((char) in.readByte());
 System.out.flush();
 }
 } finally {
 ReferenceCountUtil.release(msg); // (2)
 }
 }
}

其中参数msg,就是对应的byteBuf,当请求的数据量比较大时,会多次触发channelRead事件,默认最多触发16次,可以通过maxMessagesPerRead字段进行配置。

如果客户端传输的数据过大,可能会分成好几次传输,因为TCP一次传输内容大小有上限,所以同一个selectKey会触发多次read事件,剩余的数据会在下一轮select操作继续读取。

在实际应用中,应该把所有请求数据都缓存起来再进行业务处理。

所有数据都处理完,触发pipeline的ChannelReadComplete事件,并且allocHandle记录这次read的字节数,进行下次处理时缓存大小的调整。

到此为止,整个NioSocketChannel的read事件已经处理完成。

Netty深入浅出系列:Netty read