Kafka之性能与高可用概要设计
消息引擎系统
消息引擎,又称消息队列、消息中间件,英文称为Message System,一般情况下用于传递语义准确的消息,消息引擎通常提供一组软件接口,供应用之间使用,应用通过消息引擎可实现松耦合、异步式的消息传递。
消息引擎中的消息可以是任何形式的数据,电子邮件、传真、即时消息等等,只要是业务需要用到的数据都可以放在消息引擎中传递给另一个或多个应用。
消息引擎需要考虑的两个重要因素,消息如何设计、传输协议如何设计。
1. 消息设计
消息一般要语义清晰并且格式通用,方便拓展重用,一般消息格式都是结构化的,例如SOAP协议中的消息采用XML格式、Web Service支持JSON格式,Kafka采用二进制的方式,但内容依然是结构化的。
2. 传输协议设计
消息传输协议指定了消息在不同系统之间如何进行传输,常见的有AMQP、Web Service + SOAP、MSMQ等,Kafka自己设计了一套二进制的消息传输协议。
3. 消息引擎范型
消息引擎范型决定消息以何种方式传输到下游的系统中,常见的范型有两种:
消息队列模型 (message queue)
消息队列模型基于队列提供消息传输服务,多用于进程间通信(IPC,inter-process communication)、线程间通信。
该模型定义了三个关键的角色:
消息队列(queue)
消息发送者(sender)
消息接收者(receiver)
发送者发送的消息存入到消息队列中,接收者从队列中接收数据,一旦消息被消费(consumed),则从该队列中移除该消息。
每条消息只能被一个消费者消费,发送者和消费者是一对一的关系,因此消息队列模型也被称为基于 点对点(point-to-point, p2p)的消息传输方式 。
发布订阅模型 (publish/subscribe , pub/sub)
与消息队列模型不同,发布订阅模型有一个 主题(topic) 的概念,一个topic用于存放同一种类型的消息,该模型定义了三个关键的角色:
消息主题 (topic)
消息发布者 (publisher)
消息订阅者 (subscriber)
发布者将消息发送到指定的topic中,订阅者订阅topic下的所有消息,通常同一个消息会被多个订阅者订阅到,类似报纸。
kafka同时支持上面两种模型。
4. JMS
JMS即Java消息服务(Java Message Service),它是一套API规范,列出了分布式系统间消息传递的必要接口由具体的协议实现。
JMS完全支持上面提到的两种模型,主流的消息引擎基本都实现了JMS规范 ActiveMQ、RabbitMQ(Rabbit MQ JMS Client)、IBM WebSphere MQ。
Kafka没有完全遵循JMS的规范,做出了一些自己的设计。
kafka的概要设计
Kafka的设计初中是为了解决超大量级数据的实时传输,为了达到这个目的,Kafka对下面4个问题给出了相应的解决方案:
吞吐量/延时
消息持久化
负载均衡和故障转移
伸缩性
1. 吞吐量/延时
吞吐量(throughput)指的是 某种处理能力的最大值,在Kafka中指的是每秒能够处理的消息数,吞吐量越大当然越好,延时指的是一次请求,客户端发送到服务端请求的时间间隔,延时越短越好。
吞吐量和延时通常比较矛盾,调优一个会导致另一个变差,但是可以通过调优达到一个相对均衡的最优效果,例如:
若Kafka处理一次消息请求需要2毫秒,每次请求仅发送一条消息,那么1秒的吞吐量就是500条消息
如果采用批处理(batching),进行一小批一小批(micro-batch)的消息发送,在发送消息前等待8毫秒
此时消息延时由2毫秒变成了10毫秒(2+8),延时增加4倍,假设8毫秒内积攒了1000条消息
那么1秒内的吞吐量就成了1000/0.01 = 100000条,吞吐量提升了200倍左右
可以看到通过micro-batch对延时的稍微增加,让吞吐量暴增,Storm Trident、Spark Streaming也是采用类似的思想
Kafka服务端对吞吐量/延时的优化
1. 写入优化
kafka写入数据时,默认将数据写到操作系统的页缓(page cache)存中,由操作系统决定何时刷盘,该设计有如下优势:
操作系统页缓存在内存中分配,写入速度极快
kafka不直接与底层文件系统打交道,I/O操作由操作系统处理
kafka的写入操作采用追加(append)写方式,避免随机IO操作
在普通磁盘上,随机IO会非常慢,但如果使用追加写的方式,其速度可以达到内存的随机IO速度,有测试表明对内存进行随机IO操作速度是36.7MB/s,对普通磁盘顺序IO的速度是52.2MB/s,
所以kafka采用了追加写消息的方式,即只能在日志文件末尾追加消息,不可修改已写入的消息,在实际使用过程中可以做到每秒几万甚至几十万的消息写入。
2. 读取优化
Kafka在对消息进行写入时,会将消息写入到操作系统页缓存中,在读取消息时也会尝试从页缓存中获取,如果命中页缓存则直接发送到网络的Socket上,
避免了数据在内核态和用户态之间来回复制的开销,这个技术被称为零拷贝(Zero Copy),由Linux平台提供的sendfile系统调用提供。
sendfile与零拷贝
在之前的Linux系统中,IO接口通过数据拷贝实现,即将内核态缓冲区中的数据拷贝到用户态应用程序缓冲区中,或者反过来,
这个过程既涉及到数据拷贝,又涉及到内核态和用户态的上下文切换,对CPU的开销占用非常大,限制了OS高效进行数据传输的能力,
例如没有使用零拷贝的一次消息读取的过程如下:
使用零拷贝可以省去内核缓冲区与用户态应用程序缓冲区之间的数据拷贝,其利用DMA(直接存储器访问计数,Direct Memory Access)执行IO操作,也避免了内核缓冲区之间的数据拷贝(因此取名零拷贝), 极高的提升了数据传输效率。
由此可以看出,两种不同的拷贝方式步骤分别如下:
正常拷贝:磁盘 -> 内核空间 -> user空间 -> 内核空间 -> 目的缓冲区
零拷贝方式:磁盘 -> 内核空间 -> 目的缓冲区
后者明显优于前者,Linux提供的sendfile系统调用提供了零拷贝技术的实现,kafka的消息消费机制,通过Java的FileChannel.transferTo 方法进行系统调用,提升读取吞吐量。
由于kafka大量的使用了页缓存,很多消息可能都不需要从磁盘读取,直接命中了页缓存,又进一步提升了读取的吞吐量。
3. 小结
kafka对高吞吐/低延时的优化总结为下面几列:
大量使用操作系统缓存页,提升写入速度和读取缓存命中率
kafka不直接进行磁盘IO操作,由OS操作
追加写方式,避免磁盘随机读写
采用零拷贝提升消息读取网络传输效率
2.消息持久化
kafka的消息需要持久化到磁盘中,主要为了实现下面的目的:
解耦消息发送与消息消费:生产者将消息交付给kafka后,就不需要关心后续消费者如何消费了,kafka会持久化消息,由相应的消费者消费。
实现灵活的消息处理:对于消费者,很多消息可能消费过之后,还需要重新消费一次,或者新加入的消费者需要从头开始消费,即消息重演(message replay),消息持久化可以方便的实现这种需求。
kafka接收到消息后会直接将其进行持久化操作,然后再返回客户端操作成功,但这个持久化并不是kafka直接去写盘,而是写到操作系统的页缓存中,由操作系统定时刷盘,
有些消息系统在持久化前会尽量先使用内存,当内存资源耗尽时,再一次性将内存中的刷盘,这意味着消息既可能丢失又占用大量内存空间,kafka的做法既可实时保存数据,又可将节省出的内存留给OS页缓存使用,提升了整体性能。
交由操作系统做持久化会有一个问题,如果操作系统挂了,那么写入的数据就丢失了,用户可通过配置强制写入磁盘,但是刷盘是一个很慢的过程,会降低整体吞吐量,
通过OS担心宕机数据丢失,手动刷盘又会降低吞吐量,kafka如何解决的呢?
kafka默认还是推荐由OS自动刷盘,可以保证持久性来源自replica副本备份,即kafka官方认为写入的消息写到多个replica中,不至于多个OS系统同时宕机,
交由OS刷盘,本身就是一种对性能和持久化双方的平衡取舍,kafka在这之上又增加了replica的备份保障,在提升写入速度时,又提升了持久可靠性。
3. 负载均衡和故障转移
任何一个高可用的系统都需要提供负载均衡(load balancing)与故障转移(fail-over)的功能。
1. 负载均衡
负载均衡即让系统的工作量按照一定的规则均衡的分配在所有参与工作的服务器上,避免某台服务器因为工作太多出现资源耗尽停止工作,甚至宕机的可能性,提升系统整体的运行效率。
默认情况下,kafka集群中的每个服务器都有机会为客户端提供服务,kafka将负载分散在所有集群的服务器上,避免某台机器资源耗尽的可能性,
kafka提供了智能的分区领导者选举(partition leader election)算法, 将partition的leader均分在集群的机器上,同时对外提供服务,实现负载均衡。
2. 故障转移
故障转移即当集群中的某个节点意外中止时,集群可以快速的检测到失效(failure)节点,并将该节点上的应用或服务转移到其他可用节点上继续对外提供服务。
故障转移通常使用心跳或会话机制实现,即主服务器与备份服务器之间的心跳无法维持或主服务器注册到服务中心的会话超时过期了,
这两种情况即认为主服务器已经失效,无法正常运行,集群将从备份服务器中选举一台服务器来替代已经失效的主服务器工作。
kafka采用会话机制,当kafka启动后以会话形式将自己注册到Zookeeper上,一旦该kafka节点出现问题,那么与Zookeeper的会话将超时失效,此时kafka集群将选举一台新的服务器来替代失效的服务器对外提供服务。
4. 伸缩性
伸缩性(scalability)指的是向分布式系统中增加额外的计算资源(CPU、内存、带宽)时,系统吞吐量提升的能力,
例如对于计算密集型应用,一个CPU的计算能力是U,那么两个CPU我们希望是2U,通过增加计算资源,计算能力可线性提升,
但是分布式系统中有很多单点瓶颈会制约线性计算能力的扩容,很难做到最理想状态的线性伸缩性。
阻止线性拓容最常见的一个因素就是状态的保存,分布式系统中,每个服务会维护很多内部状态,如果状态由服务自身保存,就需要解决一致性的问题,
如果服务状态交由专门的协调服务来做,例如由Zookeeper对状态进行保存和管理,那么服务之间就无需繁重的状态共享,降低了维护复杂度,如果需要拓容集群节点,直接启动新的节点加入服务集群即可进行负载均衡对外提供服务。
kafka将服务内部状态交给Zookeeper管理,扩容kafka服务只需启动新的节点,每个kafka服务内部仅维护一小部分必要的状态,降低了状态维护复杂性。