大量数据通过kafka中转不会低效林改造技术规程吗

金钱鳘又称黄唇鱼,目前已经接近濒危灭绝的状态。
赴日游客越来越多,国内游客成为黑心商家的肥肉。
声明:本文由入驻搜狐公众平台的作者撰写,除搜狐官方账号外,观点仅代表作者本人,不代表搜狐立场。
  作者:陈华华
  来源:/portal/article/709.html
  Kafka 是分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
  现在被广泛地应用于构建实时数据管道和流应用的场景中,具有横向扩展,容错,快等优点,并已经运行在众多大中型公司的生产环境中,成功应用于大数据领域,本文分享一下我所了解的 Kafka。
  Kafka 高吞吐率性能揭秘
  Kafka 的第一个突出特定就是“快”,而且是那种变态的“快”,在普通廉价的虚拟机器上,比如一般 SAS 盘做的虚拟机上,据 LINDEDIN 统计,最新的数据是每天利用 Kafka 处理的消息超过1万亿条,在峰值时每秒钟会发布超过百万条消息,就算是在内存和 CPU 都不高的情况下,Kafka 的速度最高可以达到每秒十万条数据,并且还能持久化存储。
  作为消息队列,要承接读跟写两块的功能,首先是写,就是消息日志写入 Kafka,那么,Kafka 在“写”上是怎么做到写变态快呢?
  Kafka 让代码飞起来之写得快
  首先,可以使用 Kafka 提供的生产端 API 发布消息到 1 个或多个 Topic(主题)的一个(保证数据的顺序)或者多个分区(并行处理,但不一定保证数据顺序)。Topic 可以简单理解成一个数据类别,是用来区分不同数据的。
  Kafka 维护一个 Topic 中的分区 log,以顺序追加的方式向各个分区中写入消息,每个分区都是不可变的消息队列。分区中的消息都是以 k-v 形式存在。
k 表示 offset,称之为偏移量,一个 64 位整型的唯一标识,offset 代表了 Topic 分区中所有消息流中该消息的起始字节位置。
v 就是实际的消息内容,每个分区中的每个 offset 都是唯一存在的,所有分区的消息都是一次写入,在消息未过期之前都可以调整 offset 来实现多次读取。
  以上提到 Kafka “快”的第一个因素:消息顺序写入磁盘。
  我们知道现在的磁盘大多数都还是机械结构(SSD不在讨论的范围内),如果将消息以随机写的方式存入磁盘,就会按柱面、磁头、扇区的方式进行(寻址过程),缓慢的机械运动(相对内存)会消耗大量时间,导致磁盘的写入速度只能达到内存写入速度的几百万分之一,为了规避随机写带来的时间消耗,KAFKA采取顺序写的方式存储数据,如下图所示:
  新来的消息只能追加到已有消息的末尾,并且已经生产的消息不支持随机删除以及随机访问,但是消费者可以通过重置 offset 的方式来访问已经消费过的数据。
  即使顺序读写,过于频繁的大量小 I/O 操作一样会造成磁盘的瓶颈,所以 Kafka 在此处的处理是把这些消息集合在一起批量发送,这样减少对磁盘 IO 的过度读写,而不是一次发送单个消息。
  另一个是无效率的字节复制,尤其是在负载比较高的情况下影响是显着的。为了避免这种情况,Kafka 采用由 Producer,broker 和 consumer 共享的标准化二进制消息格式,这样数据块就可以在它们之间自由传输,无需转换,降低了字节复制的成本开销。
  同时,Kafka 采用了 MMAP(Memory Mapped Files,内存映射文件)技术。很多现代操作系统都大量使用主存做磁盘缓存,一个现代操作系统可以将内存中的所有剩余空间用作磁盘缓存,而当内存回收的时候几乎没有性能损失。
  由于 Kafka 是基于 JVM 的,并且任何与 Java 内存使用打过交道的人都知道两件事:
  ? 对象的内存开销非常高,通常是实际要存储数据大小的两倍;
  ? 随着数据的增加,java的垃圾收集也会越来越频繁并且缓慢。
  基于此,使用文件系统,同时依赖页面缓存就比使用其他数据结构和维护内存缓存更有吸引力:
  ? 不使用进程内缓存,就腾出了内存空间,可以用来存放页面缓存的空间几乎可以翻倍。
  ? 如果 Kafka 重启,进行内缓存就会丢失,但是使用操作系统的页面缓存依然可以继续使用。
  可能有人会问Kafka 如此频繁利用页面缓存,如果内存大小不够了怎么办?
  Kafka 会将数据写入到持久化日志中而不是刷新到磁盘。实际上它只是转移到了内核的页面缓存。
  利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好,它可以直接利用操作系统的页缓存来实现文件到物理内存的直接映射。完成映射之后对物理内存的操作在适当时候会被同步到硬盘上。
  Kafka 让代码飞起来之读得快
  Kafka 除了接收数据时写得快,另外一个特点就是推送数据时发得快。
  Kafka 这种消息队列在生产端和消费端分别采取的 push 和 pull 的方式,也就是你生产端可以认为 Kafka 是个无底洞,有多少数据可以使劲往里面推送,消费端则是根据自己的消费能力,需要多少数据,你自己过来 Kafka 这里拉取,Kafka 能保证只要这里有数据,消费端需要多少,都尽可以自己过来拿。
  ▲零拷贝
  具体到消息的落地保存,broker 维护的消息日志本身就是文件的目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。维护这个公共的格式并允许优化最重要的操作:网络传输持久性日志块。 现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。
  要理解 senfile 的影响,重要的是要了解将数据从文件传输到 socket 的公共数据路径,如下图所示,数据从磁盘传输到 socket 要经过以下几个步骤:
  ? 操作系统将数据从磁盘读入到内核空间的页缓存
  ? 应用程序将数据从内核空间读入到用户空间缓存中
  ? 应用程序将数据写回到内核空间到 socket 缓存中
  ? 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
  这里有四次拷贝,两次系统调用,这是非常低效的做法。如果使用 sendfile,只需要一次拷贝就行:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
  常规文件传输和 zeroCopy 方式的性能对比:
  假设一个 Topic 有多个消费者的情况, 并使用上面的零拷贝优化,数据被复制到页缓存中一次,并在每个消费上重复使用,而不是存储在存储器中,也不在每次读取时复制到用户空间。这使得以接近网络连接限制的速度消费消息。
  这种页缓存和 sendfile 组合,意味着 Kafka 集群的消费者大多数都完全从缓存消费消息,而磁盘没有任何读取活动。
  ▲批量压缩
  在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络带宽,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。所以数据压缩就很重要。可以每个消息都压缩,但是压缩率相对很低。所以 Kafka 使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。
  Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
  Kafka 支持 Gzip 和 Snappy 压缩协议。
  Kafka 数据可靠性深度解读
  Kafka 的消息保存在 Topic 中,Topic 可分为多个分区,为保证数据的安全性,每个分区又有多个 Replia。
  ? 多分区的设计的特点:
