kafka是如何做到百万级高并发低迟延的?

Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。Kafka到底是如何做到这么高的吞吐量和性能的呢?我们今天来走进kafka的server端探究一下它的Reactor高并发网络模型机制。

1.1、Kafka Reactor模型架构

Kafka客户端和服务端通信采取的是NIO的reactor模式,它是一种事件驱动模式。那么一个常见的单线程Reactor模式下,NIO线程的职责都有哪些呢?我们整理了如下几点:
1、作为NIO服务端,接收客户端的TCP连接
2、作为NIO客户端,向服务端发起TCP连接
3、读取通信对端的请求或者应答消息
4、向通信对端发送消息请求或者应答消息
以上四点对应的一个Reactor模式的架构图如下:
kafka是如何做到百万级高并发低迟延的?

对于一些小容量的业务场景,这种单线程的模式基本够用。但是对于高负载、大并发的应用场景却并不适合,主要原因如下:
性能问题1:一个NIO线程同时处理数十万甚至百万级的链路性能是无法支撑的
性能问题2:如果超时发生重试会加重服务端处理负荷,导致大量处理积压
可靠性问题:单个线程出现故障,整个系统无法使用,造成单点故障
所以一个高并发的处理服务需要对以上架构进行优化改造,例如处理采取多线程模式,将接收线程尽量简化,相当于将接收线程作为一个接入层。那么我们回到主题kafka的reactor模式架构是怎样的?
kafka是如何做到百万级高并发低迟延的?

在上面这个kafka的架构图中可以看出,它包含以下几个流程:
1、客户端请求NIO的连接器Acceptor,同时它还具备事件的转发功能,转发到Processor处理
2、服务端网络事件处理器Processor
3、请求队列RequestChannel,存储了所有待处理的请求信息
4、请求处理线程池(RequestHandler Pool)作为守护线程轮训RequestChannel的请求处理信息,并将其转发给API层对应的处理器处理
5、API层处理器将请求处理完成之后放入到Response Queue中,并由Processor从Response Queue取出发送到对应的Client端
需要注意的一点是虽然Broker层包含多个Acceptor,但是kafka的reactor模式里面还是单线程Acceptor多线程handler的模式,这里的多个Acceptor是针对一个服务器下多网卡场景的,每个EndPoint就是一个网卡它对应于一个ip和port的组合,而每个Endpoint只有一个Acceptor。

1.2、Kafka Reactor模型源码详解

按照上面架构图阐述的几个流程,它分别对应着kafka里面的事件接收、处理、响应等几个阶段,我们下面从具体实现这几个阶段的源码层面来分析。
1.2.1、SocketServer
SocketServer是一个标准的NIO服务端实现,它主要包含以下变量:
1、RequestChannel:Processor和KafkaRequestHandler 之间数据交换的队列
2、Processors:processor的容器,存放的是processor的id和processor对象的映射
3、Acceptors:acceptor的容器,存放的是EndPoint和acceptor的映射
4、ConnectionQuotas:链接限制器,针对每个IP的链接数进行限制
SocketServer的启动流程如下:
kafka是如何做到百万级高并发低迟延的?

部分源码如下,启动入口:

def startup(startupProcessors: Boolean = true) {
    this.synchronized {
      connectionQuotas = new ConnectionQuotas(maxConnectionsPerIp, maxConnectionsPerIpOverrides)
      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
      if (startupProcessors) {
        startProcessors()
      }
}

创建Acceptor及Proccessor实现逻辑:

private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
    processors.foreach { processor =>
      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
        processor).start()
    }
  }

1.2.2、Acceptor 
Acceptor是NIO里面的一个轻量级接入服务,它主要包含如下变量:
1、nioSelector:Java的NIO网络选择器
2、serverChannel:ip和端口绑定到socket
3、Processors:processor的容器,存放的是processor对象
它的主要处理流程如下:
1、将nioSelector注册为OP_ACCEPT
2、轮训从nioSelector读取事件
3、通过RR的模式选择processor
4、接收一个新的链接设置(从serverSocketChannel获取socketChannel,并对它的属性进行设置)
5、移交processor的accept处理
重要逻辑代码如下:

