kafka-2-生产者分区机制/消息压缩/无丢失配置介绍
分区机制
为什么分区:
上文提过,客人太多,一个房间接客太慢,可以多开几个房间。不同的分区能够被放置到不同节点的机器上,而数据的读写操作也都是针对分区这个粒度而进行的,这样每个节点的机器都能独立地执行各自分区的读写请求处理。并且,我们还可以通过添加新的节点机器来增加整体系统的吞吐量。每条消息只会保存在某一个分区中,而不会在多个分区中被保存多份。如下图:
分区策略介绍:
轮询:
随机数:
nextInt(partitions.size())
Key-ordering(消息键保序策略):
Math.abs(key.hashCode()) % partitions.size()
自定义分区方法:
实现org.apache.kafka.clients.producer.Partitioner
(kafka-clients:2.5.0)接口的partition(...)方法,同时设置
partitioner.class参数为实现类的全限定名(Full Qualified Name)
kafka默认实现按消息键保序策略;如果没有指定 key,则使用轮询策略。
消息压缩
生产者中增加参数compression.type,Producer 端压缩、Broker 端保持、Consumer 端解压缩,为了避免broker端对消息进行重新压缩,
compression.type 值默认为producer,即与生产者保持一致,消息格式不一致也可能导致重新压缩。
压缩算法对比如下图:
Kafka 2.1.0 版本之前支持gzip/snappy/lz4,2.1.0开始增加了zstd的支持,压缩本就是一种以消耗cpu资源来换取磁盘空间和带宽占用的手段,当客户端cpu资源富裕,集群磁盘空间/带宽资源紧张时可以开启zstd压缩。
无丢失配置
1.不要使用 producer.send(msg),而要使用 producer.send(msg, callback)。记住,一定要使用带有回调通知的 send 方法。
2.设置 acks = all。acks 是 Producer 的一个参数,代表了你对“已提交”消息的定义。如果设置成 all,则表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义。
3.设置 retries 为一个较大的值。这里的 retries 同样是 Producer 的参数,对应前面提到的 Producer 自动重试。当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失。
4.设置 unclean.leader.election.enable = false。这是 Broker 端的参数,它控制的是哪些 Broker 有资格竞选分区的 Leader。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。故一般都要将该参数设置成 false,即不允许这种情况的发生。
5.设置 replication.factor >= 3。这也是 Broker 端的参数。其实这里想表述的是,最好将消息多保存几份,毕竟目前防止消息丢失的主要机制就是冗余。
6.设置 min.insync.replicas > 1。这依然是 Broker 端参数,控制的是消息至少要被写入到多少个副本才算是“已提交”。设置成大于 1 可以提升消息持久性。在实际环境中千万不要使用默认值 1。
7.确保 replication.factor > min.insync.replicas。如果两者相等,那么只要有一个副本挂机,整个分区就无法正常工作了。我们不仅要改善消息的持久性,防止数据丢失,还要在不降低可用性的基础上完成。推荐设置成 replication.factor = min.insync.replicas + 1。
8.确保消息消费完成再提交。Consumer 端有个参数 enable.auto.commit,最好把它设置成 false,并采用手动提交位移的方式。就像前面说的,这对于单 Consumer 多线程处理的场景而言是至关重要的。