vlambda博客
学习文章列表

一文总结 Apache Kafka 知识要点

kafka 的定位是一个分布式的流处理平台。提供消息订阅-发布、消息容错存储等功能。本文尝试将 kafka 重要知识点进行总结。

kafka 的设计动机是作为一个各类大公司实时数据的统一处理平台。想要满足几点特性:

  • 高吞吐,支撑类似于实时日志聚合的功能;

  • 消息存储高可用,能够支持离线处理系统所需要的阶段性的大量日志;

  • 低延迟,作为一个消息系统,应该能像传统消息系统一样做到低延迟;

1. 架构模型基本概念

  • Broker,消息中间件处理节点,一个 Kafka 节点就是一个 broker,一个或者多个 Broker 可以组成一个 Kafka 集群

  • Topic,主题,Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都需要指定一个topic

  • Producer,消息生产者,向Broker发送消息的客户端

  • Consumer,消息消费者,从Broker读取消息的客户端

  • ConsumerGroup,每个Consumer属于一个特定的Consumer Group,一条消息可以发送到多个不同的Consumer Group,但是一个Consumer Group中只能有一个Consumer能够消费该消息

  • Partition,一个topic可以分为多个partition,每个partition内部是有序的。一个 partition 只能被 consumer group 中的至多一个 consumer 所绑定消费,正常情况下,期望 partition 的数量能够与 consumer group 中 consumer 的数量相等,这样能够做到消息的均衡消费

2. 常见特性

2.1 消息投递方式

这类问题可以分成两个部分分析:发送消息的持久性保障,以及在消费消息时的保证措施。

  • 在新版本的 kafka 中,对于 producer 来说,可以设置同一个消息不会被多次投递,这依赖于 broker 给每个 producer 一个编号,以及对 message 也有相关的编号,所以会对相同编号的消息进行去重。

  • kafka 同样支持 transaction message,即 batch message 要么全部写入成功,要么全部写入失败。这个可能会带来一些延迟,如果说不一定需要这个的话,就可以直接异步发送即可。

at most once

消息可能丢失,但是绝对不会多次发送

对于 consumer 来说,如果在消息处理结束之前就更新了 offset,则可能因为 customer 挂掉出现消息丢失的情况。

at least once

消息可能重复,但是绝对不会被丢失

对于 consumer 来说,如果在消息处理结束之后再更新 offset,则可能出现消息处理完了,但 offset 未更新情况,这就需要系统能够接受幂等处理,或者将重复消息进行抛弃。

exactly once

消息被发送有且仅有一次

  • 如果 consumer 在读取消息之后又重新把消息发送给 kafka 的另一个 topic,则可以将这些消息 以及 更新 offset 的消息作为一个 transaction 进行发送。如果事务等级设置的是 read commited,那么消息未成功提交之前,用户就不会获取到相关消息。

  • 如果 consumer 读取消息之后,是发送到其他不支持 2PC 的系统中,往往可以采取的一种方式是,将 offset 与数据一块儿发送给相关系统,这样即使消息没有 unique id,也能通过 offset 来判断是否是同一条消息。

2.2 数据存储与复制

replication

  • kafka 的 topic 的 partition 可以设置 replica,各个replica 之间会出现一个 leader,其他的作为 follower,所有的读写都会对 leader partition 进行操作。各个 follower replica 也类似于 consumer 一样,会对 leader 的消息进行消费,然后追加的自己对应的 log file 中。

  • 分布式系统中,节点是否是 alive 需要一个比较明确的定义。kafka 中 follower 定义是,zk 中有相关的状态,以及 follower 并没有落后 leader 太多。

数据压缩

  • kafka 允许对过往数据进行压缩。主要是依据 message 的 key 进行,如果某个 key 的 message 在以后出现了一个新的值,在做数据压缩的时候会将之前的变动内容抛弃。

