vlambda博客
学习文章列表

ZK、KAFKA面经大全套,进大厂别忘回来点赞留言





以下文章内容来源于网络

内容若有雷同,
必是相互抄袭。
不,是相互学习!

    ZK、KAFKA面经大全套,进大厂别忘回来点赞留言


Zookeeper篇

1. ZooKeeper 是什么?

ZooKeeper由雅虎研究院开发,是Google Chubby的开源实现,后来托管到Apache,于2010年11月正式成为Apache的顶级项目。

ZooKeeper是一个经典的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务。

分布式应用程序可以基于ZooKeeper实现数据发布与订阅、负载均衡、命名服务、分布式协调与通知、集群管理、Leader选举、分布式锁、分布式队列等功能。ZooKeeper致力于为分布式应用提供一个高性能、高可用,且具有严格顺序访问控制能力的分布式协调服务

Zookeeper 保证了如下分布式一致性特性:
(1)顺序一致性

    从同一个客户端发起的请求,最终将会严格按照其发送顺序进入ZooKeeper中(2)原子性

    所有请求的响应结果在整个分布式集群环境中具备原子性,即要么整个集群中所有机器都成功的处理了某个请求,要么就都没有处理,绝对不会出现集群中一部分机器处理了某一个请求,而另一部分机器却没有处理的情况

(3)单一视图

    无论客户端连接到ZooKeeper集群中哪个服务器,每个客户端所看到的服务端模型都是一致的,不可能出现两种不同的数据状态,因为ZooKeeper集群中每台服务器之间会进行数据同步

(4)可靠性

    一旦服务端数据的状态发送了变化,就会立即存储起来,除非此时有另一个请求对其进行了变更,否则数据一定是可靠的

(5)实时性(最终一致性)

    当某个请求被成功处理后,ZooKeeper仅仅保证在一定的时间段内,客户端最终一定能从服务端上读取到最新的数据状态,即ZooKeeper保证数据的最终一致性
