Java NIO源码分析
1.前言
JDK1.4之前的传统阻塞IO(BIO),服务端需要为每一个客户端连接创建单独的线程为其服务,从JDK1.4开始NIO非阻塞式IO出现,它只需要单独的一个线程就能接收多个客户端请求,而真正处理各个请求的细节可以使用多线程的方式高效率的完成,这些处理线程与具体的业务逻辑分离,做到了IO的复用。
2.源码分析
首先以一段典型的NIO使用代码开始:
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(9527));
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true){
int n = selector.select();
if (n <= 0) continue;
Iterator it = selector.selectedKeys().iterator();
while(it.hasNext()){
SelectionKey key = (SelectionKey)it.next();
if (key.isAcceptable()){
SocketChannel sc= ((ServerSocketChannel) key.channel()).accept();
sc.configureBlocking(false);
sc.register(key.selector(), SelectionKey.OP_READ|SelectionKey.OP_WRITE);
}
if (key.isReadable()){
SocketChannel channel = ((SocketChannel) key.channel());
ByteBuffer bf = ByteBuffer.allocate(10);
int read = channel.read(bf);
System.out.println("read "+read+" : "+new String(bf.array()).trim());
}
if (key.isWritable()){
SocketChannel channel = ((SocketChannel) key.channel());
channel.write(ByteBuffer.wrap(new String("hello client").getBytes()));
}
it.remove();
}
}2.1 Selector.open() 获取选择器。
public static Selector open() throws IOException {
return SelectorProvider.provider().openSelector();
}
public static SelectorProvider provider() {
synchronized (lock) {
if (provider != null)
return provider;
return AccessController.doPrivileged(
new PrivilegedAction<SelectorProvider>() {
public SelectorProvider run() {
if (loadProviderFromProperty())
return provider;
if (loadProviderAsService())
return provider;
provider = sun.nio.ch.DefaultSelectorProvider.create();
return provider;
}
});
}
}从Selector源码中可以看到,open方法是交给selectorProvider处理的。 其中provider = sun.nio.ch.DefaultSelectorProvider.create();会根据操作系统来返回不同的实现类,windows平台就返回WindowsSelectorProvider;Linux平台会根据不同的内核版本选择是使用select/poll模式还是epoll模式。
public static SelectorProvider create() {
PrivilegedAction pa = new GetPropertyAction("os.name");
String osname = (String) AccessController.doPrivileged(pa);
if ("SunOS".equals(osname)) {
return new sun.nio.ch.DevPollSelectorProvider();
}
// use EPollSelectorProvider for Linux kernels >= 2.6
if ("Linux".equals(osname)) {
pa = new GetPropertyAction("os.version");
String osversion = (String) AccessController.doPrivileged(pa);
String[] vers = osversion.split("\\.", 0);
if (vers.length >= 2) {
try {
int major = Integer.parseInt(vers[0]);
int minor = Integer.parseInt(vers[1]);
if (major > 2 || (major == 2 && minor >= 6)) {
return new sun.nio.ch.EPollSelectorProvider();
}
} catch (NumberFormatException x) {
// format not recognized
}
}
}
return new sun.nio.ch.PollSelectorProvider();
}
sun.nio.ch.EPollSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new EPollSelectorImpl(this);
}
sun.nio.ch.PollSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new PollSelectorImpl(this);
}可以看到,如果Linux内核版本>=2.6则,具体的SelectorProvider为EPollSelectorProvider,否则为默认的PollSelectorProvider,实际上这是在JDK5U9之后才有这样的更新。
public static SelectorProvider create() {
return new sun.nio.ch.WindowsSelectorProvider();
}
sun.nio.ch.WindowsSelectorProvider
public AbstractSelector openSelector() throws IOException {
return new WindowsSelectorImpl(this);
}
WindowsSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe = Pipe.open();
wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();
// Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd = ((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
}
void addWakeupSocket(int fdVal, int index) {
putDescriptor(index, fdVal);
putEventOps(index, POLLIN);
}接下来,以Windows的实现为准进行分析。在openSelector方法里面实例化WindowsSelectorImpl的过程中,
1).实例化了PollWrapper,pollWrapper用Unsafe类申请一块物理内存,用于存放注册时的socket句柄fdVal和event的数据结构pollfd.
2)Pipe.open()打开一个管道(打开管道的实现后面再看);拿到wakeupSourceFd和wakeupSinkFd两个文件描述符;把唤醒端的文件描述符(wakeupSourceFd)放到pollWrapper里.addWakeupSocket方法将source的POLLIN事件(有数据可读)标识为感兴趣的,当sink端有数据写入时,source对应的文件描述描wakeupSourceFd就会处于就绪状态.
public static Pipe open() throws IOException {
return SelectorProvider.provider().openPipe();
}
public Pipe openPipe() throws IOException {
return new PipeImpl(this);
}
PipeImpl(final SelectorProvider sp) throws IOException {
try {
AccessController.doPrivileged(new Initializer(sp));
} catch (PrivilegedActionException x) {
throw (IOException)x.getCause();
}
}
private Initializer(SelectorProvider sp) {
this.sp = sp;
}
public Void run() throws IOException {
LoopbackConnector connector = new LoopbackConnector();
connector.run();
....//省略
}
private class LoopbackConnector implements Runnable {
@Override
public void run() {
ServerSocketChannel ssc = null;
SocketChannel sc1 = null;
SocketChannel sc2 = null;
try {
// Loopback address
InetAddress lb = InetAddress.getByName("127.0.0.1");
assert(lb.isLoopbackAddress());
InetSocketAddress sa = null;
for(;;) {
// Bind ServerSocketChannel to a port on the loopback
// address
if (ssc == null || !ssc.isOpen()) {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(lb, 0));
sa = new InetSocketAddress(lb, ssc.socket().getLocalPort());
}
// Establish connection (assume connections are eagerly
// accepted)
sc1 = SocketChannel.open(sa);
ByteBuffer bb = ByteBuffer.allocate(8);
long secret = rnd.nextLong();
bb.putLong(secret).flip();
sc1.write(bb);
// Get a connection and verify it is legitimate
sc2 = ssc.accept();
bb.clear();
sc2.read(bb);
bb.rewind();
if (bb.getLong() == secret)
break;
sc2.close();
sc1.close();
}
// Create source and sink channels
source = new SourceChannelImpl(sp, sc1);
sink = new SinkChannelImpl(sp, sc2);
} catch (IOException e) {
try {
if (sc1 != null)
sc1.close();
if (sc2 != null)
sc2.close();
} catch (IOException e2) {}
ioe = e;
} finally {
try {
if (ssc != null)
ssc.close();
} catch (IOException e2) {}
}
}
}
}通过创建管道的代码分析:创建管道的具体实现方式也是与具体的操作系统紧密相关的,这里以Windows为例,创建了一个PipeImpl对象, AccessController.doPrivileged调用后紧接着会执行initializer的run方法,在run方法里面,windows下的实现是创建两个本地的socketChannel,然后连接(链接的过程通过写一个随机long做两个socket的链接校验),两个socketChannel分别实现了管道的source与sink端。通过查阅资料,而在Linux下则是直接使用操作系统提供的管道。
到这里,Selector.open()就完成了,总结一下,主要完成以下几件事:
1.实例化pollWrapper对象,用于将来存放注册时的socket句柄fdVal和event的数据结构pollfd。
2.根据不同操作系统实现了用于自我唤醒的管道,Windows通过创建一对自己连着自己的socket通道,Linux直接使用系统提供的管道。同时,根据linux的不同内核版本还会选择底层进行事件通知的不同机制select/poll或者epoll。
2.2 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);通道注册
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException{
synchronized (regLock) {
SelectionKey k = findKey(sel);
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
} 如果该channel和selector已经注册过,则直接添加事件和附件。否则通过selector实现注册过程。protected final SelectionKey register(AbstractSelectableChannel ch,
int ops, Object attachment) {
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
protected void implRegister(SelectionKeyImpl ski) {
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski);
pollWrapper.addEntry(totalChannels, ski);
totalChannels++;
}
}
private void growIfNeeded() {
if (channelArray.length == totalChannels) {
int newSize = totalChannels * 2; // Make a larger array
SelectionKeyImpl temp[] = new SelectionKeyImpl[newSize];
System.arraycopy(channelArray, 1, temp, 1, totalChannels - 1);
channelArray = temp;
pollWrapper.grow(newSize);
}
if (totalChannels % MAX_SELECTABLE_FDS == 0) { // more threads needed
pollWrapper.addWakeupSocket(wakeupSourceFd, totalChannels);
totalChannels++;
threadsCount++;
}
}
void addEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.channel.getFDVal());
} 通过selector注册的过程主要完成以下几件事:- 以当前channel和selector为参数,初始化 SelectionKeyImpl 对象,并添加附件attachment。
- 如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
- 如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。windows上select系统调用有最大文件描述符限制,一次只能轮询1024个文件描述符,如果多于1024个,需要多线程进行轮询。
- ski.setIndex(totalChannels)选择键记录下在数组中的索引位置。
- keys.add(ski);将选择键加入到已注册键的集合中。
- fdMap.put(ski);保存选择键对应的文件描述符与选择键的映射关系。
- pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
- k.interestOps(ops)方法最终也会把event添加到对应的pollfd。
2.3 selector.select();
public int select() throws IOException {
return select(0);
}
public int select(long timeout) throws IOException
{
if (timeout < 0)
throw new IllegalArgumentException("Negative timeout");
return lockAndDoSelect((timeout == 0) ? -1 : timeout);
}
private int lockAndDoSelect(long timeout) throws IOException {
synchronized (this) {
if (!isOpen())
throw new ClosedSelectorException();
synchronized (publicKeys) {
synchronized (publicSelectedKeys) {
return doSelect(timeout);
}
}
}
}当调用selector.select()以及select(0)时,JDK对参数进行修正,其实传给doSelect的timeout为-1。当调用的是selectNow()的时候,timeout则为0,直接以负数作为参数则会抛出异常,其中的doSelector又回到我们的Windows实现: