找回密码
 会员注册
查看: 22|回复: 0

Kafka源码分析之网络层(一)

[复制链接]

2万

主题

0

回帖

6万

积分

超级版主

积分
64104
发表于 2024-10-11 20:59:22 | 显示全部楼层 |阅读模式
Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。小编会给大家带来几期 Kafka 相关的源码分析文章。这一系列文章是基于kafka 0.9.1版本,今天先来网络层的第一部分-概述和网络层模型实现。PS:丰富的一线技术、多元化的表现形式,尽在“HULK一线技术杂谈”,点关注哦!Kafka的网络层模型概述这个模型其实一点也不神秘,很质朴,很清晰,也很好用,引用源码中的一句话:The threading model is 1 Acceptor thread that handles new connections Acceptor has N Processor threads that each have their own selector and read requests from socketsM Handler threads that handle requests and produce responses back to the processor threads for writing再来张图:Acceptor 作两件事: 创建一堆worker线程;接受新连接, 将新的socket指派给某个 worker线程;Worker线程处理若干个socket,接受请求转给各种handler处理,response再经由worker线程发送回去.总结起来就是个半同步半异步模型(https://github.com/DavidLiuXh/lightningserver).Kafka的网络层模型实现虽然kafka用scala实现,但里面也用了大量的java类, 这部分主要是用了NIO(http://tutorials.jenkov.com/java-nio/index.html);主要实现文件:core/src/main/scal/kafka/network/SocketServer.scala,里面包括了SocketServer, Acceptor, Processor等;数据传输层实现:clients/src/main/java/org/apache/kafka/common/network,里面包括了Channel,TransportLayer,Authenticator等.Kafka网络层一哥:SocketServer类所在文件: core/src/main/scala/kafka/network/SocketServer.scala;统筹组织所有的网络层组件;startup()方法1) 根据配置的若干endpoint创建相应的Acceptor及相关联的一组Processor线程;for (i throw new KafkaException("Socket server failed to bind to %s:%d: %s.".format(socketAddress.getHostName, port, e.getMessage), e) }(2) 开启分配到的若干Processor:this.synchronized { processors.foreach { processor => Utils.newThread("kafka-network-thread-%d-%s-%d".format(brokerId, endPoint.protocolType.toString, processor.id), processor, false).start() } }(3) run()-利用NIO的selector来接收网络连接:var currentProcessor = 0 while (isRunning) { try { val ready = nioSelector.select(500) if (ready > 0) { val keys = nioSelector.selectedKeys() val iter = keys.iterator() while (iter.hasNext & isRunning) { try { val key = iter.next iter.remove() if (key.isAcceptable) accept(key, processors(currentProcessor)) else throw new IllegalStateException("Unrecognized key state for acceptor thread.") // round robin to the next processor thread currentProcessor = (currentProcessor + 1) % processors.length } catch { case e: Throwable => error("Error while accepting connection", e) } } } }这里面最主要的就是 accept(key,processors(currentProcessor))(4) accept: 设置新连接socket的参数后交由Processor处理:socketChannel.configureBlocking(false)socketChannel.socket().setTcpNoDelay(true)socketChannel.socket().setKeepAlive(true)socketChannel.socket().setSendBufferSize(sendBufferSize)processor.accept(socketChannel)Kafka网络层堂口扛把子rocessor类所在文件:core/src/main/scala/kafka/network/SocketServer.scala;从单个连接进来的request都由它处理;每个Processor对象会创建自己的nio selector;每个连接有唯一标识ConnectionIdlocalHostlocalPort-$remoteHostremotePort,这个非常重要!!!accept(socketChannel::SocketChannel):将新连接的SocketChannel保存到并发队列Q1中;run()是核心, 包裹在一个循环里,直接线程退出, while(isRunning){},里面依次调用如下函数1) configureNewConnections():从并发队列Q1里取出SocketChannel,添加到自身的nio selector中,监听读事件; (2) processNewResponses():处理当前所有处理完成的request相应的response, 这些response都是从RequestChannel获得( requestChannel.receiveResponse),根据request的类型来决定从当前连接的nio selector中暂时删除读事件监听/添加写事件/关闭当前连接;(3) selector.poll(300): 这个就不用解释了, 这个selector是对nio selector的又一封装,我们后面一章会讲到,它完成具体的数据接收和发送;(4) selector.completedReceives.asScala.foreach: 处理当前所有的从selector返回的完整request,将其put到RequestChannel的一个阻塞队列里,供应用层获取并处理;同时会暂时删除些连接上的读事件监听: selector.mute(receive.source);(5) selector.completedSends.asScala.foreach: 处理当前所有的从selector返回的写操作,重新将读事件添加到连接的selector监听中 selector.unmute(send.destination);(6) selector.disconnected.asScala.foreach: 处理当前所有将关闭的连接;Kafka网络层中间人:RequestChannel类所在文件: core/src/main/scala/kafka/network/RequestChannel.scala;保存所有从网络层拿到的完整request和需要发送的response;一般是RequestHandler会周期性从RequestChannel获取request,并将response保存回RequestChannel;processNewResponses() 处理RequestChannel中所有的response;Kafka网络层看门小弟:ConnectionQuotas类所在文件:core/src/main/scala/kafka/network/SocketServer.scala;当某个IP到kafka的连接数过多时,将抛出TooManyConnectionsException异常;实现就是通过加锁的加减计数;SocketServer 图解:下一篇咱们来讲在Processor中使用的nio selector的又一封装,负责具体数据的接收和发送。扫描下方二维码了解更多内容
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 会员注册

本版积分规则

QQ|手机版|心飞设计-版权所有:微度网络信息技术服务中心 ( 鲁ICP备17032091号-12 )|网站地图

GMT+8, 2024-12-26 12:30 , Processed in 1.063141 second(s), 26 queries .

Powered by Discuz! X3.5

© 2001-2024 Discuz! Team.

快速回复 返回顶部 返回列表