为了并发读写,加快读写速度;
是利用多分区的存储,利于数据的均衡;
是为了加快数据的恢复速率,一但某台机器挂了,整个集群只需要恢复一部分数据,可加快故障恢复的时间。
  每个 Partition 分为多个 Segment,每个 Segment 有 .log 和 .index 两个文件,每个 log 文件承载具体的数据,每条消息都有一个递增的 offset,Index 文件是对 log 文件的索引,Consumer 查找 offset 时使用的是二分法根据文件名去定位到哪个 Segment,然后解析 msg,匹配到对应的 offset 的 msg。
  &Partition recovery过程&
  每个 Partition 会在磁盘记录一个 RecoveryPoint,,记录已经 flush 到磁盘的最大 offset。当 broker 失败重启时,会进行 loadLogs。首先会读取该 Partition 的 RecoveryPoint,找到包含 RecoveryPoint 的 segment 及以后的 segment, 这些 segment 就是可能没有完全 flush 到磁盘 segments。然后调用 segment 的 recover,重新读取各个 segment 的 msg,并重建索引。每次重启 Kafka 的 broker 时,都可以在输出的日志看到重建各个索引的过程。
  & 数据同步&
  Producer 和 Consumer 都只与 Leader 交互,每个 Follower 从 Leader 拉取数据进行同步。
  如上图所示,ISR 是所有不落后的 replica 集合,不落后有两层含义:距离上次 FetchRequest 的时间不大于某一个值或落后的消息数不大于某一个值,Leader失 败后会从 ISR 中随机选取一个 Follower 做 Leader,该过程对用户是透明的。
  当 Producer 向 Broker 发送数据时,可以通过 request.required.acks 参数设置数据可靠性的级别。
  此配置是表明当一次 Producer 请求被认为完成时的确认值。特别是,多少个其他 brokers 必须已经提交了数据到它们的 log 并且向它们的 Leader 确认了这些信息。
  ?典型的值:
  0: 表示 Producer 从来不等待来自 broker 的确认信息。这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。
  1:表示获得 Leader replica 已经接收了数据的确认信息。这个选择时延较小同时确保了 server 确认接收成功。
  -1:Producer 会获得所有同步 replicas 都收到数据的确认。同时时延最大,然而,这种方式并没有完全消除丢失消息的风险,因为同步 replicas 的数量可能是 1。如果你想确保某些 replicas 接收到数据,那么你应该在 Topic-level 设置中选项 min.insync.replicas 设置一下。
  仅设置 acks= -1 也不能保证数据不丢失,当 ISR 列表中只有 Leader 时,同样有可能造成数据丢失。要保证数据不丢除了设置 acks=-1,还要保证 ISR 的大小大于等于2。
  ?具体参数设置:
  request.required.acks:设置为 -1 等待所有 ISR 列表中的 Replica 接收到消息后采算写成功。
  min.insync.replicas:设置为 &=2,保证 ISR 中至少两个 Replica。
  Producer:要在吞吐率和数据可靠性之间做一个权衡。
  Kafka 作为现代消息中间件中的佼佼者,以其速度和高可靠性赢得了广大市场和用户青睐,其中的很多设计理念都是非常值得我们学习的,本文所介绍的也只是冰山一角,希望能够对大家了解 Kafka 有一定的作用。