2. ZooKeeper 提供了什么?
(1)文件系统
(2)通知机制
3.Zookeeper 文件系统
Zookeeper 提供一个多层级的节点命名空间(节点称为 znode)。与文件系统不同的是,这些节点都可以设置关联的数据,而文件系统中只有文件节点可以存放数据而目录节点不行。Zookeeper 为了保证高吞吐和低延迟,在内存中维护了这个树状的目录结构,这种特性使得Zookeeper 不能用于存放大量的数据,每个节点的存放数据上限为1M。
4. ZAB 协议?
ZAB 协议是为分布式协调服务 Zookeeper 专门设计的一种支持崩溃恢复的原子广播协议。
ZAB 协议包括两种基本的模式:崩溃恢复和消息广播。
当整个 zookeeper 集群刚刚启动或者 Leader 服务器宕机、重启或者网络故障导致不存在过半的服务器与 Leader 服务器保持正常通信时,所有进程(服务器)进入崩溃恢复模式,首先选举产生新的 Leader服务器,然后集群中 Follower 服务器开始与新的 Leader 服务器进行数据同步,当集群中超过半数机器与该 Leader服务器完成数据同步之后,退出恢复模式进入消息广播模式,Leader 服务器开始接收客户端的事务请求生成事物提案来进行事务请求处理。
5. 四种类型的数据节点 Znode
(1)PERSISTENT-持久节点
除非手动删除,否则节点一直存在于 Zookeeper 上
(2)EPHEMERAL-临时节点
临时节点的生命周期与客户端会话绑定,一旦客户端会话失效(客户端与zookeeper 连接断开不一定会
话失效),那么这个客户端创建的所有临时节点都会被移除。
(3)PERSISTENT_SEQUENTIAL-持久顺序节点
基本特性同持久节点,只是增加了顺序属性,节点名后边会追加一个由父节点维护的自增整型数字。
(4)EPHEMERAL_SEQUENTIAL-临时顺序节点
基本特性同临时节点,增加了顺序属性,节点名后边会追加一个由父节点维护的自增整型数字。
6. Zookeeper Watcher 机制 -- 数据变更通知
Zookeeper 允许客户端向服务端的某个 Znode 注册一个 Watcher 监听,当服务端的一些指定事件触发了这个 Watcher,服务端会向指定客户端发送一个事件通知来实现分布式的通知功能,然后客户端根据Watcher 通知状态和事件类型做出业务上的改变。
工作机制:
(1)客户端注册 watcher
(2)服务端处理 watcher
(3)客户端回调 watcher
Watcher 特性总结:
(1)一次性
无论是服务端还是客户端,一旦一个 Watcher 被 触 发 ,Zookeeper 都会将其从相应的存储中移除。这样的设计有效的减轻了服务端的压力,不然对于更新非常频繁的节点,服务端会不断的向客户端发送事件通知,无论对于网络还是服务端的压力都非常大。
(2)客户端串行执行
客户端 Watcher 回调的过程是一个串行同步的过程。
(3)轻量
3.1、Watcher 通知非常简单,只会告诉客户端发生了事件,而不会说明事件的具体内容。
3.2、客户端向服务端注册 Watcher 的时候,并不会把客户端真实的 Watcher 对象实体传递到服务端,仅仅是在客户端请求中使用 boolean 类型属性进行了标记。
(4)watcher event 异步发送 watcher 的通知事件从 server 发送到 client 是异步的,这就存在一个问题,不同的客户端和服务器之间通过 socket 进行通信,由于网络延迟或其他因素导致客户端在不通的时刻监听到事件,由于 Zookeeper 本身提供了 ordering guarantee,即客户端监听事件后,才会感知它所监视 znode发生了变化。所以我们使用 Zookeeper 不能期望能够监控到节点每次的变化。
Zookeeper 只能保证最终的一致性,而无法保证强一致性。
(5)注册 watcher getData、exists、getChildren
(6)触发 watcher create、delete、setData
(7)当一个客户端连接到一个新的服务器上时,watch 将会被以任意会话事件触发。当与一个服务器失去连接的时候,是无法接收到 watch 的。而当 client 重新连接时,如果需要的话,所有先前注册过的 watch,都会被重新注册。通常这是完全透明的。只有在一个特殊情况下,watch 可能会丢失:对于一个未创建的 znode的 exist watch,如果在客户端断开连接期间被创建了,并且随后在客户端连接上之前又删除了,这种情况下,这个 watch 事件可能会被丢失。
7. 客户端注册 Watcher 实现

首先,注册一个Watcher对象可以通过创建一个ZooKeeper客户端对象实例时或者使用ZooKeeper客户端的getData、getChildren和exist三个接口来向ZooKeeper服务器注册Watcher。在注册Wacher后,客户端首先会将当前客户端请求request进行标记,将其设置为Watcher监听,同时会封装一个Watcher的注册信息WatchRegistration对象,用于暂时保存数据节点路径和Watcher的对应关系。

        在ZooKeeper中,Packet可以看作一个最小通信协议单元,用于进行客户端与服务端之间的网络传输,任何需要传输的对象都需要包装成一个Packet对象。在ZooKeeper源码中,当使用异步操作时,都会出现一行代码:

cnxn.queuePacket(h, new ReplyHeader(), request, null, cb, clientPath, serverPath, ctx, null);