def run() {
    serverChannel.register(nioSelector, SelectionKey.OP_ACCEPT)
    startupComplete()
    try {
      var currentProcessor = 0
      while (isRunning) {
        try {
          val ready = nioSelector.select(500)
          if (ready > 0) {
            val keys = nioSelector.selectedKeys()
            val iter = keys.iterator()
            while (iter.hasNext && isRunning) {
              try {
                val key = iter.next
                iter.remove()
                if (key.isAcceptable) {
                  val processor = synchronized {
//通过RR选择Processor
                    currentProcessor = currentProcessor % processors.size
                    processors(currentProcessor)
                  }
                  accept(key, processor)
                } else
                  throw new IllegalStateException("Unrecognized key state for acceptor thread.")

                // round robin to the next processor thread, mod(numProcessors) will be done later
                currentProcessor = currentProcessor + 1
              } catch {
                case e: Throwable => error("Error while accepting connection", e)
              }
            }
          }
        }

socketChannel的链接设置逻辑:

def accept(key: SelectionKey, processor: Processor) {
    val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
    val socketChannel = serverSocketChannel.accept()
    try {
      connectionQuotas.inc(socketChannel.socket().getInetAddress)
      socketChannel.configureBlocking(false)
      socketChannel.socket().setTcpNoDelay(true)
      socketChannel.socket().setKeepAlive(true)
      if (sendBufferSize != Selectable.USE_DEFAULT_BUFFER_SIZE)
        socketChannel.socket().setSendBufferSize(sendBufferSize)

      processor.accept(socketChannel)
    }

1.2.3、Processor 
Processor的主要职责是将来自客户端的网络链接请求封装成RequestContext并发送给RequestChannel,同时需要对handler处理完的响应回执发送给客户端。它主要包括:
1、newConnections:是一个线程安全的队列,存放从acceptor接收到的网络新链接
2、inflightResponses:已发送客户端的响应,存放了和客户端的链接id(由本地ip、port以及远端ip、port还有额外一个序列值组成)和响应对象的映射
3、responseQueue:是一个阻塞队列,存放handler的响应请求
我们在前面使用的kafka reactor模型架构图上改造一下,就得到如下proccessor的核心逻辑架构:
kafka是如何做到百万级高并发低迟延的?

它的核心逻辑如下几个步骤:
1、proccessor线程从newConnections中轮询获取socketChannel,并将selector监听事件修改为OP_READ;
2、processNewResponses处理新的响应需求,其中类型为SendAction的就是向客户端发送响应,并将发送的响应记录在inflightResponses ,它的核心逻辑是sendResponse如下:

protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
    val connectionId = response.request.context.connectionId
    if (channel(connectionId).isEmpty) {
      response.request.updateRequestMetrics(0L, response)
    }
    if (openOrClosingChannel(connectionId).isDefined) {
      selector.send(responseSend)
      inflightResponses += (connectionId -> response)
    }
  }

3、Selector调用poll从客户端获取到的请求信息,并将获取到的NetworkReceive添加到completedReceives缓存中。
4、而processCompletedReceives负责处理completedReceives中的接收信息,最后封装为RequestChannel.Request,再调用requestChannel将请求添加到发送队列(即requestQueue)当中,源码逻辑如下所示:

private def processCompletedReceives() {
    selector.completedReceives.asScala.foreach { receive =>
      try {
        openOrClosingChannel(receive.source) match {
          case Some(channel) =>
            val header = RequestHeader.parse(receive.payload)
            val context = new RequestContext(header, receive.source, channel.socketAddress,
              channel.principal, listenerName, securityProtocol)
            val req = new RequestChannel.Request(processor = id, context = context,
              startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
            requestChannel.sendRequest(req)
            selector.mute(receive.source)
          case None =>
            throw new IllegalStateException(s"Channel ${receive.source} removed from selector before processing completed receive")
        }
      } catch {
        case e: Throwable =>
          processChannelException(receive.source, s"Exception while processing request from ${receive.source}", e)
      }
    }
  }

1.2.4、RequestChannel
requestChannel承载了kafka请求和响应的所有转发,它包含有如下两个变量:
1、requestQueue:是一个加锁阻塞队列,RequestChannel传输请求和响应信息的重要组件,上面讲到的RequestChannel.Request就是被放入到这个队列中
2、Processors:存储了processorid和processor的映射关系,主要是在response发送的时候从中选择对应的processor
它的两个核心功能是添加请求和发送响应回执,源码逻辑分别如下:

def sendRequest(request: RequestChannel.Request) {
    requestQueue.put(request)
  }

发送响应回执和之前processor略有不同,这里只是将response再添加到responseQueue中,之后由processor轮训从里面取出回执发送到客户端。

def sendResponse(response: RequestChannel.Response) {
   //省略log trace
    val processor = processors.get(response.processor)
    if (processor != null) {
      processor.enqueueResponse(response)
    }
  }

1.2.5、KafkaRequestHandler 
说到KafkaRequestHandler ,首先要往回聊一聊,看看它是如何产生的。它被KafkaRequestHandlerPool所创建,而pool是在kafkaServer启动的时候创建的,源码如下:

val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
  for (i <- 0 until numThreads) {
    createHandler(i)
  }

  def createHandler(id: Int): Unit = synchronized {
    runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
  }

好了,讲解完KafkaRequestHandler 的创建过程,接下来就是它的处理逻辑了,它的逻辑很简单,流程如下几个步骤:
1、从requestChannel拉取请求
2、判断请求类型,如果是Request类型则调用KafkaApis处理相应的请求

1.3、改进和优化

至此,我们已经将kafka的reactor模型分析完,最后提一个发散性问题,基于kafka实现的这个reactor模型以及源码的分析实现,如果让你来设计,你觉得还有哪些是可能存在性能瓶颈的地方可以做进一步优化的,大家可以在下面留言发表你的看法,下期会把我对这个问题的一些思考分享出来。
kafka是如何做到百万级高并发低迟延的?