欢迎举报抄袭、转载、暴力色情及含有欺诈和虚假信息的不良文章。
请先登录再操作
请先登录再操作
微信扫一扫分享至朋友圈
搜狐公众平台官方账号
生活时尚&搭配博主 /生活时尚自媒体 /时尚类书籍作者
搜狐网教育频道官方账号
全球最大华文占星网站-专业研究星座命理及测算服务机构
主演:黄晓明/陈乔恩/乔任梁/谢君豪/吕佳容/戚迹
主演:陈晓/陈妍希/张馨予/杨明娜/毛晓彤/孙耀琦
主演:陈键锋/李依晓/张迪/郑亦桐/张明明/何彦霓
主演:尚格?云顿/乔?弗拉尼甘/Bianca Bree
主演:艾斯?库珀/ 查宁?塔图姆/ 乔纳?希尔
baby14岁写真曝光
李冰冰向成龙撒娇争宠
李湘遭闺蜜曝光旧爱
美女模特教老板走秀
曝搬砖男神奇葩择偶观
柳岩被迫成赚钱工具
大屁小P虐心恋
匆匆那年大结局
乔杉遭粉丝骚扰
男闺蜜的尴尬初夜
客服热线:86-10-
客服邮箱:  作者:陈华华
  来源:/portal/article/709.html
  Kafka 是分布式发布-订阅消息系统,是一个分布式的,可划分的,冗余备份的持久性的日志服务。它主要用于处理活跃的流式数据。
  现在被广泛地应用于构建实时数据管道和流应用的场景中,具有横向扩展,容错,快等优点,并已经运行在众多大中型公司的生产环境中,成功应用于大数据领域,本文分享一下我所了解的 Kafka。
  Kafka 高吞吐率性能揭秘
  Kafka 的第一个突出特定就是“快”,而且是那种变态的“快”,在普通廉价的虚拟机器上,比如一般 SAS 盘做的虚拟机上,据 LINDEDIN 统计,最新的数据是每天利用 Kafka 处理的消息超过1万亿条,在峰值时每秒钟会发布超过百万条消息,就算是在内存和 CPU 都不高的情况下,Kafka 的速度最高可以达到每秒十万条数据,并且还能持久化存储。
  作为消息队列,要承接读跟写两块的功能,首先是写,就是消息日志写入 Kafka,那么,Kafka 在“写”上是怎么做到写变态快呢?
  Kafka 让代码飞起来之写得快
  首先,可以使用 Kafka 提供的生产端 API 发布消息到 1 个或多个 Topic(主题)的一个(保证数据的顺序)或者多个分区(并行处理,但不一定保证数据顺序)。Topic 可以简单理解成一个数据类别,是用来区分不同数据的。
  Kafka 维护一个 Topic 中的分区 log,以顺序追加的方式向各个分区中写入消息,每个分区都是不可变的消息队列。分区中的消息都是以 k-v 形式存在。