这说明操作结束之后会调用cnxn对象的queuePacket方法:


Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration) { Packet packet = null; synchronized (outgoingQueue) { if (h.getType() != OpCode.ping && h.getType() != OpCode.auth) { h.setXid(getXid()); } packet = new Packet(h, r, request, response, null, watchRegistration); packet.cb = cb; packet.ctx = ctx; packet.clientPath = clientPath; packet.serverPath = serverPath; if (!zooKeeper.state.isAlive() || closing) { conLossPacket(packet); } else { // If the client is asking to close the session then // mark as closing if (h.getType() == OpCode.closeSession) { closing = true; } outgoingQueue.add(packet); } }  sendThread.wakeup(); return packet; }

        从以上可以看出通过这个方法,WatchRegistration会被封装到Packet中,然后放入发送队列中等待客户端发送。随后,ZooKeeper客户端就会向服务端发送这个请求,同时等待请求的返回。这时,激活了客户端的sendThreand线程,该线程的readResponse方法负责接收来自服务端的响应,然后将Watcher注册到ZKWatchManager中进行管理。这里需要注意的是,在底层实际的网络传输序列化过程中,并没有将WatchRegistration对象完全地序列化到底层字节数组中去。这样可以减轻服务端资源开销以及网络传输开销。

8. 服务端处理 Watcher 实现
(1)服务端接收 Watcher 并存储
接收到客户端请求,处理请求判断是否需要注册 Watcher,需要的话将数据节点的节点路径和ServerCnxn(ServerCnxn 代表一个客户端和服务端的连接,实现了 Watcher 的 process 接口,此时可以看成一个 Watcher 对象)存储在WatcherManager 的 WatchTable 和 watch2Paths 中去。
(2)Watcher 触发
以服务端接收到 setData() 事务请求触发 NodeDataChanged 事件为例:
2.1 封装 WatchedEvent
将通知状态(SyncConnected)、事件类型(NodeDataChanged)以及节点路径封装成一个WatchedEvent 对象
2.2 查询 Watcher
从 WatchTable 中根据节点路径查找 Watcher
2.3 没找到;说明没有客户端在该数据节点上注册过 Watcher
2.4 找到;提取并从 WatchTable 和 Watch2Paths 中删除对应 Watcher(从这里可以看出 Watcher 在服务端是一次性的,触发一次就失效了)
(3)调用 process 方法来触发 Watcher
这里 process 主要就是通过 ServerCnxn 对应的 TCP 连接发送 Watcher 事件通知。
9. 客户端回调 Watcher
SendThread接收事件通知


void readResponse() throws IOException { ByteBufferInputStream bbis = new ByteBufferInputStream( incomingBuffer); BinaryInputArchive bbia = BinaryInputArchive.getArchive(bbis); ReplyHeader replyHdr = new ReplyHeader(); replyHdr.deserialize(bbia, "header"); if (replyHdr.getXid() == -2) { // -2 is the xid for pings if (LOG.isDebugEnabled()) { LOG.debug("Got ping response for sessionid: 0x" + Long.toHexString(sessionId) + " after " + ((System.nanoTime() - lastPingSentNs) / 1000000) + "ms"); } return; } if (replyHdr.getXid() == -4) { // -4 is the xid for AuthPacket if(replyHdr.getErr() == KeeperException.Code.AUTHFAILED.intValue()) { zooKeeper.state = States.AUTH_FAILED; eventThread.queueEvent( new WatchedEvent(Watcher.Event.EventType.None, Watcher.Event.KeeperState.AuthFailed, null) ); } if (LOG.isDebugEnabled()) { LOG.debug("Got auth sessionid:0x" + Long.toHexString(sessionId)); } return; } if (replyHdr.getXid() == -1) { // -1 means notification if (LOG.isDebugEnabled()) { LOG.debug("Got notification sessionid:0x" + Long.toHexString(sessionId)); } WatcherEvent event = new WatcherEvent(); event.deserialize(bbia, "response"); // convert from a server path to a client path if (chrootPath != null) { String serverPath = event.getPath(); if(serverPath.compareTo(chrootPath)==0) event.setPath("/"); else event.setPath(serverPath.substring(chrootPath.length())); } WatchedEvent we = new WatchedEvent(event); if (LOG.isDebugEnabled()) { LOG.debug("Got " + we + " for sessionid 0x" + Long.toHexString(sessionId)); } eventThread.queueEvent( we ); return; } if (pendingQueue.size() == 0) { throw new IOException("Nothing in the queue, but got " + replyHdr.getXid()); } Packet packet = null; synchronized (pendingQueue) { packet = pendingQueue.remove(); } /* * Since requests are processed in order, we better get a response * to the first request! */ try { if (packet.header.getXid() != replyHdr.getXid()) { packet.replyHeader.setErr( KeeperException.Code.CONNECTIONLOSS.intValue()); throw new IOException("Xid out of order. Got " + replyHdr.getXid() + " expected " + packet.header.getXid()); } packet.replyHeader.setXid(replyHdr.getXid()); packet.replyHeader.setErr(replyHdr.getErr()); packet.replyHeader.setZxid(replyHdr.getZxid()); if (replyHdr.getZxid() > 0) { lastZxid = replyHdr.getZxid(); } if (packet.response != null && replyHdr.getErr() == 0) { packet.response.deserialize(bbia, "response"); } if (LOG.isDebugEnabled()) { LOG.debug("Reading reply sessionid:0x" + Long.toHexString(sessionId) + ", packet:: " + packet); } } finally { finishPacket(packet);            }}

SendThread的response()方法负责接收这个客户端事件通知。首先会对replyHdr中xid进行判断,如果是-1,则说明是一个通知类型的响应。作以下处理:

1.反序列化:ZooKeeper客户端接到请求后,首先将字节流转换成WatcherEvent对象。

2.处理chrootPath:如果客户端设置了chrootPath属性,那么需要对服务端传过来的完整的节点路径进行chrootPath处理,生成客户端的一个相对节点路径。

3.还原WatchedEvent:将WatcherEvent对象转换成WatchedEvent。

4.回调Watcher:将WatchedEvent对象交给EventThread线程,在下一个轮询周期中进行Watcher回调。

10. ACL 权限控制机制




Kafka篇


1、如何获取 topic 主题的列表

bin/kafka-topics.sh --list --zookeeper localhost:2181

2、生产者和消费者的命令行是什么?
生产者在主题上发布消息:

bin/kafka-console-producer.sh --broker-list 192.168.43.49:9092 --topicHello-Kafka

注意这里的 IP 是 server.properties 中的 listeners 的配置。接下来每个新行就是输入一条新消息。
消费者接受消息:

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topicHello-Kafka --from-beginning

3、consumer 是推还是拉?
Kafka 最初考虑的问题是,customer 应该从 brokes 拉取消息还是 brokers 将消息推送到 consumer,也就是 pull 还 push。在这方面,Kafka 遵循了一种大部分消息系统共同的传统的设计:producer 将消息推送到 broker,consumer 从broker 拉取消息。
一些消息系统比如 Scribe 和 Apache Flume 采用了 push 模式,将消息推送到下游的 consumer。这样做有好处也有坏处:由 broker 决定消息推送的速率,对于不同消费速率的 consumer 就不太好处理了。消息系统都致力于让 consumer 以最大的速率最快速的消费消息,但不幸的是,push 模式下,当broker 推送的速率远大于 consumer 消费的速率时,consumer 恐怕就要崩溃了。最终 Kafka 还是选取了传统的 pull 模式。
Pull 模式的另外一个好处是 consumer 可以自主决定是否批量的从 broker 拉取数据 。Push 模式必须在不知道下游 consumer 消费能力和消费策略的情况下决定是立即推送每条消息还是缓存之后批量推送。如果为了避免 consumer 崩溃而采用较低的推送速率,将可能导致一次只推送较少的消息而造成浪费。Pull 模式下,consumer 就可以根据自己的消费能力去决定这些策略。
Pull 有个缺点是,如果 broker 没有可供消费的消息,将导致 consumer 不断在循环中轮询,直到新消息到 t 达。为了避免这点,Kafka 有个参数可以让 consumer阻塞知道新消息到达(当然也可以阻塞知道消息的数量达到某个特定的量这样就可以批量发送)。
4、讲讲 kafka 维护消费状态跟踪的方法
大部分消息系统在 broker 端的维护消息被消费的记录:一个消息被分发到consumer 后 broker 就马上进行标记或者等待 customer 的通知后进行标记。这样也可以在消息在消费后立马就删除以减少空间占用。
但是这样会不会有什么问题呢?如果一条消息发送出去之后就立即被标记为消费过的,旦 consumer 处理消息时失败了(比如程序崩溃)消息就丢失了。为了解决这个问题,很多消息系统提供了另外一个个功能:当消息被发送出去之后仅仅被标记为已发送状态,当接到 consumer 已经消费成功的通知后才标记为已被消费的状态。这虽然解决了消息丢失的问题,但产生了新问题,首先如果 consumer处理消息成功了但是向 broker 发送响应时失败了,这条消息将被消费两次。第二个问题时,broker 必须维护每条消息的状态,并且每次都要先锁住消息然后更改状态然后释放锁。这样麻烦又来了,且不说要维护大量的状态数据,比如如果消息发送出去但没有收到消费成功的通知,这条消息将一直处于被锁定的状态,Kafka 采用了不同的策略。Topic 被分成了若干分区,每个分区在同一时间只被一个 consumer 消费。这意味着每个分区被消费的消息在日志中的位置仅仅是一个简单的整数:offset。这样就很容易标记每个分区消费状态就很容易了,仅仅需要一个整数而已。这样消费状态的跟踪就很简单了。这带来了另外一个好处:consumer 可以把 offset 调成一个较老的值,去重新消费老的消息。这对传统的消息系统来说看起来有些不可思议,但确实是非常有用的,谁规定了一条消息只能被消费一次呢?
5、讲一下主从同步
Kafka允许topic的分区拥有若干副本,这个数量是可以配置的,你可以为每个topci配置副本的数量。
Kafka会自动在每个个副本上备份数据,所以当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你可以配置只有一个副本,这样其实就相当于只有一份数据。
6、为什么需要消息系统,mysql 不能满足需求吗?
(1)解耦:
允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束。
(2)冗余:
消息队列把数据进行持久化直到它们已经被完全处理,通过这一方式规避了数据丢失风险。许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除之前,需要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
(3)扩展性:
因为消息队列解耦了你的处理过程,所以增大消息入队和处理的频率是很容易的,只要另外增加处理过程即可。
(4)灵活性 & 峰值处理能力:
在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。
(5)可恢复性:
系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。
(6)顺序保证:
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。(Kafka 保证一个 Partition 内的消息的有序性)
(7)缓冲:
有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。
(8)异步通信:
很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。
7、Zookeeper 对于 Kafka 的作用是什么?
Zookeeper 是一个开放源码的、高性能的协调服务,它用于 Kafka 的分布式应用。
Zookeeper 主要用于在集群中不同节点之间进行通信
在 Kafka 中,它被用于提交偏移量,因此如果节点在任何情况下都失败了,它都可以从之前提交的偏移量中获取除此之外,它还执行其他活动,如: leader 检测、分布式同步、配置管理、识别新节点何时离开或连接、集群、节点实时状态等等。
8、数据传输的事务定义有哪三种?
和 MQTT 的事务定义一样都是 3 种。
(1)最多一次: 消息不会被重复发送,最多被传输一次,但也有可能一次不传输
(2)最少一次: 消息不会被漏发送,最少被传输一次,但也有可能被重复传输.
(3)精确的一次(Exactly once): 不会漏传输也不会重复传输,每个消息都传输被一次而且仅仅被传输一次,这是大家所期望的
9、Kafka 判断一个节点是否还活着有那两个条件?
(1)节点必须可以维护和 ZooKeeper 的连接,Zookeeper 通过心跳机制检查每个节点的连接
(2)如果节点是个 follower,他必须能及时的同步 leader 的写操作,延时不能太久
10、Kafka 与传统 MQ 消息系统之间有三个关键区别
(1).Kafka 持久化日志,这些日志可以被重复读取和无限期保留
(2).Kafka 是一个分布式系统:它以集群的方式运行,可以灵活伸缩,在内部通过复制数据提升容错能力和高可用性
(3).Kafka 支持实时的流式处理
11、讲一讲 kafka 的 ack 的三种机制
request.required.acks 有三个值 0 1 -1(all)
0:生产者不会等待 broker 的 ack,这个延迟最低但是存储的保证最弱当 server 挂掉的时候就会丢数据。
1:服务端会等待 ack 值 leader 副本确认接收到消息后发送 ack 但是如果 leader挂掉后他不确保是否复制完成新 leader 也会导致数据丢失。
-1(all):服务端会等所有的 follower 的副本受到数据后才会受到 leader 发出的ack,这样数据不会丢失
12、消费者如何不自动提交偏移量,由应用提交?
将 auto.commit.offset 设为 false,然后在处理一批消息后 commitSync() 或者异步提交
commitAsync()
即:

ConsumerRecords<> records = consumer.poll();for (ConsumerRecord<> record : records){。。。tyr{consumer.commitSync()} 。。。


13、消费者故障,出现活锁问题如何解决?
出现“活锁”的情况,是它持续的发送心跳,但是没有处理。为了预防消费者在这种情况下一直持有分区,我们使用 max.poll.interval.ms 活跃检测机制。在此基础上,如果你调用的 poll 的频率大于最大间隔,则客户端将主动地离开组,以便其他消费者接管该分区。发生这种情况时,你会看到 offset 提交失败(调用commitSync()引发的 CommitFailedException)。这是一种安全机制,保障只有活动成员能够提交 offset。所以要留在组中,你必须持续调用 poll。消费者提供两个配置设置来控制 poll 循环:
max.poll.interval.ms:增大 poll 的间隔,可以为消费者提供更多的时间去处理返回的消息(调用poll(long)返回的消息,通常返回的消息都是一批)。缺点是此值越大将会延迟组重新平衡。
max.poll.records:此设置限制每次调用 poll 返回的消息数,这样可以更容易的预测每次 poll 间隔要处理的最大值。通过调整此值,可以减少 poll 间隔,减少重新平衡分组的对于消息处理时间不可预测地的情况,这些选项是不够的。处理这种情况的推荐方法是将消息处理移到另一个线程中,让消费者继续调用 poll。但是必须注意确保已提交的 offset 不超过实际位置。另外,
你必须禁用自动提交,并只有在线程完成处理后才为记录手动提交偏移量(取决于你)。还要注意,你需要 pause 暂停分区,不会从 poll 接收到新消息,让线程处理完之前返回的消息(如果你的处理能力比拉取消息的慢,那创建新线程将导致你机器内存溢出)。

14、如何控制消费的位置
kafka 使用 seek(TopicPartition, long)指定新的消费位置。用于查找服务器保留的最早和最新的offset的特殊的方法也可用(seekToBeginning(Collection) 和seekToEnd(Collection))
15、kafka 分布式(不是单机)的情况下,如何保证消息的顺序消
费?
Kafka 分布式的单位是 partition,同一个 partition 用一个 write ahead log 组织,所以可以保证 FIFO的顺序。不同 partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 partition。
Kafka 中发送 1 条消息的时候,可以指定(topic, partition, key) 3 个参数。partiton 和 key 是可选的。

如果你指定了 partition,那就是所有消息发往同 1个 partition,就是有序的。并且在消费端,Kafka 保证,1 个 partition 只能被1 个 consumer 消费。或者你指定 key( 比如 order id),具有同 1 个 key的所有消息,会发往同 1 个 partition。
16、kafka 的高可用机制是什么?
这个问题比较系统,回答出 kafka 的系统特点,leader 和 follower 的关系,消息读写的顺序即可。
17、kafka 如何减少数据丢失
Kafka到底会不会丢数据(data loss)? 通常不会,但有些情况下的确有可能会发生。下面的参数配置及
Best practice列表可以较好地保证数据的持久性(当然是trade-off,牺牲了吞吐量)。
block.on.buffer.full = true
acks = all
retries = MAX_VALUE
max.in.flight.requests.per.connection = 1
使用KafkaProducer.send(record, callback)
callback逻辑中显式关闭producer:close(0)
unclean.leader.election.enable=false
replication.factor = 3
min.insync.replicas = 2
replication.factor > min.insync.replicas
enable.auto.commit=false
消息处理完成之后再提交位移
18、kafka 如何不消费重复数据?

比如扣款,我们不能重复的扣。
其实还是得结合业务来思考,我这里给几个思路:
比如你拿个数据要写库,你先根据主键查一下,如果这数据都有了,你就别插入了,update 一下好吧。
比如你是写 Redis,那没问题了,反正每次都是 set,天然幂等性。
比如你不是上面两个场景,那做的稍微复杂一点,你需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,类似订单 id 之类的东西,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下,之前消费过吗?如果没有消费过,你就处理,然后这个 id 写 Redis。如果消费过了,那你就别处理了,保证别重复处理相同的消息即可。比如基于数据库的唯一键来保证重复数据不会重复插入多条。因为有唯一键约束了,重复数据插入只会报错,不会导致数据库中出现脏数据

   
     
     
   
         
           
           
         
制作不易,各位看官做完
大全套(点赞+在看+分享+收藏)
一定发、发、发大财!

            
              
              
            

             
               
               
             

1、

2、

3、

4、

5、

6、

7、

8、

9、

10、

11、

12、

13、