vlambda博客
学习文章列表

kafka-2-生产者分区机制/消息压缩/无丢失配置介绍

分区机制

为什么分区

上文提过,客人太多一个房间接客太慢,可以多开几个房间。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。如下图:


分区策略介绍:

轮询:

kafka-2-生产者分区机制/消息压缩/无丢失配置介绍


随机数:

nextInt(partitions.size())

kafka-2-生产者分区机制/消息压缩/无丢失配置介绍


Key-ordering(消息键保序策略)

Math.abs(key.hashCode()) % partitions.size()


自定义分区方法:

实现org.apache.kafka.clients.producer.Partitioner

(kafka-clients:2.5.0)接口的partition(...)方法,同时设置

partitioner.class参数为实现类的全限定名(Full Qualified Name)


kafka默认实现按消息键保序策略;如果没有指定 key,则使用轮询策略。


消息压缩

生产者中增加参数compression.typeProducer 端压缩、Broker 端保持、Consumer 端解压缩,为了避免broker端对消息进行重新压缩, 

compression.type 值默认为producer,即与生产者保持一致,消息格式不一致也可能导致重新压缩。


压缩算法对比如下图:

Kafka 2.1.0 版本之前支持gzip/snappy/lz4,2.1.0开始增加了zstd的支持,压缩本就是一种以消耗cpu资源来换取磁盘空间和带宽占用的手段,当客户端cpu资源富裕,集群磁盘空间/带宽资源紧张时可以开启zstd压缩。


无丢失配置

1.不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。


2.设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。


3.设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。


4.设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。


5.设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。


6.设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。


7.确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。


8.确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。