k 表示 offset,称之为偏移量,一个 64 位整型的唯一标识,offset 代表了 Topic 分区中所有消息流中该消息的起始字节位置。
v 就是实际的消息内容,每个分区中的每个 offset 都是唯一存在的,所有分区的消息都是一次写入,在消息未过期之前都可以调整 offset 来实现多次读取。
  以上提到 Kafka “快”的第一个因素:消息顺序写入磁盘。
  我们知道现在的磁盘大多数都还是机械结构(SSD不在讨论的范围内),如果将消息以随机写的方式存入磁盘,就会按柱面、磁头、扇区的方式进行(寻址过程),缓慢的机械运动(相对内存)会消耗大量时间,导致磁盘的写入速度只能达到内存写入速度的几百万分之一,为了规避随机写带来的时间消耗,KAFKA采取顺序写的方式存储数据,如下图所示:
  新来的消息只能追加到已有消息的末尾,并且已经生产的消息不支持随机删除以及随机访问,但是消费者可以通过重置 offset 的方式来访问已经消费过的数据。
  即使顺序读写,过于频繁的大量小 I/O 操作一样会造成磁盘的瓶颈,所以 Kafka 在此处的处理是把这些消息集合在一起批量发送,这样减少对磁盘 IO 的过度读写,而不是一次发送单个消息。
  另一个是无效率的字节复制,尤其是在负载比较高的情况下影响是显着的。为了避免这种情况,Kafka 采用由 Producer,broker 和 consumer 共享的标准化二进制消息格式,这样数据块就可以在它们之间自由传输,无需转换,降低了字节复制的成本开销。
  同时,Kafka 采用了 MMAP(Memory Mapped Files,内存映射文件)技术。很多现代操作系统都大量使用主存做磁盘缓存,一个现代操作系统可以将内存中的所有剩余空间用作磁盘缓存,而当内存回收的时候几乎没有性能损失。
  由于 Kafka 是基于 JVM 的,并且任何与 Java 内存使用打过交道的人都知道两件事:
  ? 对象的内存开销非常高,通常是实际要存储数据大小的两倍;
  ? 随着数据的增加,java的垃圾收集也会越来越频繁并且缓慢。
  基于此,使用文件系统,同时依赖页面缓存就比使用其他数据结构和维护内存缓存更有吸引力:
  ? 不使用进程内缓存,就腾出了内存空间,可以用来存放页面缓存的空间几乎可以翻倍。
  ? 如果 Kafka 重启,进行内缓存就会丢失,但是使用操作系统的页面缓存依然可以继续使用。
  可能有人会问Kafka 如此频繁利用页面缓存,如果内存大小不够了怎么办?
  Kafka 会将数据写入到持久化日志中而不是刷新到磁盘。实际上它只是转移到了内核的页面缓存。
  利用文件系统并且依靠页缓存比维护一个内存缓存或者其他结构要好,它可以直接利用操作系统的页缓存来实现文件到物理内存的直接映射。完成映射之后对物理内存的操作在适当时候会被同步到硬盘上。
  Kafka 让代码飞起来之读得快
  Kafka 除了接收数据时写得快,另外一个特点就是推送数据时发得快。
  Kafka 这种消息队列在生产端和消费端分别采取的 push 和 pull 的方式,也就是你生产端可以认为 Kafka 是个无底洞,有多少数据可以使劲往里面推送,消费端则是根据自己的消费能力,需要多少数据,你自己过来 Kafka 这里拉取,Kafka 能保证只要这里有数据,消费端需要多少,都尽可以自己过来拿。
  ▲零拷贝
  具体到消息的落地保存,broker 维护的消息日志本身就是文件的目录,每个文件都是二进制保存,生产者和消费者使用相同的格式来处理。维护这个公共的格式并允许优化最重要的操作:网络传输持久性日志块。 现代的 unix 操作系统提供一个优化的代码路径,用于将数据从页缓存传输到 socket;在 Linux 中,是通过 sendfile 系统调用来完成的。Java 提供了访问这个系统调用的方法:FileChannel.transferTo API。
  要理解 senfile 的影响,重要的是要了解将数据从文件传输到 socket 的公共数据路径,如下图所示,数据从磁盘传输到 socket 要经过以下几个步骤:
  ? 操作系统将数据从磁盘读入到内核空间的页缓存
  ? 应用程序将数据从内核空间读入到用户空间缓存中
  ? 应用程序将数据写回到内核空间到 socket 缓存中
  ? 操作系统将数据从 socket 缓冲区复制到网卡缓冲区,以便将数据经网络发出
  这里有四次拷贝,两次系统调用,这是非常低效的做法。如果使用 sendfile,只需要一次拷贝就行:允许操作系统将数据直接从页缓存发送到网络上。所以在这个优化的路径中,只有最后一步将数据拷贝到网卡缓存中是需要的。
  常规文件传输和 zeroCopy 方式的性能对比:
  假设一个 Topic 有多个消费者的情况, 并使用上面的零拷贝优化,数据被复制到页缓存中一次,并在每个消费上重复使用,而不是存储在存储器中,也不在每次读取时复制到用户空间。这使得以接近网络连接限制的速度消费消息。
  这种页缓存和 sendfile 组合,意味着 Kafka 集群的消费者大多数都完全从缓存消费消息,而磁盘没有任何读取活动。
  ▲批量压缩
  在很多情况下,系统的瓶颈不是 CPU 或磁盘,而是网络带宽,对于需要在广域网上的数据中心之间发送消息的数据流水线尤其如此。所以数据压缩就很重要。可以每个消息都压缩,但是压缩率相对很低。所以 Kafka 使用了批量压缩,即将多个消息一起压缩而不是单个消息压缩。
  Kafka 允许使用递归的消息集合,批量的消息可以通过压缩的形式传输并且在日志中也可以保持压缩格式,直到被消费者解压缩。
  Kafka 支持 Gzip 和 Snappy 压缩协议。
  Kafka 数据可靠性深度解读
  Kafka 的消息保存在 Topic 中,Topic 可分为多个分区,为保证数据的安全性,每个分区又有多个 Replia。
  ? 多分区的设计的特点:
为了并发读写,加快读写速度;
是利用多分区的存储,利于数据的均衡;
是为了加快数据的恢复速率,一但某台机器挂了,整个集群只需要恢复一部分数据,可加快故障恢复的时间。
  每个 Partition 分为多个 Segment,每个 Segment 有 .log 和 .index 两个文件,每个 log 文件承载具体的数据,每条消息都有一个递增的 offset,Index 文件是对 log 文件的索引,Consumer 查找 offset 时使用的是二分法根据文件名去定位到哪个 Segment,然后解析 msg,匹配到对应的 offset 的 msg。
  &Partition recovery过程&
  每个 Partition 会在磁盘记录一个 RecoveryPoint,,记录已经 flush 到磁盘的最大 offset。当 broker 失败重启时,会进行 loadLogs。首先会读取该 Partition 的 RecoveryPoint,找到包含 RecoveryPoint 的 segment 及以后的 segment, 这些 segment 就是可能没有完全 flush 到磁盘 segments。然后调用 segment 的 recover,重新读取各个 segment 的 msg,并重建索引。每次重启 Kafka 的 broker 时,都可以在输出的日志看到重建各个索引的过程。
  & 数据同步&
  Producer 和 Consumer 都只与 Leader 交互,每个 Follower 从 Leader 拉取数据进行同步。
  如上图所示,ISR 是所有不落后的 replica 集合,不落后有两层含义:距离上次 FetchRequest 的时间不大于某一个值或落后的消息数不大于某一个值,Leader失 败后会从 ISR 中随机选取一个 Follower 做 Leader,该过程对用户是透明的。
  当 Producer 向 Broker 发送数据时,可以通过 request.required.acks 参数设置数据可靠性的级别。
  此配置是表明当一次 Producer 请求被认为完成时的确认值。特别是,多少个其他 brokers 必须已经提交了数据到它们的 log 并且向它们的 Leader 确认了这些信息。
  ?典型的值:
  0: 表示 Producer 从来不等待来自 broker 的确认信息。这个选择提供了最小的时延但同时风险最大(因为当server宕机时,数据将会丢失)。
  1:表示获得 Leader replica 已经接收了数据的确认信息。这个选择时延较小同时确保了 server 确认接收成功。
  -1:Producer 会获得所有同步 replicas 都收到数据的确认。同时时延最大,然而,这种方式并没有完全消除丢失消息的风险,因为同步 replicas 的数量可能是 1。如果你想确保某些 replicas 接收到数据,那么你应该在 Topic-level 设置中选项 min.insync.replicas 设置一下。
  仅设置 acks= -1 也不能保证数据不丢失,当 ISR 列表中只有 Leader 时,同样有可能造成数据丢失。要保证数据不丢除了设置 acks=-1,还要保证 ISR 的大小大于等于2。
  ?具体参数设置:
  request.required.acks:设置为 -1 等待所有 ISR 列表中的 Replica 接收到消息后采算写成功。
  min.insync.replicas:设置为 &=2,保证 ISR 中至少两个 Replica。
  Producer:要在吞吐率和数据可靠性之间做一个权衡。
  Kafka 作为现代消息中间件中的佼佼者,以其速度和高可靠性赢得了广大市场和用户青睐,其中的很多设计理念都是非常值得我们学习的,本文所介绍的也只是冰山一角,希望能够对大家了解 Kafka 有一定的作用。