存储方式

  • kafka 在存储是以 partition 为单位,每个 partition 包含一组 Segment 文件以及一组索引文件,消息文件和索引文件一一对应,文件名就是这个文件中第一条消息的索引序号。索引文件中保存了索引的序号和对应的消息在 segment 文件中的位置。在设计索引上,kafka 为了节省空间,不会为每条消息都创建索引,而是每隔几条消息创建一条索引,采用的是稀疏索引的方式。

  • kafka 写入消息的时就是在 segment 文件尾部连续追加写入,一个文件写到了指定大小(一般是 1G )再写下一个文件。查找消息时,首先根据文件名找到的索引文件,然后用二分法查找索引在里面找到离目标消息最近的索引,再去消息文件中找到这条最近的索引指向的消息位置,然后从这个位置开始顺序遍历消息文件找到目标消息。

3. 设计特点

3.1 充分利用文件系统缓存

  • 随机写与顺序写的差别可能达到 5000+ 倍,顺序读在某些情况下甚至能够比随机访问内存还要快。

  • 文件系统在读取的时候,一般会提供缓存,如果这时还在应用中增加额外的缓存,会重复消耗资源。同时,因为 kafka 基于 java 实现, JVM 的内存使用并不节省,同时在进行 GC 的时候,大的内存消耗也会浪费更多的 GC 时间。

  • 有些 message queue 在实现的时候,使用 btree 存储相关 metadata,速度大概在 O(logN),但是,在磁盘访问时,O(logn)是个挺大的数量级了,而且只能每次进行单个 seek,限制了并发,比如想要读与写并发。kafka 选择的数据结构是直接一个 linear log,从前面读取,往后面追加,时间复杂度可以在 O(1),同时还可以支持并发。

  • 只需要设计好的磁盘存储结构,磁盘的读取也能够像网络一样快。单个 partition 对应到同一个 log file,分为好几个 segment 文件,文件按照顺序进行写入。不论读增删都是顺序处理,省去了很多磁盘扫描的时间。

3.2 batch 处理

  • 为了加快效率,消息的读取,以及落地都会采用 batch 的方式进行处理,即多读,以及 批次 flush 处理。

  • 同时为了节省网络带宽,会对发送的数据进行压缩,log 会将压缩后的数据进行存储,comsumer 在读取之后再进行解压。

3.3 数据拷贝利用 sendFile 机制

  • consumer, broker, producer 都使用同样的日志格式,各个组件在处理的时候不用做额外的动作

  • 文件数据不用被应用读出来之后,再重新通过 socket 发送出去,利用 sendfile 系统调用,可以直接将文件内容通过 pagecache 转发到 socket 中,只用一次 copy 即可完成数据发送。

3.4 failover recover

  • partition leader 的选举,不适合采用 raft 等类似投票协议。这类协议同等 replica 的前提下需要的消息数更多,更占空间,吞吐量也会受到影响。因为3个副本只能支持 1 个节点宕机

  • kafka 从 partition 的 in-sync server 中直接选取 leader 进行处理。leader 由 broker 中推举出来的 controller 进行选择,如果某个 broker 挂掉,则交由 controller 节点重新选择合适的 partition leader.

  • 如果所有的 partition 挂掉,则可以在可用性和一致性之间做一个 tradeoff,要么从 in-sync server 中选取,要么从任意 replica 中选取

3.5 consumer reblance

  • kafka 中,默认的消费策略是单个 partition 只能被 consumser group 中至多一个 partition 消费

  • broker 会充当 group coordinate 的角色,会负责管理该 consumer group 的所有心跳,维护 consumer 的列表,如果有新增的或者丢失的 consumer,则会出发 reblance 消息

  • 每个 consumer group 中会包括一个 group leader,group leader 一般是加入到 group 中的第一个节点,group leader会知道 group 中所有 consumer 的信息,然后根据 partition 信息进行相关分配,然后将分配信息交给 coordinate 进行广播及处理。

  • 之所以会将 partition assignment 放到 client 来做这类事情,主要是为了服务拆分。将稳定的功能继续放到 broker 中,将这些易变的功能拆分出来,可以增加分配策略的灵活性和可拓展性,同时不需要对 broker 进行变动或者 rebooting。

4. Ref

  • https://kafka.apache.org/documentation/#design