声明:本文由入驻搜狐公众平台的作者撰写,除搜狐官方账号外,观点仅代表作者本人,不代表搜狐立场。Kafka技术内幕: 生产者_架构师_传送门
Kafka技术内幕: 生产者
@xuyuantree
架构师(JiaGouX)我们都是架构师!概述消息系统通常都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不同的MQ实现的Broker实现会有所不同,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。不管是生产者还是消费者对于Broker而言都是客户端,只不过一个是生产消息一个是消费消息。图2-1中生产者和消费者都是通过客户端请求的方式发送给服务端去执行存储消息或者获取消息的流程,在客户端和服务端这一层都有一个连接对象专门负责发送请求和接收请求,具体步骤如下:生产者客户端应用程序产生消息客户端连接对象将消息包装到请求中发送到服务端服务端的入口也有一个连接对象负责接收请求,并将消息以文件的形式存储起来服务端返回响应结果给生产者客户端消费者客户端应用程序消费消息客户端连接对象将消费信息也包装到请求中发送给服务端服务端从文件存储系统中取出消息服务端返回响应结果给消费者客户端客户端将响应结果还原成消息并开始处理消息图2-1 客户端和服务端交互Kafka作为一个分布式的消息存储系统,生产者客户端需要将消息传给Kafka集群完成消息存储,本章从Kafka的消费者实现为入口,在源码分析的过程中,思考以下几个问题是如何实现的:生产者是如何确保将消息以分布式的方式存储到Kafka集群?生产者客户端是如何组织消息,发送消息,并接收服务端的响应?客户端和服务端的通信机制,如何有效运用线程模型更高效地通信本章的着重点主要在于客户端和服务端的网络通信流程,暂时还没有涉及到Kafka的服务端具体实现。因为对于任何的分布式系统而言,必须有一套负责不同节点之间数据传输的网络层通信机制,这套底层的框架要能够处理协议的编解码,客户端和服务端的请求发送和接收等等。在Java中的网络编程中最早是Socket模式,后来进化出了Selector选择器模式,再结合上队列模型,缓冲区机制,就可以设计出一套适合自己系统的网络层通信协议框架。虽然通信模型和服务端的架构实现上没有太大的关联,不过可以在这最底层的框架上添加一些额外的功能比如超时重试,序列化等功能,那么服务端就可以更专注地处理主体业务逻辑,而不需要花太多的精力去关注网络层的各种异常情况。在分布式的系统中,协议是由服务端定制的,客户端只要遵循这种协议发送请求,服务端就可以确保可以正常地接收并处理客户端的请求。所以实际上客户端的实现可以由不同的语言自己去实现,官方的wiki中列出了目前已经支持的绝大多数语言。因为对于不同语言都有自己的网络层编程API,比如Golang使用channel通信,Akka使用Actor方式传递消息,它们就可以充分利用自己的语言特性去实现不同的客户端。Kafka初期使用Scala编写,因此早期scala版本的producer、consumer和服务端的实现都放在core包下,最新的客户端使用了Java重新实现,被放在了clients包下。本章我们主要分析如下几个部分的内容:新版本的Producer客户端实现(Java)旧版本的Producer客户端实现(Scala)服务端的网络连接实现(SocketServer)双端队列InFlightRequests队列图2-32是记录收集器的batches队列和NetworkClient的inFlightRequests队列的对比,记录收集器双端队列中的元素只保存数据,没有状态信息,所以针对这个队列的操作只是简单地追加到队列最后一个,取出时取的是队列第一个元素。而inFlightRequests队列中的元素是客户端请求对象,它是有状态的,比如这个请求是否已经发送完成就是一种状态。请求发送完成并不代表就可以从队列中移除,不过如果客户端不需要响应结果发送完成则是可以删除的。图2-32 InFlightRequests双端队列实际上如果客户端请求添加到队列尾部也是可以的,如图2-33只不过对应的peek和poll的顺序都要做出改变:图2-33 添加最新元素到双端队列的两种方式图2-34中以新请求添加到队列头部为例,模拟了多个请求是如何加入队列以及完成时如何从队列中移除,其中[r1,r2,r3]需要响应结果,而r4不需要响应结果,假设[r1-r4]四个请求都属于某个节点,所以客户端会按照顺序依次加入到队列中。不过后一个请求必须要保证前一个请求发送到服务端节点后才可以进入队列等待发送,当收到响应请求完成时,r4是从队列头部被删除,而其他请求则是从队列尾部删除。图2-34 双端队列操作客户端请求的生命周期客户端在和服务端某个节点建立连接时,会根据客户端中目前的请求队列判断第一个请求是否已经完成来判断这个节点是否可以发送更多的请求canSendMore。那么客户端请求什么时候才算是completed?注意虽然队列中保存的是ClientRequest,不过在add和peek时都是取出ClientRequest里的RequestSend对象。RequestSend到Send的继承体系是RequestSend->NetworkSend->ByteBufferSend->Send。对于ByteBufferSend而言完成的条件是没有要发送的数据了,即缓冲区中的数据都写完了。所以这里请求完成指的是当前发送请求已经成功发送到服务端了,但并不需要等待这个请求收到响应结果。即使在同一个目标节点的同一个队列中,多个不同ClientRequest请求也是有顺序的,在前面的分析中已经有两个地方限制了客户端请求并不是可以随便添加到队列中的:在准备连接时,queue.peekFirst().request().completed()=true可以连接发送请求后,KafkaChannelsetSend也要确保send!=null,一个KafkaChannel只允许同时运行一个Send其中第二个条件也将直接影响第一个条件,如果第一个请求没有发送完毕,它还会存在于KafkaChannel中,此时来了第二个请求,如果不加以限制即使send!=null,也要将第二个请求设置到KafkaChannel中,这样第一个请求返回的时候却返回了第二个请求,因为Send已经被第二个请求更新了,所以这是有问题的。不过ClientRequest.RequestSend完成,并不表示这个ClientRequest在NetworkClient中就完成了,客户端的请求被发送到服务端,还需要等待收到服务端的响应结果。所以inFlightRequests表示正在进行中还没有完成的请求,下面几种场景都表示还没有完成的ClientRequest:ClientRequest请求等待发送,ClientRequest请求正在发送,ClientRequest请求已经发送(这时RequestSend完成),ClientRequest对应的请求还未收到响应结果,图2-35是ClientRequest在InFlightRequests中的生命周期。图2-35 客户端请求在队列中的生命周期客户端请求发送和接收示例我们从发送线程开始,举例多个请求的发送和接收,以及在队列中的操作。发送线程第一次运行在准备工作时选择readyNodes,然后为已经准备好的节点创建连接和客户端请求ClientRequest,调用NetworkClient.send会将请求先加入请求对应的目标节点的队列中,然后设置到KafkaChannel中,每个KafkaChannel只有一个正在进行中的Send,如果已经存在Send(比如正在进行中的客户端请求没有被发送完成就不会被重置为空)则不允许再次调用。当选择器轮询时会将选择到的KafkaChannel中的Send通过底层的SocketChannel发送给服务端。图2-36模拟了第一个请求加入队列后的工作过程。图2-36 NetworkClient.send包括入队列和调用Selector.send假设第一个请求还没有发送完成比如还在步骤2/3时,发送线程第二次运行准备发送第二批数据(假设这两个请求都是要发送到同一个目标节点),由于队列中的第一个请求还没有完成,canSendMore返回false,在准备工作时就会将其从readyNodes中移除,这样就不会为这个节点创建新的ClientRequest,即第二个请求根本就不会被生成。即使没有canSendMore这一层判断,假设创建了第二个请求,当准备调用NetworkClient.send时,可是又遇到了第二个拦路虎,因为KafkaChannel.setSend要求send不能为空时才可以设置,而现在send已经被第一个客户端请求占着不放,还没有重置,所以客户端请求还是无法被成功地设置。这样就存在一个问题,请求已经被添加到队列中,但是却没办法设置到KafkaChannel中,只能等下次再调用一次NetworkClient.send,不过这样请求队列针对同一个请求就被加入多次了,所以能够尽早在第一道门框拦下第二个请求就不要在放进来了。所以新的请求被创建的时机必须等到队列头部第一个请求已经完成才会创建,而且此时第一个请求在完成的时候就设置了send=null,新创建的请求也可以被成功地设置到KafkaChannel中,所以说如果第一个条件满足(canSendMore=true)后通常第二个条件也是满足(send=null),图2-37是请求[R2,R3]分别在每次允许加入队列时加入到队列头部,图2-38是不需要响应结果的请求R3从队列头部删除请求,图2-39是需要响应结果的请求[R1,R2,R4]分别收到响应结果后从队列尾部删除请求。图2-37 往队列中添加一个新的请求必须确保上一个请求已经完成图2-38 不需要响应结果的请求发送完成从队列头部删除图2-39 需要响应结果的请求收到响应后从队列尾部删除排队的示例这里的双端队列和现实世界的排队方式是类似的,如图2-40以去银行办理业务为例,排队机给每个人一个号码表示ClientRequest请求的顺序,只有上一个号码的人办理完了业务,下一个人才能办理。为了和这里的NetworkClient语义相同,我们稍微修改下排队规则,假设办理业务分成三个步骤:告诉业务员要办理什么业务,业务员处理业务,业务员完成业务,这些步骤都是可以并行执行的,而且执行完一个小步骤都要回到自己的座位上继续等待,假设只有一个业务办理窗口时(不过其实你不用担心,假设这个业务员只是一个入口而已,他的后台即服务端是开着很多线程在处理的)。第一个人开始办理业务时首先加入到inFlightRequests,并告诉业务员要取钱,业务员收到指令后,记录了这个信息(可以把业务员看做专门负责接收业务指令,但是不办理具体的业务),第一个人回到自己座位,他还不能离开大厅,因为他只是传达了这个指定,但是钱还没取到;因为此时队列中的第一人已经完成了发送指令请求,第二个人可以办理了,同样先加入到inFlightRequests队列中,然后第二个人说要改密码,业务员收到指令后同样不真正执行改密码的命令,但是如果这个时候第三个人等不住了,还没等第二个人传达完指令就想强行插队,对不起,请稍等下!所以inFlightRequest表示已经发送完请求,或者正在发送请求的,但是他们都还不能离开大厅,因为还没有收到响应结果。因为每个请求发送给业务员都是有顺序的,所以加入到inFlightRequest中的ClientRequest也都是有顺序的,这个队列是个双端队列,队列头部是最近加入的请求,队列尾部是最早加入的请求,如果队列第一个元素的请求还没有发送完成,不允许下一个请求加入队列中,所以新加入队列的元素,在这之前的请求一定都是已经发送完成了,否则他就不可能被加入队列中了。图2-40 银行办理业务与队列图2-40中虽然符合新请求添加到队列头部(我们把尾部设置为面对业务员),按照排队的方式理解起来也比较直观,第一个请求先于第二个请求被处理,不过似乎业务员总是面对着第一个请求。为了更好地理解这个双端队列图2-41中分成两个队列,排队队列负责接收请求,处理队列负责处理收到的请求,请求按照发送顺序加入排队队列,一旦请求发送完毕,业务员就会把收到的请求放入另一个队列中,这样两种队列其实都满足了排队论。不过双端队列本来就可以在头尾同时操作,所以实际上只需要一个队列即可。图2-41 排队队列和处理队列现在如果从一个业务窗口推到多个窗口,如图2-42就类似于客户端可以向多个服务端节点同时发送请求,每个服务端目标节点都有一个双端队列,每个队列的处理方式和上面一个窗口都是类似的,只不过现在每个请求都携带了自己将会被排队到指定窗口。图2-42 多个窗口的队列假设第一个人的业务被成功受理,并且也成功取到钱了,他就可以拿着钱开开心心地离开银行大厅了,现在他的业务已经全部办理好了,就会从inFlightRequests中移除了,因为inFlightRequests中保存的是发送完或正在发送请求,但是没有收到响应结果,一旦收到响应结果就不应该继续在大厅里呆下去了,毕竟inFlightRequests的容量也是有限制的,如果银行大厅座位都做满了,说明请求量太大了,所以取完钱就赶紧回家。对于需要响应的请求,请求在服务端慢吞吞地处理,返回也是有顺序的,也是说服务端是按照客户端请求的顺序处理的,只有第一个请求返回后才会接着返回第二个请求结果,并不会出现第二个请求先于第一个请求返回结果给客户端。所以对于不需要响应结果的客户端请求如果在handleCompletedSends中没有删除而是等到handleCompletedReceives才删除显然是不公平的,因为他本来可以立即返回,但是却要等到他前面的人都收到结果后才能轮到他。比如超市通常会设置无购物快速通道,如果顾客没有购买任何东西不需要在购物通道上排队就可以快速出去。如果客户端请求不需要响应就会像上面那样,在发送完就被清理掉了,这是因为客户端既然不想要响应结果,那么就让请求越快完成越好。通过这种快速清理的方式确保了下一个请求进来之前,保存在队列中的一定都是需要响应的:因为上一个请求是不需要响应的,那么在下一个请求加入队列头部之前,上一个请求已经从队列头部移除掉了。以超市为例,进入购物通道排队的人一定知道排队的人都是有购物的,没有人那么傻没有买任何东西却还傻傻地在排队。同样以银行办理业务为例,假设有些人是来咨询业务的,业务员是立即可以回答的,不需要和后台的服务端交互(或者尽管有交互,但是客户端并不关心这个结果,在你出来结果之后,他可能已经都走了)。这样的客户端请求也要排队,在准备发送请求时加入队列头部第一个元素,完成时就可以立即从队列头部移除,不需要进入处理队列了。现在Java版本的生产者客户端已经分析完毕,表2-4总结了客户端发送过程涉及到的主要组件和其用途:表2-4 Java版本的生产者主要组件本章总结本章主要分析了两种版本的生产者客户端以及服务端的网络层实现,重点介绍了客户端的NetworkClient和服务端的SocketServer,Java版本的客户端和服务端的Processor都使用了Selector选择器模式和KafkaChannel,而Scala版本的客户端则使用比较原始的BlockingChannel。在客户端服务端的通信模型中,通常一个客户端会连接到多个服务端,一个服务端也会接受多个客户端的连接,所以使用Selector模式可以使得网络通信更加高效,在服务端还运用了Reactor模式将IO部分和业务处理部分的线程进行分离。除此之外,客户端和服务端在很多地方都运用了队列这种数据结构来对请求或者响应进行排队,队列是一种保证数据被有序地处理并且能够缓存的结构。表2-8总结了Scala版本的生产者客户端和服务端中使用队列的地方,这里并不包括Java版本的生产者使用更高级的双端队列。在客户端要向服务端发送消息时我们会获取Cluster集群状态(Java版本)/集群元数据TopicMetadata(Scala版本),为消息选择Partition,选择Partition的Leader作为目标节点,在服务端SocketServer会接收客户端发送的请求交给Handler和KafkaApis处理,具体和消息相关的处理逻辑由KafkaApis以及KafkaServer中的其他组件一起完成。图2-57是Kafka服务端的内部组件图,网络层包括一个Acceptor线程和多个Processor线程;API层的多个API线程指的是多个KafkaRequestHandler线程,网络层和API层中间有一个RequestChannel,它是请求和响应的数据交换中转站;API层和日志子系统有关联因为API层的请求要读取或写入日志文件,Replication子系统主要的管理类是ReplicaManager,而KafkaApis和它有直接的关联;一个KafkaBroker和其他Broker以及依赖的ZK也有关联,这些关联系统在后续的章节中都会分析到。图2-57 KafkaBroker的内部组件图片引自:https://cwiki.apache.org/confluence/display/KAFKA/Index本章分析的Producer包括后面要分析的Consumer都不是作为Kafka的内置服务,而是一种客户端(所以它们都在clients包),客户端可以独立于Kafka集群,因此开发客户端应用程序时只需要提供一个Kafka集群的地址即可,说明客户端可以和Kafka集群独立开来,图2-58展示了一种典型的生产者、消费者和Kafka集群交互方式,其中Kafka集群还会和ZooKeeper互相通信。图2-58 生产者、消费者、Kafka集群交互客户端有发送和接收请求,服务端同样也有接收和发送的逻辑,因为对于I/O来说是双向的:客户端发送请求,就意味着服务端要接收请求,同样服务端对请求作出响应并发送响应结果给客户端,客户端就要接收响应。接下来我们会分析客户端发送的请求在服务端是怎么被KafkaApis处理的。来源:zqhxuyuan.github.io原文:http://zqhxuyuan.github.io//-Kafka-Book-Sample/#%E7%AC%AC%E4%BA%8C%E7%AB%A0_%E7%94%9F%E4%BA%A7%E8%80%85如有侵权或不周之处,敬请劳烦联系若飞(微信:)马上删除,谢谢!·END· 架构师我们都是架构师!
觉得不错,分享给更多人看到
架构师 微信二维码
分享这篇文章
5月30日 20:56
架构师 最新头条文章
架构师 热门头条文章

我要回帖

更多关于 低效性呼吸形态 的文章

 

随机推荐