流系统Spark/Flink/Kafka/DataFlow端到端一致性实现对比
分布式最难的2个问题
1. Exactly Once Message processing
2. 保证消息处理顺序.
我们今天着重来讨论一下
为什么很难
怎么解
就作者学习流系统的感受来看, 流系统有2个难点, 第一是end to end consistency,或者说exactly once msg processing; 第二则是event time based window操作; 本来作者想用一篇文章同时概括和比较这2点,无奈第一点写完, 文章已经长度爆炸。于是分开2篇, 此为上篇, 着重于从分布式系统的本质问题出发, 从最底层的各种"不可能", 和它们的解(比如:consensus协议)开始, 一层一层的递进到高层的流系统中, 如何实现容错场景下的end to end consistency,或者说exactly once msg processing。
流系统的具体对比在“9.流系统的EOMP”这一节, 前边都是准备知识...
一些术语
圣光(广告)不会告诉你的事
几个事实
Liveness和Safety的取舍
绝望中的曙光
Zombie Fencing
三节点间的EOMP
加入节点状态的三节点间的EOMP
流系统的EOMP
异步增量checkpointing
系统内与系统外
Latency, 幂等和non-deterministic
REFERENCE
端到端一致性end to end Consistency
一致性其实就是业务正确性, 在不同的业务场景有不同的意思, 在"流系统中间件"这个业务领域, 端到端的一致性就代表Exact once msg processing, 一个消息只被处理一次,造成一次效果; 注意: 这里的"一个消息"代表"逻辑上的一个", 即application对中间件的期待就是把此消息作为一个来处理, 而不是指消息本身的值相等, 比如要求计数+1的一个消息, 消息本身的内容可能一模一样, 但是application发来2次相同消息的"本意"就是要计数两次, 那么中间件就应该处理两次, 如果application由于超时重发了本意只想让中间件处理一次的+1操作, 那么中间件就应该处理一次; 中间件怎么能区分application的"本意"来决策到底处理一次还是多次, 是end to end consistency的关键.
EOMP
由于Exactly once msg processing太经常出现, 我们用EOMP来代替简写一下;
容错failure tolerance
为了方便讨论,后边谈到failure, 我们指的都是crash failure, 你可以想象是任何可以造成“把机器砸了然后任何本地状态丢失(比如硬盘损坏)一样效果的情况出现"; 在今天的虚拟云时代,这其实很常见,比如container或者虚拟机被resource manager突然kill掉回收了, 那么即使物理机其实没有问题, 你的application的逻辑节点也是被完全销毁的样子;
容错在end to end Consistency的语义下,是指在机器挂了,网络链接断开...等情况下,系统的运算结果和没有任何failure发生时是一摸一样的.
Effective once msg processing(应该翻成有效一次性处理?)
后边我们可以看到, 保证字面上的Exact once msg processing(即整个系统在物理意义上真的只对消息处理一次), 这在需要考虑容错的情况下是不可能做到的; Effective once msg processing是一个更恰当的形容,而所有号称可以做到EOMP的系统,其实都只是能做到Effective once msg processing; 即:中间件, 或者说流处理framework可能在failure发生的情况下处理了多次同一个消息,但是最终的系统计算结果和没有任何failure时, 一个消息真的只处理了一次时计算的结果相等; 这和幂等息息相关;
幂等Idempotent
一个相同的操作, 无论重复多少次, 造成的效果都和只操作一次相等; 比如更新一个keyValue, 无论你update多少次, 只要key和value不变,那么效果是一样的; 再比如更新计数器处理一次消息就计数器+1, 这个操作本身不幂等, 同一个消息被中间件重"发+收"两次就会造成计数器统计两次; 而如果我们的消息有id, 那么更新计数器的逻辑修改为, 把处理过的消息的id全记录起来, 接到消息先查重, 然后才更新计数器, 那么这个"更新计数器的逻辑"就变成幂等操作了;
把本不幂等的操作转化为幂等操作是end to end consistency的关键之一.
确定性计算deterministic
和幂等有些类似, 不过是针对一个计算; 相同的input必得到相同的output, 则是一个确定性(deterministic); 比如从一个msg里计算出一个key和一个value, 如果对同一个消息运算无数次得到的key和value都相同, 那么这个计算就是deterministic的, 而如果key里加上一个当前的时钟的字符串表示, 那么这个计算就不是确定性的, 因为如果重新计算一次这个msg, 得到的是完全不同的key;
注意1: 非确定性计算一般会导致不幂等的操作, 比如我们如果要把上边例子里的keyvalue存在数据库里, 重复处理多少次同一个msg, 我们就会重复的插入多少条数据(因为key里的时间戳字符串不同);
注意2: 非确定性计算并非必然导致不幂等的操作,比如这个时间戳没有添加在key里而是添加在value里, 且key总是相同的, 那么这个计算还是"非确定性"计算; 但是当我们存数据的时候先查重才存keyvalue, 那么无论我们重复处理多少次同一个msg, 我们也只会成功存入第一个keyValue, 之后的keyValue都会被过滤掉;
支持非确定业务计算的同时, 还能在容错的情况下达成端到端一致性, 是流系统的大难题, 甚至我们今天会提到的几个state of art的流系统都未必完全支持; (好吧Spark说的就是你)
分布式系统最tricky的问题就是, 问题看起来很普通很简单; 一些问题总是看起来有简单直接的解法,而一个"简单解"被人查出问题时,也总是看起来可以很简单的就可以把这个挑出的edge case很简单的解决掉; 然而我们会立刻发现解决这个edge case而引入的新步骤会引发新的问题... 如此循环, 直到"简单"叠加到"无法解决的复杂".
由于人们对这些问题的"预期是简单的", 所以很多书, online doc, 都大大简化了对问题的描述和对问题的分析; 最普遍的是对failure recovery的介绍, 一般只会简单的写"failure发生时, 系统会怎么recovery", 但是完全不提怎么检测failure和“根本不可能完美检测到failure”这个分布式系统的基本事实, 从而给了读者“failure可以完美检测”的错觉;
这是因为一来说清楚各种edge case会大大增加文档的复杂性, 另外一点是写了读者可能也看不明白, 还有就是广告效应, 比如真正字面意义的exactly once msg processing是不存在的, 但是所有其他做到effective once msg processing的系统都说自己可以支持exactly once, 那自己也得打这个广告不是; 还有就是语焉不详, 比如某stream系统说自己可以实现exactly once msg delivery, 别看delivery和processing好像差不多, 这里边的用词艺术就有意思了,delivery是指消息只在stream里出现一次, 但是在stream里只出现一次的消息却无法保证只被consume一次确根本不提; 再比如某serverless产品处理某stream的消息, 描述是保证旧的消息没有处理之前不会处理新消息, 你会想, 简单描述成保证消息按顺序处理不是一样么? 其实差大了去了, 前者并没有屏蔽掉旧消息突然replay, 覆盖掉新消息的处理结果的edge case, 而这个事实甚至颠覆了很多使用这个服务的Sr. SDE的对其的认知;
没有理解分布式系统的几个简单的本质问题之前, 你读文档的理解很有可能和文档真正精准定义的事实不符; 且读者对“系统保证”的理解, 往往会由于文档"艺术"定义的误导, 而过多的假设系统保证的"强", 直到被坑了去寻根问底, 才会收到"你误读了文档的哪里的详细解释";这是分布式系统"最难的地方在最普通的地方"的直接结果之一;
个人认为最好的办法就是去理解分布式系统软件算法所能达到的上限=>关于各种impossibility的结论的论文,然后去学习克服他们的方法的论文; 这样, 我们才能从各种简化了的 tutorials里, 从API中, 从各种云服务, 框架的广告词背后, 发现“圣光不会告诉你的事", 和"这个世界的真相";(从广告和online doc天花乱坠的描述中看到分布式系统设计真正的取舍, 这是区分API调包侠和分布式系统专家的分水岭之一); 而不是“简单的信了它们的邪”; 而下边,就是学习分布式系统,你所需要了解的最重要事实中, 和end to end consistency相关的几个;
不存在完美的failure detector
很多关于分布式系统的书上都会说,当failure发生时系统应该怎么做来容错, 就好像可以准确的检测到failure一样; 然而事实是, 在目前互联网的物理实现上(share nothing architecture, 只靠网络互联,不直接共享其他比如内存物理硬盘等),我们无法准确的检测到failure;
简单来说,就是当我们发现一个node无反应的时候,比如ping它,给它发消息,request,查询,都没有反应,我们无法知道,这到底是对方已经停止工作了,还是只是处理的很慢而已; 无法制造完美的failure detector, 即使在今天也是分布式系统的基础事实; 本文无意在基础事实上多费唇舌, 无法接受此事实者可以去翻相关论文; ╮( ̄▽ ̄"")╭
Essentially, the impossibility results for Consensus and Atomic Broadcast stem from the inherent difficulty of determining whether a process has actually crashed or is only “very slow”. [30]
The fundamental reason why Consensus cannot be solved in completely asynchronous systems is the fact that, in such systems, it is impossible to reliably distinguish a process that has crashed from one that is merely very slow. In other words, Consensus is unsolvable because accurate failure detection is impossible. [30]
A fundamental problem in distributed systems is that network partitions (split brain scenarios) and machine crashes are indistinguishable for the observer, i.e. a node can observe that there is a problem with another node, but it cannot tell if it has crashed and will never be available again or if there is a network issue that might or might not heal again after a while. Temporary and permanent failures are indistinguishable because decisions must be made in finite time, and there always exists a temporary failure that lasts longer than the time limit for the decision…
A third type of problem is if a process is unresponsive, e.g. because of overload, CPU starvation or long garbage collection pauses. This is also indistinguishable from network partitions and crashes. The only signal we have for decision is “no reply in given time for heartbeats” and this means that phenomena causing delays or lost heartbeats are indistinguishable from each other and must be handled in the same way.[29]
不存在完美的failure detector, 所导致的几个颠覆你认知的问题:
分布式共识问题"Consensus"在"不存在完美的Failure Detector的情况下"不可解“, 这又叫做FLP impossibility[36], 可以说是上世纪,奠定分布式系统研究基石, 方向的一篇论文; 即: 在理论上, 在分布式环境里(更准确说应该是异步环境里), 在最多可能出现一个crash failure的强假设下, 不存在任何一个算法可以保证系统里的所有的"正常"节点对某一信息有共识; 对于"共识"你可以理解为一个数据一摸一样的备份在多个节点上; (那么paxos, raft这种consensus协议是怎么回事呢? 稍后会解释)
在分布式环境, 连分配只增序列号这件事都很难(即不同的进程去向一个系统申请序列号, 从0开始不断增加, 保证process得到的序列号不能重复), 因为本质上这是一个consensus问题, 后边可以看到, 能够分配高可用性的global序列号(epoch id), 是解决zombie leader/master/processor的问题时的一大助力;
在保证liveness的情况下(即检测到失败就在另外的机器重启逻辑节点), 无法保证系统中的Singleton角色“在同一时间点”只有一个; 比如在有leader概念的分布式系统里, 要求任意时间点只有一个leader做决定, 比如HBase需要只能有一个Region Server负责某region的写操作; 再比如kafka或者Kinesis[22]里需要只能有一个partition processor接受一个stream partition的信息并且采取行动; 而事实是, 任何云服务和现有实现, 都无法在物理上保证“在同一时间点”, 真的只有一个这样的逻辑角色存在于机群中; 这就牵涉到一个概念=> Zombie Process.
Zombie process, 由于没有完美的failure detector, 所以即使几率再低, 只要时间够长, 需要failure detection的用例够多, 系统不可避免会错误的判断把一个并没有真正crash掉的process当作死掉了; 而如果系统需要保持高可用性,需要在检测到crash的时候,在新的机器上启动此process继续处理,那么当failure detector出错,则会发生新老process共同工作的问题,此时,这个老的process就是zombie process;
严重注意,在分布式系统里,我们需要单一责任的一个节点/processor/role来做决策或者处理信息时,我们要么不保护系统的高可用性(机器挂了就停止服务),要么解决zombie process会带来的问题;高可用性的系统中, zombie无法消除;这关系到分布式系统设计里的一个核心问题:liveness和safety的取舍;
在缺乏完美的failure detector的情况下, 对方迟迟不回信息(ping它也不回), 不发heartbeat, 那么本机只有2个选择: 1. 认为对方还没有crash, 持续等待; 2. 认为其crash掉了, 进行failover处理;
选择1伤害系统的liveness, 因为如果对方真的挂了,我们会无限等待下去, 系统或者计算就无法进行下去; 选择2伤害系统的safety, 因为如果对方其实没有crash, 那我们就需要处理可能出现的重发去重, 或者zombie问题, 即系统的逻辑节点的“角色唯一性“就会被破坏掉了;
越好的liveness要求越快的响应速度, 而“100%的safety“的意义, 则因系统的具体功能的不同而不同, 但一般都要求系统做决定要小心谨慎, 不能放过一个edge case, 穷尽所有必要的检查来保证"系统不允许出现的行为绝对不会发生"; 在consensus的语义下来说, safety就是绝对不能向外发出不一致的决定(比如向A说决定是X, 后来向B说决定是Y);
可以看到, 系统的edge case越多, safety越难保证, 而edge cases的全集只是可能发生的情况的集合, 而某一次运行只会发生一种情况(且大概率是正常情况); 如果系统不检查最难分辨最耗时的几种小概率发生的edge case, 那么系统大概率(甚至极大概率)也可以完美运转毫无问题几个月, 运气好甚至几年; 这样降低了系统的safety(不再是100%), 但是提高了系统的响应速度(由于是概率上会出问题, 所以即使降低了safety保证, 也不是说就一定会出问题, 只是你把系统的正确性交给了运气和命运); 而如果系统保证检查所有的edge case, 但是系统99.9999%的概率都不会进入一些edge cases, 那么这些检查就会阻碍正常情况的运算速度; Liveness和Safety, 这是分布式系统设计的最基本取舍之一;
而FLP则干脆说: 在分布式consensus这个问题里, 如果你想要获得100%的系统safety, 那么你绝对无法保证系统liveness, 即:系统总是存在活锁的可能性, 算法设计只能减小这个可能性, 而无法绝对消除它;
更多的safety VS. liveness 取舍的例子
Kubernetes StatefulSet, 简单说是可以给容器(pod/container)指定一个名字的, 且保证全cluster总是只有一个容器可以有这个名字, 这样application就可以通过这个保证来指定机群中的逻辑角色, 且用这个逻辑容器中保存一些状态; (一般的replicaSet会load balance连接或请求到背后不同的节点, 你的一个请求要求在server本地存一些状态, 下一个请求未必还会到同一个server)
When a stateful pod instance dies (or the node it’s running on fails), the pod instance needs to be resurrected on another node, but the new instance needs to get the same name, network identity, and state as the one it’s replacing. This is what happens when the pods are managed through a StatefulSet. [37]
Kubernetes StatefulSet在liveness和safety里选择了safety, 当statefulSet所在的的物理节点"挂了"之后, kubernetes默认不会重启这个pod到其他节点去, 因为它无法确定这个物理节点到底死没死, 为了保证safety它选择放弃了liveness, 即系统无法自愈, StatefulSet提供所提供的服务不可用, 直到靠人干预来解决问题;
([38] P305: 10.5. Understanding how StatefulSets deal with node failures)
Node fail cause daemon of Kubelet could not tell state of pod on the node….StatefulSet guarantees that there will never be two pods running with the same identity and storage...
Akka Cluster也做了相同的选择, 在cluster membership管理中,有一个auto-downing的配置, 如果你打开它, 那么cluster就会完全相信Akka的failure detection而自动把unreachable的机器从cluster中删去, 这意味着一些在这个unreachable节点上的Actor会自动在其他节点重启; Akka Cluster的文档中, auto-downing是强烈不推荐使用的[38], 这是由于Akka Cluster提供的很多feature要求角色的绝对单一性, 比如singleton role这个功能, 在保证“cluster里只有这一个节点扮演这个actor"(safety), 和保证"cluster里总要有一个节点扮演这个actor"(liveness) 中, 选择了safety, 即保证at most one actor存在于cluster中, 一旦次actor的节点变成unreachable(比如机器真的挂了), 那么Akka也无能为力, 只能傻等这个节点回来或者人来干预决策:
The default setting in Akka Cluster is to not remove unreachable nodes automatically and the recommendation is that the decision of what to do should be taken by a human operator or an external monitoring system. [29]
一个商用的Akka拓展(Akka Split Brain Resolver)提供了一些智能点的解决方案(基于quorum), 有兴趣的同学可以看引用文档[29];
This is a feature of the Lightbend Reactive Platform. that is exclusively available for Lightbend customers.[29]
为什么Kubernetes和Akka不能同时保证safety和liveness呢?
这是因为这两个作为比较底层的平台, 他们需要对上层提供非常大的自由性, 而不能限制上层的活动; 比如kubernetes没有规定用户不能在pod上跑某种程序, Akka也没有规定用户不能写某种actor的code; 这样, 在不限制自己处理能力的同时要保证任何行为都看起来exactly happen once(因为语义上singleton节点只有一个, 那么就不能让用户写的任意单线程程序出现多节点平行执行的外部效果), 而这对中间件来说是不可能的, 这就引出了另外一篇论文: end to end argument[27], 作者已经写过一篇文章详细介绍end to end argument(阿莱克西斯:End to End Argument(可能是最重要的系统设计论文)), 这里不在赘述; 后边我们可以看到Flink, Spark等流系统为了保证exactly once msg processing需要怎样和end to end argument 搏斗;
可以同时保证safety和liveness么
取决于具体情况下对safety和liveness的具体要求, 在流处理的情况下, 至少本文提到的4种流系统都给出了自己的解; 请耐心往下阅读
可解也不可解的分布式consensus
由于异步环境下, 钉死了我们不可能有一个完美不犯错的failure detector; 这篇著名的论文Unreliable Failure Detectors for Reliable Distributed Systems[30] 详细描述了即使我们用一个不准确的failure detector, 也可以解决consensus的方法; 但是它并没有推翻FLP impossibility的结论:Consensus还是并非绝对可解; 但是, 如果我们对需要consensus的计算加一个限制,则Consensus可解;
这个限制是: 计算和通讯只需要在"安全时间"内完成即可, 对[30]提供的算法来讲, 提供consensus的系统需要在这段时间内"正确识别crash"即可,也就是说(1)识别出真正挂掉的node, 和(2)不要怀疑正确的node;
怎么理解呢, 这两个看似对立的概念: (1)consensus的有解(比如paxos协议)是对的, (2)consensus的无解证明:FLP impossibility也是对的; 要准确且简单的解释为什么它们都是对的有点难, 推荐还是看论文; 但是用比喻来解释的话, 根据[30], Consensus算法可以看作这样一个东西, 当系统出现crash, failure detector判断错误,或者网络突然延迟...等时候, 算法会进入某种循环而不会轻易作出决定;
for example, there is a crash that no process ever detects, and all correct processes are repeatedly (and forever) falsely suspected — the application may lose liveness but not safety [31]
而只要满足必要的条件时(计算和通讯只需要在"安全时间"内完成), 系统则可以跳出循环让机群达成一致[30,31];
(1) There is a time after which every process that crashes is always suspected by some correct process.
(2) There is a time after which some correct process is never suspected by any correct process.
The two properties of <>W0 state that eventually something must hold forever; this may appear too strong a requirement to implement in practice. However, when solving a problem that “terminates,” such as Consensus, it is not really required that the properties hold forever, but merely that they hold for a sufficiently long time, that is, long enough for the algorithm that uses the failure detector to achieve its goal.
而FLP impossibility则可以理解为挑刺儿的说, 那这个条件永远无法出现呢? 你的算法就活锁了呀(丢失liveness);
幸运的是, 在现实世界, 我们总可以对消息传递和处理来估计一个上限, 你可以理解为,只要消息处理总是在这个上限之内完成,那么consensus总是可以实现, 而消息处理的时间即使偶尔超过了这个上限, 我们的consensus协议也会进入安全循环自我保护, 从而不会破坏系统的safety, 而系统总是可以再次回归平稳(处理时间回归上限之内); 而FLP则是像说: 你无法证明系统总是可以回归平稳 (确实无法证明, 因为FLP的前提是异步模型, 而我们的真实世界更像是介于异步和同步模型之间的半同步模型, 我们只能说极大概率系统可以"回归平稳", 而无法证明它的绝对保证; =>可以绝对保证"上限"的模型一般称为同步模型);
其实用paxos来模拟出FLP的活锁的例子也很简单, 你把节点间对leader的heartbeat timeout时间设为0.001ms, 那么所有的节点都会忙着说服别的其他节点自己才是leader(因为太短的保活时间, 除了自己, 节点总是会认为其他的任意节点是leader时, leader死了), 那么系统就会进入活锁, 永远无法前进达成cluster内的consensus, 系统丧失liveness;
即使consensus问题解决了, zombie节点也还是大问题, kubernetes和Akka可以选择避开zombie, 损失liveness; 然而对于绝大多数分布式系统来说, 是必须直面zombie节点这个问题的,比如各种分布式系统的master节点, 如果master挂了整个系统不在另外的机器重启master,整个系统就可能变为不可用; 再比如kafka和Kinesis的单一partition只能有一个consumer, 如果这个msg consumer挂了不自动重启, 对消息的处理就会完全停止;
zombie是最容易被忽视的问题, 比如, 即使我们有了paxos, raft, zookeeper这种consensus工具可以帮我们做leader election, 也不要以为你的系统中不会同时有2个leader做决策了; 这是因为先一代的leader可能突然失去任何对外通信,或者cpu资源被其他进程吃光, 或者各种edge case影响, 使得其他节点无法和其通信, 新的leader被选出, 而老的leader其实还没死, 如果老的leader在失去cpu之前的最后一件事是去写只有leader才能写的数据库, 那么当它突然获得cpu时间且网络恢复正常, 那么这个以为自己还是leader的zombie leader就会出乎意料的去写数据库;
这曾经是HBase的一个重大bug[39, Leader Election and External Resources P105],
Apache HBase, one of the early adopters of ZooKeeper, ran into this problem in the field. HBase has region servers that manage regions of a database table. The data is stored on a distributed file system, HDFS, and a region server has exclusive access to the files that correspond to its region. HBase ensures that only one region server is active at a time for a particular region by using leader election through ZooKeeper...
The region server is written in Java and has a large memory footprint. When available memory starts getting low, Java starts periodically running garbage collection to find memory no longer in use and free it for future allocations. Unfortunately, when collecting lots of memory, a long stop-the-world garbage collection cycle will occasionally run, pausing the process for extended periods of time. The HBase community found that sometimes this stop-the-world time period would be tens of seconds, which would cause ZooKeeper to consider the region server as dead. When the garbage collection finished and the region server continued processing, sometimes the first thing it would do would be to make changes to the distributed file system. This would end up corrupting data being managed by the new region server that had replaced the region server that had been given up for dead.
(解释不动了, 大家看英文吧...)
其实对付zombie已经是分布式系统的共识了,也有很多标准的解法,以至于各个论文都不会太仔细的去描述, 这里简单介绍几种方法:
zombie fencing设计的关键点在于如何阻止已经“成为zombie的自己”搞乱正常的“下一代的自己”的状态;毕竟无论是zombie还是新的要取代可能死掉的上一代的"下一代", 大家跑的都是相同的代码逻辑,也就是说这同一段代码, zombie来跑就"不能过:"(比如不能对系统的状态造成影响), 但是"下一代"来跑, 就可以正常工作;这一般需要满足以下几点:
如何正确区分“正常的下一代”(由于怀疑当前的节点已经死掉了,所以重新创建和启动的新一代逻辑节点)和“zombie”(怀疑错误,当前节点并没有死掉,但是新一代节点已经创建并启动,当前节点成为大家都以为死掉但是还活着的zombie)一般需要一个多机复制且稳定自增的epoch number来确定新老逻辑节点;这个epoch number要在分布式环境中稳定自增,一般只能通过consensus协议来实现;否则要分配新一代epoch number时,由于管理epoch number的机群的failover造成分配了一个老的epoch number给新启动的“下一代”,那么zombie反而会有更大的epoch number,这就会造成整个系统的状态混乱;怎样的混乱在介绍完zombie fencing之后就显而易见了(因为所有其他节点都以为zombie死掉了, 把所有的最新操作和状态发给新节点,但是新节点却有一个比zombie还小的epoch number, 从而被zombie fence掉, 而不是自己可以fence zombie)
会被zombie影响的系统需要特殊设计使得:当“新一代”注册后就拒绝“老一代的任何请求”,特别是写入请求;也就是具体的利用epoch number的zombie fencing的实现; 一般需要具体情况具体分析;
如果被影响的系统是自己的一个microservice,那么可以随意设计协议来验证一个请求所携带的epoch number是不是最新的;而当这个被影响的系统是一个外部系统, 比如是业务系统需要用到的一个数据库,由于你没法改数据库的代码和数据库client与server之间的协议, 那么就要利数据库所提供的功能或者说它的协议来设计application层级的zombie fencing协议;比如对提供test and set,compare and swap的kv数据库来说,application设计自己的业务表时,要求所有的表都必须有一个epoch字段,而所有的写入都必须用test and set操作来检测当前epoch字段是否比要写入的请求的epoch字段大或相等, 否则拒绝写入; 这样, 只有"下一代"可以更改zombie写入的数据, 而zombie永远无法更改"下一代"插入或者更新过的数据;
另一方面,很多时候"下一代"需要读取上一代的信息,继承上一代的数据,然后继续上一代的工作;那么如果刚读取完数据,zombie就改变了数据,那么"下一代"对于当前系统状态的判断就会出差错;一个general的解决的方法也很简单,要读先写,“下一代”开始工作前, 如果要先读入数据了解“系统的当前状态”,必须先改变数据的epoch number为自己的epoch number(当然要遵从只增更改原则test and set,如果发现当前数据的epoch number已经比自己的epoch number还大了,则说明自己也已经是zombie了,更新的"下下一代"已经开始工作), 更改数据的epoch number成功之后,再读入数据,就可以保证比自己老的zombie绝对不可能再更改这个数据,而现在读入的数据可以体现系统的最新状态,从而完成对"老一代"数据的继承;而在增加epoch number之前所有被写入的数据;这里即使是"新一代"启动之后, 读取系统状态之前被zombie写入的数据, 都可以看做老一代的合法数据,只要被新一代开始工作前继承读入即可; 我们所要避免的是"新一代" 所读取的事实被zombie所更改; 而不是在物理时间的意义上在"新一代"启动时就立刻阻止zombie的所有系统改动;
zombie fencing的设计取决于分布式系统的具体情况,比如业务逻辑可能更改的数据范围可能是几百万几千万的数据记录,那么这也意味着zombie可能会修改的数据范围非常大,那么要求"下一代"在开始工作前更改所有数据的epoch number就很不现实;
对于zombie的影响的耐受性也会影响zombie fencing的设计,比如如果"下一代"只需要自己所接触的有限数据在特定时刻之后不被zombie影响就能正确工作, 那么只要在"下一代"需要接触特定数据时才更改此数据的epoch number来屏蔽zombie即可,那么即使业务可能修改的数据范围很大,简单的更改数据的epoch number也还是可以接受的解决方案;
最糟糕的情况,如果"zombie"可能会插入新的数据, 而"下一代"的正常工作 需要不能有非法的新数据插入(比如下一代开始工作前先统计所有资源的个数,然后开始基于这个事实和"只有自己才能更改资源"的假设,作出各种决策, 而此时zombie突然插入了一条新资源记录或者资源使用记录...),如果"新一代"完全无法预测zombie会插入什么记录,要阻止zombie随意插入数据,“新一代”就只能在利用predicate lock来防止新纪录插入,且不说很多数据库根本不支持“锁住不存在的数据”的predicate lock,就算支持此功能的数据库也很有可能是使用表级锁来锁住整张表;如果数据表设计成了需要共享给多个节点使用(比如一张资源表,不同的singleton worker负责维护不同的资源范围),那么表级锁就会妨碍其他worker的工作;
zombie fencing的设计在于如何引入简单的fencing点, 对"新一代"畅通无阻,但是却可以阻止zombie的异常活动, 如果协议设计使得"新一代"可以很容易制造这个fencing点, 则"新一代"在启动或者需要的时候加入fencing点即可, 比如前边说的任何数据都要附带一个epoch值,任何数据写入都要用test and set来对比数据的当前epoch值和请求的epoch值; 对于上文的随机插入的业务需求, 可以要求业务逻辑插入任何数据之前,先在一个注册表的属于自己epoch的一行里记录自己要写的数据的id, 且在记录的时候用test and set来检测自己这一行数据的active值是否被更改为disable了;这样就相当于引入了一个更简单的fencing点,因为"下一代"只要在注册表里把所有上一代的记录写为disable, 就可以阻止zombie的未来任何活动,但是此时无法阻止zombie的最后一个注册的数据插入, 但是"下一代"可以简单的读注册表得知这个数据的id, 从而对这个"最后的zombie写入"采取相应的策略(继承,删除, 甚至fencing, 比如这个id并不存在,那么无法得知是zombie真的在写之前死了所以永远不会插入这个记录了,还是zombie只是卡了, 那么"下一代"可以用自己的epoch和zombie注册的id先插入一条记录来占位,这样无论zombie是真的死了还是卡了,都无法再写入这个数据了);这样,我们就引入了一个连数据插入都可以fencing的fencing点;
Zombie fencing一般都是以上这些套路, 用consensus协议确定epoch number区分"下一代"还是zombie,这个epoch number一般也可以称为fencing token, 通过把fencing token分发给需要拒绝zombie的service,把fencing token和需要保护的数据(防止被zombie修改)存在一起;所以一般论文[7, 26]里只会简单的提到epoch或者sequencer等概念,基本都是zombie fencing的fencing token
三点为 (上游/input提供端)=> (当前计算节点/计算结果发送端) =>(下游/计算接收端)
如果我们考虑必须保证系统的高可用性,即检测到任意process的failure,都会由一个(绝对不死)的高可用的JobManager或者MasterNode,来重启(可能在另外的node)这个process, 所以我们定义这种即使所在的host挂掉, 也会不断重新在其他host上重启的process为逻辑process; 这时我们要面临几种可能造成inconsistent的情况:
"计算接受端"没有成功ack"计算结果发送端"的消息,一般表现为发送端的等待ack 超时;根据之前的讨论,接收端有可能把消息处理完毕了(ack的消息丢失,或者刚处理完消息还没发ack就挂了…等情况),也可能没有处理完毕(没接到或刚接到消息就挂了…等情况);
这种情况发送端可以重发信息, 而发送端是需要“上游input提供端”提供某种数据然后进行某种计算后产生的这个消息/计算结果(设为outputA), 那么"计算结果发送端"有两个策略:
策略1: 利用存储计算结果来尽量避免重算
要实现上下游exact once processing,需要实现4个条件
(a. 结果高可用, b.下游去重, c. 上游可以replay, d. 记录上游进度)
a. 要求结果高可用, 应对timeout时, “下游计算接收端”其实并没有成功处理"计算结果发送端"的计算结果的情况(比如下游挂了), 这时"计算结果发送端"可以把计算的结果存储在高可用的DataStore里(比如DynamoDB,Spanner…或者自己维护的多备份数据库); 这样超时只要重发这个计算结果即可, 自己甚至可以开始去做别的事情, 比如处理和计算下一个来自“上游/input提供端“的event, 而已经被“下游计算接收端”ack的"计算结果"则可以清理,一般由异步的garbage collection清理掉. 注意, 由于存在存储失败的可能性, 或者刚计算完结果还没来得及存储就挂掉重启的可能,我们无法真的保证避免重算;详见:无法避免的重算 的例子
b.下游去重,应对timeout时下游其实已经处理完毕消息的情况
一般的解决方案:当逻辑接收端不固定, 比如发送端要根据计算出来的outputA的某key字段把不同的key发送给负责不同key range(也就是partition)的多个"下游计算接收端"; 只需要一个sequenceId就可以实现接收端的消息去重;接收端和发送端各维护一个partition level的sequenceId即可;这样当发送端收到当前message sequenceId(假设为n)的Ack才发下一个sequenceId为n+1的信息,否则就无限重试;而接收端则根据收到的消息的id是不是已经处理过的最大id+1来判断是这是不是下一个message。
Google MillWheel的特例:Google MillWheel做出了一个很有意思的选择,发送端完全不维护sequenceId,而是为每一个发出的message生成一个全局唯一的id,下游则需要记住"所有"见过的id来去重,但是这样会造成大量查询io和存储cost,所以需要另外的方案来解决性能和下游没有无限的存储所以"不可能记住所有id"的问题。这个例子比较特殊,有兴趣的同学可以查阅[4,7]
c. 要求触发本次计算的“上游input提供端”可以replay input event,否则刚接到event还没计算就挂掉重启, 则event丢失;
无法避免的重算:任何时候计算没完成,或者计算完成后但是成功储存前(a.结果高可用的需求), 计算节点fail掉重启, 我们都需要replay上次计算过的input event,所以由于计算结果都还没存成功,所以从物理上讲, 此时我们还是重算了的; 所以即使我们采用把计算结果记录下来的策略, 我们无法从物理意义上真正避免重算, 我们避免的是有多个"重复的"成功计算结果提交给下游;而当计算不是deterministic的, 这多个“重复的”计算结果可能是不同的值发送给不同的下游((比如按照计算结果的key发送给下游不同的partition); 那么下游就会处理同一个event所产生的本应只有一个的计算结果两次,且由于非确定性计算的原因,这两个计算结果不一样; 这就会造成event不是EOMP的问题; (不仅在物理上计算了2次, 在效果上也影响了2次下游的计算, 打破的effective process once的要求)
d: 要求记录event处理的进度, 并保证储存计算结果不出现重复; 记录event处理的进度, 使得trigger本次计算的"event"可以被屏蔽(比如, ack“上游input提供端”, 告知其input event处理完毕, 可以发下一个了), 来避免计算的re-trigger; 这要求以下策略2选一
记录event处理的进度, 和把计算结果存在高可用存储里的操作是一个原子操作, 要么一起成功, 要么一起失败; 这种策略可以保证当计算结果储存下来, 此计算不会replay了;
或者存储计算结果是一个幂等操作,那么可以先存计算结果,再记录event处理进度,一旦计算计算结果成功但是记录event处理进度失败,重新计算上游的同一个event并储存计算结果也不会引起问题。
否则要么计算没存event就被屏蔽掉了, 要么多次计算结果存储在DataStore里造成下游的重复信息; 注意, 此时下游是无法分辨这是重复信息的, 因为这是datastore里的"2条的记录", 将会获得不同的message id;
幂等和end2end argument: 所以实现原子操作就不需要幂等了么? 是也不是, 在业务层是的, 比如要实现业务层的幂等,我们可以在存计算结果到datastore里的时候把一个与触发本次计算的event的唯一id记录在一起,这样我们每次存的时候就可以使用乐观锁的方式test-and-set, 来保证如果这个id在数据库里没有才插入(取决于业务,我们也可以用这个id当主key来,那么即使多次写入同样的内容也没关系=>要求计算是deterministic的;) 如果我们保证触发计算的event的"屏蔽"和计算结果的储存是一个原子操作,那么我们就不需要上边这种复杂的存储策略,因为一旦计算结果存储成功,触发计算的event必定被"屏蔽"掉了, 而如果没存储成功, 则event一定会replay来重试;
然而在传输层却不是的,比如储存数据库的tcp有可能丢包重发,依靠tcp的传输层id自动去重,实现tcp的幂等;
策略2: 完全依赖重算
高可靠重发的问题是,所有信息都必须先记录在高可用性的DataStore里, 相对于重新计算,重发需要的网络IO, 存储,状态管理的cost是很高的;而如果触发计算的event可以replay的话(其实不管重算还是不重算,为了防止“刚接到event, 计算节点就挂掉的情况”, 我们都要求event可以replay), 我们就可以选择重算然后重发来代替存储计算结果的重发;重算需要2个条件:
计算需要是 deterministic 的,用完全一样的数据,必须算出完全相同的结果,否则,当计算结果所需要发送的逻辑下游是由计算结果所决定的情况下(比如按照计算结果的key发送给下游不同的partition) 那么non-deterministic的重算有可能把计算结果发给不同的下游,这样如果重算发生时,下游(假设是节点A)其实已经成功处理完毕重算前上游发送的信息, 只是ACK丢失, 那么重算的结果却发送给了另外一个(节点B), 那么就会造成一个event造成了2个下游effect的结果, 引起一个event造成2次下游影响的结果, 违反EOMP的原则;
重算之前, 状态需要rollback到没有计算之前, 否则会影响需要状态的计算的结果正确性,如果状态更新非幂等,本次计算所做的状态更新也会更新多次;详见"加入节点状态的三节点间的EOMP"
(在多节点流计算里,要求上游可以重发,意味着上游把计算结果存下来了,或者上游可以重算,如果上游需要重算,那么上游需要上游的上游重发,那么上游的上游可以用储存的结果重发或者重算。。。以此类推)
(2种策略其实都有可能造成重算,也都对event replay有需求;为什么还要浪费资源去存储计算结果呢?这里边的重要区别是,当储存结束时,对触发本次计算的上游event的依赖结束了,而不稳定的下游不会造成额外的重算, 和对上游, 上游的上游....计算的"链式反应", 详见流的EOMP中的讨论)
带状态的计算, 比如流计算的某中间节点需要统计总共都收到多少信息了, 每次从上游收到新信息, 都把自己统计的当前历史信息总数更新并发往下游节点, 那么这个"系统的历史信息"就是这个"统计消息总数"的逻辑节点的状态; 由于状态也需要高保活,所以它也一定需要储存在远端dataStore里, 这样储存状态的远端datastore就相当于一个特殊的下游; 不同点在于, 当采用策略2:重算, 而不存储中间计算结果的话, 重算时则需要datastore可以把它所记录的状态rollback到最初刚开始处理此event的那个点; 这里我们只能rollback, 而不能只是依靠幂等来保证“状态的更新是exactly once”的原因是, 节点在处理任意消息时的状态也和当前信息的数据一样是本次计算的input, 而更新后的状态则是本次消息处理的output, 如果重算时不rollback节点的状态, 那么我们就会用一个被本消息"影响过"的状态来进行计算, 而这是会违反exactly once msg processing语义的; 比如节点的本地状态是上次收到的信息的数据上记录的时间戳, 节点的运算是计算2个event数据之间的时间戳差距; 假设eventA发生在时刻0, eventB发生在时刻10, 那么eventB引发的计算应往下游发送10, 并把节点的本地状态更新为10, 此时如果eventB的这个计算需要重算, 但是我们不rollback状态10回到0的话, eventB重算所得的结果就会变成0;
注意: 由于state更新也是处理event的"下游", 那么计算过程中的所有状态更新都可以算作“计算结果”的一部分, 所以当我们需要储存计算结果时,则需要把
(1)状态更新储存回高可靠的statestore里,
(2)记录event处理进度,
(3)把计算结果存在高可用存储里,
这3个操作作为一个原子操作(以后我们称之为"原子完成"来省略篇幅); 而任何时候需要重算的话, 状态必须恢复到处理event之前的样子。
加入state,我们需要把(d. 要求记录event处理的进度, 并保证储存计算结果不出现重复, 更改为 (d+. 要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复,
并加入要求(e. state需要在replay 上游event的时候rollback到处理event之前时的状态
这些要求稍有抽象,让我们看一下流系统一般怎么达成这些要求;
考虑一个多节点的流系统,如果我们把上游所发来的计算结果当成前边所说的“触发计算的event”,而自己的发给下游的计算结果msg作为触发下游计算的event;那么我们就可以用上边的模型保证两两节点之间的exact once msg processing,从而最终实现端到端的exact once msg processing; 这就是Google MillWheel(DataFlow) 和Kafka Stream的解决方案; 他们都选择把每个节点的计算结果储存起来,并保证即使non-deterministic的计算, 也只有一次的计算会起作用, 而不会出现(策略2-1中提到的non-deterministic造成的不一致);他们的区别是
如何实现state和
如何实现接收端去重
如何实现“原子完成”
Google MillWheel(DataFlow)
每个节点维护一个用来去重的"已处理msgId"集, 从上游收到新msg之后, 检查去重 (b.下游去重)
开始计算, 所有的状态更新写在本地, 由于一个状态只有一个更新者(本计算), 所以可以在本地维护一个状态的view, 所有的更新只更新本地的view而暂时不commit到"remote的高可用DataStore", MillWheel用的BigTable;
计算完毕后, (1).所有的要发送的计算结果,(有一些可能是在计算过程中产生并要求发送的, 都会cache起来), (2)所有的state的所有更新, (3) 引发计算的msgId, 会用一个atomic write写在BigTable里。(a.要求结果高可用, d+.要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复)
当commit成功之后, ACK上游, 而由于上游也采用commit计算结果到BigTable里的策略,且只有当自己(这里)发出的消息ACK之后, 才会允许 garbage collection回收计算结果占用的存储, 所以在收到ACK之前, 上游的计算结果, 也就是当前计算所需要的msg, 都可以重发,直至本节点计算成功且commit结果 (c. 要求触发本次计算的event可以replay)
一旦计算过程中failure发生(比如机器挂了), 会在另外的host上重启本process节点,从BigTable恢复本地state和"用来去重的已处理msgId集", 由于上次计算的结果还没有commit, 所以满足(e. state需要在replay event的时候rollback到处理event之前时的状态)
新启动的运算节点在load本地状态之前先用自己的sequencer废掉现存的sequencer, 这样BigTable就可以block zombie计算节点的写;
Kafka Stream
Kafka Stream是建立在kafka分布式队列功能上个一个library, 所以在介绍kafka Stream之前, 我们先来讲一下Kafka
简单介绍Kafka Topic
Kafka的topic可以看作一个partition的queue, 通过发给topic时指定partition(或者用一个partitioner 比如按key做hash来指定使用那个partition), 不同的key的消息可以发送到不同的partition, 相同key的message则可以保证发送到同一个partition去, topic里的信息可以靠一个确定的index来访问, 就好像一个数据库一样,所以只要在data retention到期之前,consumer都可以用同一个index来访问之前已经访问过的数据;
Kafka Transactional Messaging
前边说过, Kafka Stream是建立在kafka分布式队列功能上个一个library, 主要依靠kafka的Transactional Messaging来实现end2end exactly once msg processing;
Transactional Messaging是指用户可以通过类似以下code来定义哪些对kafka topic的写属于一个transaction, 并进一步保证tx的atomic和Isolation
producer.initTransactions();
try {
// called right before sending any records
producer.beginTransaction();
//sending some messages...
// when done sending, commit the transaction
producer.commitTransaction();
} catch (Exception e) {
producer.close();
} catch (KafkaException e) {
producer.abortTransaction();
}
Kafka transaction保证了, beginTransactions之后的, 所有往不同Kafka topic里发送的消息, 要么在commitTransaction之后才能被read-committed的consumer看到, 要么由于close或者failure而全部作废, 从而不为read-committed的consumer所见;
而kafka stream通过用kafka本身的分布式queue的功能来实现了state和记录处理event进度的功能; 因为
(1) 所有的要发送的计算结果(由于可以允许计算发不同消息给多个下游,所以可能发给不同的topic和partition)
(2)记录input event stream的消费进度,
(3)所有的state的所有更新,
这3点, Kafka Stream都是用写消息到kafka topic里实现的, (1)自不必说,本来就是往topic里写数据,(2)其实是写当前consume position的topic; (注意Kafka Stream的上下游消息传递考的是一个中间隐藏的Kafka topic, 上游往这个topic写, 下游从这个topic读, 上游不需要下游的ack,只要往topic里写成功即可, 也不需要管下游已经处理到那里了;而下游则需要维护自己"处理到那里了"这个信息,储存在consume position的topic, 这样比如机器挂掉需要在另外的host上重启计算节点,则计算节点可以从记录consume position的topic里读出自己处理到那里然后从失败的地方重洗开始) (3)其实是写一个内部隐藏的state的change log的topic,和一个本地key value表(也就是本计算节点的state); failover的时候, 之前的"本地"表丢失没关系, 可以冲change log里恢复出失败前确定commit的所有state;
(1)(2)(3)的topic都只是普通的Kafka topic; 只不过(2)(3)由Kafka Stream自动创建和维护(一部分用来支持高层API的(1)也是自动创建)
开始计算时, 在从上游的topic里拿msg之前, Kafka Stream会启动一个tx, 然后开始才开始计算, 此时tx coordinator会分配一个新的epoch id给这个producer并且以后跟tx coordinator通讯都要附带这个epochId
Kafka Stream的计算节点的上游信息都来自kafka topic的分布式partition queue, 且只接收commit之后的record, 在queue里的record都有确定的某种sequenceId, 所以只要计算节点记录好自己当前处理的sequenceId, 处理完一个信息就更新自己的sequenceId到下一个record, 且commit到可靠dataStore里, 就绝对不会重复处理上游event, 而只要没有commit这个位置则可以无数次replay当前的record; (b.下游去重, c. 要求触发本次计算的event可以replay)
在tx内部,每从上游topic里读一条信息就写一条信息到记录consume position的topic里, 每一个state的更改都会更新到本地的state(是一张表)里,且同时写在隐藏的changelog里; 计算过程中需要往下游发信息则写与下游联系的topic;
计算结束后, commit本次的tx, 由Kafka Transactional Messaging来保证本次tx里发生的所有(1)往下游发的消息, (2) 记录input event stream的消费进度,(3)所有的state的所有更新是一个原子操作, 由于结果都成功写入kafka topic,所以达到计算结果的高可用性 (a.要求结果高可用, d+.要求记录event处理的进度, 并保证储存计算结果和state的更新不出现重复)
计算过程中出现failure(比如机器挂了), 那么当计算重启,会重新运行initTransactions来注册tx, 此时tx coordinator会分配一个新的epoch id给此producer, 并且从此以后拒绝老的epoch id的任何commit信息来防止zombie的计算节点; tx coordinator同时roll back(如果上一个tx已经在prepare_commit状态, 继续完成transaction, 具体看下边Transactional Messaging这个章节); 如果rollback,那么input的处理进度, 状态的更改和往下游发送的信息都会rollback, 那么计算可以重新开始,就好像没有上次fail的失败一样; 如果上一个tx已经prepare_commit, 那么完成所有信息的commit; 此时当initTransactions返回,当前计算会接着上一个tx完成的进度继续计算;(e. state需要在replay event的时候rollback到处理event之前时的状态)
Idempotent producer
幂等producer主要解决这么一个问题: Kafka的消息producer, 也就是往Kafka发消息的client 如果不幂等, 那么因为Kafka的接受消息的broker和producer之间在什么是“重复信息”上没有共识的话,则broker无法分辨两个前后一模一样的消息, 到底是producer的本意就是要发两次,还是由于producer的重发(比如:producer在收到broker的"接受成功"的ack之前就挂了,所以不知道之前的消息有没有成功被broker接收, 因此重启后重发了此信息)。此时broker只能选择接受消息,这就造成了同一个消息的多次接受;
同时我们也要解决zombie producer的问题: 如果我们保证producer高可用, 重启我们认为fail掉的producer, 那么其实没死的zombie producer的信息则会造成,重复且乱序的发布消息; (由于zombie的存在, 会有2个producer同时发布我们以为只有一个producer会按顺序发布的消息,这样就无法保证顺序: 比如zombie在发送A, B, C...的时候, 新启动的producer也开始发送A, B, C... )
Kafka的解法:
用一个producer指定的固定不变的transactional.id(非自增id,叫producerName可能更好)来识别可能会在不同机器上重启的同一个logical producer; 相当于给producer起了一个logical name;
注册transaction.id来开始session, 而在session里此tx发来的消息都可以通过维护一个sequenceid来dedup.
非正常结束tx的话, 比如机器挂了, producer重启, 那么就会再次注册自己的transaction.id, 则标志前一个session失效, 而所有属于上一个session的信息全部作废(具体看下一节Atomic and Isolation), 这样就可以做到producer的zombie fencing
(Further, since each new instance of a producer is assigned a new, unique, PID, we can only guarantee idempotent production within a single producer session ---- Idempotent Producer Guarantees [26])
Atomic and Isolation
Producer Zombie fencing: 注册transaction.id会申请高可靠epoch id, broker和tx coordinator可以依此fencing zombie的任何写操作 (e.g. tx coordinator关闭tx);. Zombie fencing in confluent.io/blog/trans
多个Tx coordinator跑在kafka broker里, 写是按照tx.id hash给不同的Tx coordinator, 每一个tx coordintor负责subset的transactionlog的partition; 这样保证同一个logic produce启动的tx必定连接同一个tx coordinator; tx coordinator保证所有的状态都在的高可用高一致性的写在tx log里, (且用queue zombie fencing来保护自己的状态一致性, Discussion on Coordinator Fencing in [26]) (Discussion on Transaction-based Fencing, => 如果zombie不跟coordinator再联系,那么可以一直跟broker发垃圾信息... P39in [26])
Producer注册新的tx之后,在给任意topic的任意partition发消息之前,先跟tx coordinator注册这个partition,
当写完毕,producer给tx coordinator发commit,tx coordinator执行2PC,在transaction log里写prepare_commit, 这样就一定会commit了,因为producer 通知commit就代表所有的写已经写成功了, 这一步其实只是把决定记下来;
Tx coordinator联系所有的注册过的topic的partition,写一个commit marker进去。
当所有的marker写完,在transaction log里记录commit complete。
注意:当在第一步tx coordinator在发现新的重复transaction.id来注册时,会检查有没有相同的transaction.id下未关闭的tx,有的话发起rollback,先在transaction log里记下rollback的决定,然后联系所有的注册过的topic的partition, 写入一个ABORT marker;而如果此tx的状态已经时prepare_commit了,那么有可能tx coordinator在下边第6步联系所有partition来commit中间挂掉了,那么要接着完成这个commit过程;即roll forward而不是roll back,
Read_commit等级的consumer需要等待transaction有结果,consumer library读到任何与Transactional Messaging相关的信息,就开始进入cache阶段,并不会运行任何consumer端的计算,只有当读到commit mark,则把cache住的record依次交给consumer端的计算,而当读到ABORT mark,则把相关tx的record全部filter掉;注意: pending的tx会block所有Read_commit等级的consumer对topic的读;
在保证两两节点之间的EOMP来实现整个流的EOMP的模型里,如果我们某一个或多个节点的状态和计算结果根本不记录在高可用DataStore里,我们还是可以实现EOMP, 我们只需要(1)replay这个节点的上游来重算这个节点的状态和发给下游的计算结果, (2)下游去重;
如果上游也没计算结果记, 那么replay上游的上游即可, 如果上游的上游也没记....一直追溯到记录了计算结果的上游节点即可;
如果一直都没有failure,那么比起Dataflow和Kafka Stream那种记录所有计算结果的模型 我们少记录一些额外的计算结果和状态就减少了很多系统负载; 这就是重算与记录计算结果模型的结合;
重算与记录计算结果的结合
考虑 A,B,C, D 4个节点, A的计算结果传给B, 而B则把一部分计算结果给C一部分给D, 如果B没有记录自己的output, 则Cfail掉之后需要replay上游的input,这就需要B的一些重算来重新制造C所需要的input, 即使B的input(即A)记录了所有的计算结果, 我们还需要"恰巧可以产生这些历史计算结果的"B的历史状态,才能重算出C所需要的input; (所以B必须保存历史状态或者用某种方法重建自己的历史状态才能保证可以重算C所需要的input)
如果C的状态也丢失了, 那么对上游的负担则更重些, B需要重新计算来提供所有的历史计算结果(即C的所有历史input)来让C重建自己的历史状态
可以看到, 任意一个节点的某状态S(n+1)是
(1)上一个历史状态S(n), 和
(2)从历史状态S0建立开始所接收到的信息M(n),
同时作为输入而得到的输出; 而这个过程中又会向下游发出一些计算结果O(n+1)
所以M(n) + S(n) => S(n+1) + O(n+1), 当下游crash重启需要O(n+1)时, 我们则有2种选择:
1.记录O(n+1),
2.不记录O(n+1)但是记住, O(n+1)是根据什么数据生成的
1.是记录计算结果, 2是重算; 两者并用的好处在于, 1可以异步batch进行而不需要节点必须等待O(n+1)记录成功才往下游发送O(n+1); 而2保证了当1还没有完成时, 系统也有足够的信息可以重建O(n+1);
这是一个链式反应, 当重算需要M(n)和S(n)时, 而如果M(n)并没有存则需要上游重算M(n), 上游还没存这些重算M(n)的信息则需要replay上游的上游来重算这些信息,这就是所谓的链式反应...;最极端的情况是什么都没存,那么需要从头开始跑我们的stream程序;
可以看到, 如果没有存中间计算结果或者状态, 那么当这个数据被下游重算需要的时候, 需要我们重算这个数据, 这就会产生对上游的计算结果或者状态的需求, 这就要求我们如果不存下这些数据, 我们就需要记住计算这个数据的数据依赖图, 所以要么把"中间"数据和状态存起来待用, 要么记住他们的数据依赖图; 而这些记录的中间结果只有当对其的所有依赖从计算图中消失时, 我们才可以垃圾回收/删除这些数据(比如所有基于某状态的计算结果都已经存下来了, 那么这个状态的数据就可以删除, 再比如某计算结果所引发的下游计算结果和状态都已经存下来了, 那么此计算结果的数据就可以删除了);从而不会造成储存数据爆炸;
这, 也就是Spark Streaming的解法;
Spark
Spark有三种Stream...
(1)快要被deprecate掉的DStreaming [10, 14]
(2)新一代为了弥补和Flink之间差距的, 支持event time的Structural Streaming(可惜还是有很多不足, 具体的不同和哪里有不足, 要留到对比各个系统对event time和windows操作的支持的对比, 也就是下篇来详细描述了) [12,13]
(3)实验中的Continuous Streaming(Spark Continuous Processing) [11, 20]
(3)还在实验状态, 基本上是把底层都改掉来使用了和Flink相同的Chamdy-Lamport算法[20], 但是貌似还有很多问题需要解决所以目前不支持EOMP, 这里不多聊了;
根据Structural Streaming的论文[12], (2)和(1)使用了相似的方法来保证EOMP, 但是其实作者发现(2)比起(1)还是有一些性能上的改进[21], 但是总体原则还是和(1)类似的利用一个重算关系图lineage来维护各个状态计算结果的依赖关系, 通过异步的checkpoint来截断lineage也就是各个节点状态和计算结果复杂的关系(比如一个数据如果已经checkpoint了, 那么它所依赖的所有状态和计算结果都可以在关系图里删去, 因为replay如果依赖于这个数据, 那么使用它的checkpoint即可, 而不需要知道这个数据是怎么算出来的, 如果还没checkpoint成功, 则需要根据数据依赖图来重算这个数据), 像这样利用checkpoint, 就可以防止lineage无限增长;
但是维护关系图需要利用micro-batch来平衡"关系维护"造成的cost, 否则每一条信息的process都产生一个新状态和新计算结果的话, 关系图会爆炸式增长(用micro-batch, 可能1000条信息会积累起来当作"一个信息"发给下游, 只需要在关系图里记录一个batch-id即可, 而不是1000个msg id, 对与状态来说也是这样,处理1000个msg之前的状态分配一个id, 处理这1000个信息之后的状态一个id, 而不需要记录1000个状态id, 同时他们之间的联系线也从1000条降低为1条;这样就大大减小了关系图维护的负担);
但这样造成的结果是micro-batch会造成很高的端到端处理的latency, 因为micro-batch里的第一条信息要等待micro-batch里的最后一条信息来了之后才能一起传给下游; 而这个等待是叠加的,当stream的层数越深,每一层的micro-batch的第一条信息都需要等待最后一条信息被处理完毕,相比在每一层都毫不等待,micro-batch造成的额外latency就会叠加式的增高;
注意, Spark Structured Stream提供了一种continuous mode[11,12,13,20]来替代micro-batch,解决了latency的问题,但是目前支持的operator很少,且不能做到exact once msg processing, 这里不多加讨论了(不过将来有望做成和flink一样的模式, 毕竟也用的Chandy-Lamport Distributed Snapshot algorithm). : Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.[13]
spark的micro-batch会造成严重的latency问题, 而Dataflow和Kafka Stream的方案要求记录每一个计算结果, 则会在大大增加系统负担的同时也会有不小的latency附加; 那么有没有一种方法可以不记录所有中间计算结果, 并且也不使用micro-batch呢?
我们来看看flink的艺术;
Flink
如果我们不储存流系统中间节点的计算结果在高可用DataStore里, 也不想维护复杂的数据依赖图(需要micro-batch的根源), 那么当一个节点fail掉需要replay上游的input的时候,上游就必定需要replay自己的上游,且自己的状态要rollback到没有接收这些要replay的消息之前的状态;对上游的上游就有相同的要求,那么最终所有节点的上游最终会归向数据源节点,并要求"重新replay";总而言之2个要求:
数据源节点可以replay, 并产生层层的蝴蝶效应的"每个节点对上游要求的replay"
所有的计算节点的状态,要恢复到没有接收到"上游所replay的消息"之前的样子(所以replay后可以回到现在的状态,且重新生成下游所需要的input, 即当前节点在处理这些replay消息时产生的计算结果集);
全局一致点和全局一致状态集
为了方便讨论,我们定义2里所提到的global的状态为一个全局稳定点; 显然,如果我们一条消息一条消息的处理,数据源节点等待直到所有流节点处理完这条消息所产生的蝴蝶效应信息之后,才发出下一个消息B0,那么在消息B要发出但是没发出之前,所有的节点的状态就满足我们对全局稳定点的需要; 比如当我们持续处理B1,B2,B3...B100, 这时一个节点fail掉了,那么我们只要流系统的所有节点rollback他们的状态到发出B0前的"全局稳定点", 整个系统的计算和状态就会干净的回道任何节点都不曾被任何B0-B100所影响的状态, 那么此时从数据源节点replay B0, B1, B2... B100 成功, 这些消息就"exactly once process"掉了。所以,我们找到了第一个不需要micro-batch, 也不需要记录中间节点计算结果,就能实现EOMP的方法:
每n条信息, 或者每一段时间, 数据源节点(或者流系统的第一个入口节点)停止向下游发送任何信息, 直到所有节点报告说有关这条信息的所有派生信息(由于这条信息引起的第一个计算节点的计算结果会发送给它的下游, 下游的计算结果又会发送给它的下游...等等这些都是派生信息)都已经处理完毕, 此时把所有节点的状态checkpointing在高可用DataStore里, 建立一个全局稳定状态集(由流系统中每个计算节点各自的全局稳定状态所组成), 数据源才开始继续发送信息...这样, 任意的节点fail掉, 我们只要在别的机器上重启这个计算节点并download之前checkpoint的状态,流系统的所有节点也rollback到上一个全局稳定状态即可, 由于数据源发送数据的进度也属于全局稳定状态集中的一员, 所以当数据源rollback自己的状态,则可以开始replay 全局稳定点checkpoint之后才发送的信息,而此时所有节点都已经rollback到一个"从没见过这些信息和它们的派生信息"的状态了,整个系统就好像从来没有见过这些信息一样, 从而实现即使failure发生,我们的系统也可以实现EOMP;
更进一步, 我们来看如何不停住数据源的信息接收,我们所需要处理的问题;
任意时间点的全局状态,都不是全局稳定点: 如果所有节点都不等待后续节点有没有处理完信息, 那么任意时间点, 在流的中间节点建立全局稳定状态的时候 ,流上游的节点已经开始处理新的信息, 它们的全局稳定状态早已被新的信息所影响了, 而下游可能还没收到建立全局稳定状态所需要的信息;
随意指定的全局稳定状态集可能根本不存在, 比如数据源连续给A和B发出x,y两条信息, 而A和B则需要把计算结果都发送给C,如果我们想定义全局稳定状态为所有节点"处理完x相关的消息之后", 但是"处理完y相关的信息之前"的状态;那么考虑这样一个运行顺序: A处理完x向C发出x-A, B处理完x, y后向C发出x-B, y-B, 然而由于网络和处理速度的因素, C在还没有收到x-A的情况下就处理完了x-B, y-B, 所以C的一个"干净"的从未被y信息影响的状态,但包含了所有需要的x信息的稳定状态, 在C的状态变迁过程中是从来不存在的 (即, 处理完x-A和x-B,但是没有处理y-B时的状态)。
问题1意味着我们不能用物理时间来建立全局一致状态集, 那么既然流的不同节点接收到数据源任意消息x的派生消息的时间不同, 那么只要我们能让所有节点分清哪些是x的消息和派生消息, 哪些是x之后的消息和派生消息, 所有节点就可以在处理完x的派生消息之后把本地状态复制一份储存在高可用DataStore里, 作为全局一致状态集的一员;
问题2意味着即使允许计算节点连续处理input而不必等待所有下游建立好全局一致状态才发下一个计算结果, 计算节点也不能盲目的不加考虑的处理上游信息, 我们要使得计算节点的状态变迁过程中, 至少全局一致状态是可以出现的;
Flink的解法就是由一个高可用的coordinator连续发出不同的stage barrier(比如先给所有src发1,然后1分钟后发2,2分钟后发3..... 如此增长), 夹杂在发给数据源发出的数据流里, 所有的节点都必须忠实的转发这个stage barrier, 这样所有的节点的
input都分为了接收到某barrier(设为barrier-a)之前的信息和收到barrier-a之后的信息,
所有的发给下游的计算结果也分为自己发出barrier-a之前的信息和发出barrier-a之后的信息;
所有的状态变迁也分为,用所有接收到barrier-a之前的信息, 所建立的状态, 和收到barrier-a之后被新的信息影响了的状态;
那么如果所有节点都遵循2个原则:
只用"接收到barrier-a之前的所有信息", 来建立自己的本地状态,并备份在高可用DataStore里
只使用"接收到barrier-a之前的所有信息"来计算结果并发送给下游之后, 才转发barrier-a; 然后才开始处理"接收到barrier-a之后的信息"; 这样就保证了自己在往下游发送barrier-a之前所发的所有计算结果, 都没有被自己所收到的barrier-a之后的新消息所影响(自己发送的barrier-a之前的计算结果只和自己接收的barrier-a前的input集合相关)。
而当所有的节点都保证"自己发送的barrier-a之前的计算结果只和自己接收的barrier-a前的input集合相关", barrier-a就成了系统系统的分隔点,而所有节点遵循原则-1所建立的本地状态备份, 也绝对没有被数据源发出的在barrier-a之后的信息和它们的派生信息所影响; 而这些所有本地状态备份的全集,则组成了全局一致状态集;
一个细节, 当一个节点只有一个input channel的时候, 只要按顺序处理input信息即可; 而当一个节点有多于一个input channel的时候, 一个input channel的barrier-a已经接收到, 但是其他channel的barrier-a还没有收到怎么办呢?
从收到barrier-a的channel接收新的信息并处理可行么? 显然不行, 这样违反了原则-1, 因为"barrier-a之前的信息全集"还没有凑齐(其他channel的barrier-a还没有收到), 此时如果处理了任何属于barrier-a后的"新"信息, 我们就再也无法在状态变迁中得到一个"干净"不受barrier-a后的"新"信息所影响的状态了, 这意味着我们必须block 这个已经收到barrier-a的channel;
我们可以向下游转发barrier-a么? 显然也不行, 这样违反了原则-2, 理由相同, 我们还没有收到"barrier-a之前的信息全集", 而从其他channel收到barrier-a之前还收到其他信息的话, 它们所产生的计算结果也必须在转发barrier-a之前发送;
由1,2就很清楚可以推理出flink的算法了:
收到任意input channel 的barrier-a之后, block此channel;
收到所有input channel的barrier-a之后, 把当前状态checkpoint并备份到高可用的DataStore里; (这里可以做到异步checkpoint并不会影响latency, 详细介绍看后边的异步checkpointing这一节),
收到所有input channel的barrier-a之后, 并且处理完所有此前收到的信息并向下游发送计算结果完毕后, 向所有和自己相连的下游转发barrier-a;
当所有节点都备份完成,我们就得到了一个全局一致状态集, 或者说全局一致状态快照; 系统的稳定点就进步到了barrier-a, 如果下一个barrier是barrier-b, 那么在得到barrier-b的全局一致状态集之前, 如果系统出现failure, 我们就可以通过重启所有计算节点的方式, 让所有节点reload barrier-a所记录的状态集, 从而实现把所有节点的状态rollback到"上一个全局一致"的状态, 使得流系统可以重置到好像根本没有看到过任何barrier-a到barrier-b之间的信息的一样, 然后重跑这段信息;
通过干净的rollback了可能造成的重复处理的痕迹, 使得所有信息的效果都只发生了一次, 所以我们得到了一个端到端的EOMP系统;
异步checkpoint可以使得, checkpoint本身不会block流本身的计算,增量checkpoint避免了,每次一点小变动都需要checkpoint全部的state,可以节省计算机资源(比如网络压力)
flink和spark这种需要checkpoint的系统都可以做到异步增量checkpoint, 且这个技术也很成熟了, 本文只选flink的方法[35]来简单说明一下 , Spark的可以看[21]
Flink的异步增量checkpointing
Flink使用RocksDB 作为本地状态储存, RocksDB本质上就是一个LSM tree, 对状态的写会写在内存的memtable, 一般是一个linked hashmap, 写到一定大小就存到硬盘里变成sstable(sorted-string-table), 不再更改; 此后会开一个新的memtable来接受新的写; 这样会按历史时间来生成很多小文件, 读的时候先读memtable,如果里边有想要的key对应的value,必定是最新的,否则按历史时间顺来查sstable(sstable有自己的cache, 所以未必需要读硬盘); 对于flink来说, 当需要checkpoint的时候, 只需要把当时的memtable写在硬盘里即可, 这是唯一一个需要block住当前计算的操作, 此后也只需要把从上个checkpoint开始, 新生成的sstable异步发送到高可用的远程文件系统即可(比如S3, HDFS); 这样就做到了异步(发送到高可用datastore是异步执行的),和增量(只发送新增文件);
注意, 由于太多的小文件的sstable会造成读的性能问题, 所以RocksDB需要异步的compact这些小文件到一个大文件, 对此flink也需要做出一些应对, 详见[35], 例子给的非常清楚,这里不再赘述;
以上的讨论都是关于中间件内部如何实现EOMP, 但是由于end to end argument的影响, 中间件提供的保证再多, 没有source的支持, 它也无法区分source(流系统的event来源)发来的2个内容一样的event, 到底是"同一个"信息的重发, 还是"本意"就是想要中间件处理两次的两个"不同"event; 对sink(流系统计算结果的去处)来说,由于failure造成的重算,zombie的存在, 则需要sink能够"融入"到流系统的EOMP体系中去; 对于source的要求基本就是重发和对消息提供能区分到底是不是一个event的eventId,一般就是Kafka那样就OK, 比较简单就不多讨论了; 这里着重聊一下sink; Sink主要有两种手段来配合流系统中间件的EOMP, 幂等和2阶段提交(2PC)
幂等Sink
最简单的来配合流系统EOMP的策略就是幂等, 由于是外部系统, 所以重用我们的"两节点EOMP模型"基本不可能, 因为基本不可能用一个tx来把要写外部系统的操作和记录已经处理过这个操作用一个原子tx来commit, 这也是流系统为什么要支持2PC的原因;
由于幂等保证对同一个计算结果写多次和写一次一样, 所以无论是什么流系统, 无论系统是重算型, 还是记录计算结果来避免重算型, 幂等的sink都可以很好的支持; 所以Dataflow/Spark/Kafka Stream都是靠幂等的sink来实现EOMP
幂等的问题在于无法应对需要重算, 且计算可以是non-deterministic的情况, 详见: 后边(Latency, 幂等和non-deterministic)一节的讨论; 这也是Spark Streaming, 使用幂等sink的Flink无法支持non-deterministic计算的本质原因;
相比之下, dataflow总是记录计算结果来避免重算(即使重算也只会有一次重算的结果会影响下游), Kafka Stream支持tx可以保证只有一次计算结果可以被commit到Kafka Stream里, 如果sink也只读committed上游kafka stream, 则可以保证即使计算是non-deterministic的, 也只会有唯一commit的计算结果被读到(其他的计算结果没有commit marker而被Kafka data comsume API忽略)从而影响sink的外部系统; 而Flink的2PC sink也做到了重算会直接导致sink的外部系统可以配合flink的global rollback, 所以只会有一次的计算结果被外部系统接受(commit);
所以Spark Stream在4个流系统里, 是唯一一个完全无法支持non-deterministic计算的流系统;
Flink独特的2PC Sink
2PC对很多熟悉数据库的人来说应该是臭名昭著了, 这是很复杂和很容易造成问题而需要极力避免的东西; 但是时代在变化, 2PC在新时代也有了弥补自己问题的很多解法了,这里简单介绍一下;
2PC协议由一个coordinator,和很多参与2PC的异构系统组成,发起2PC的时候 coodinator要求所有人pre-commit,这是2PC的第一个P(phase),如果所有tx参与者都可以pre-commit并告知coordinator,则coordinator告诉所有人commit,否则告诉所有人abort,这是2PC的第二个P(phrase)
2PC最大的问题是它是一个blocking协议,blocking的点在于当coordinator和某一个2PC的参与者A挂了,其他参与者无法作出任何决定,只能等待coordinator或者死掉的那个参与者A上线,因为这时所有其他参与者都无法判断以下两种情况到底那种发生了,从而无法决定到底是commit还是abort
coodinator已经收到了所有人的pre-commit并告知参与者A commit,A commit后就挂了
A并不能pre-commit,但是coodinator在告诉所有人需要abort之前就挂了
在情况1. 所有其他参与者都应该commit,在情况2,所有其他参与者都应该abort;由于无法辨别到底是情况1. 还是2. 所有其他参与者必须block等待,这对很多数据库来说意味着为此tx加的锁都不能放掉,从而影响数据库的其他不参与2PC的操作,甚至锁死整个数据库;而如果coordinator或者参与者A无法再上线或者状态丢失,则需要非常复杂的人工操作来解决其他参与者应该如何决策的问题;
虽然2PC有各种问题, 但是在consensus协议早已经成功分布式系统的基石, 各种开源和标准实现可以被轻松获得的今天, 用consensus协议来弥补2PC的问题已经成为一个"已经解决的问题", 如[25]4.2 The Paxos Commit Algorithm 中所说:
…We could make that fault-tolerant by simply using a consensus algorithm to choose the committed/aborted decision, letting the TM be the client that proposes the consensus value…
解决2PC问题的关键在于保持coordinator状态的高可用性, 那么只要coordinator保证把commit或者abort的决定记录在一个consensus cluster里即可,比如etcd或者zookeeper,这样coordintor死了,重启从consensus cluster里恢复状态重新告知所有参与者到底应该commit还是abort即可; 这也是为什么各种流行的分布式系统实现分布式tx都是用2PC的原因, 比如dynamoDB, Spanner, Flink, Kafka...
Flink的2PC Sink
2PC的第一个P的关键在于所有tx参与者在不知道其他参与者状态的情况下,承诺未来一定可以前进commit成功或者干净的回退abort;当前的tx参与者准备好了,且同意commit,2PC的第二个P的关键点在于整体系统的”唯一决定”统一的推进或者回退各个参与者的状态; 而Flink的global state其实可以看做一个2PC,当一个节点收到所有的上游的barrier-n时,这个“契机”可以看做收到了coordinator的可不可以precommit的问询,而当localstate已经在remote 存好之后,当前节点就可以告诉coordinator它准备好了,这可以看做回复precommit(如果此节点在发给precommit);而当所有的节点都通知coordinator“准备好了”之后,coordinator就可以记录下barrier-n的global state完整checkpoint的这个事实,这相当于一个不需要发给tx参与者的commit;这是由于当failover的时候,是由coordinator告诉所有节点应该从哪个checkpoint点来恢复本地状态,所以各个节点的localstate到底是commit了还是rollback了,可以完全由“有没有记录下barrier-n的global state完整checkpoint成功”这个metadata推算出来,所以也就不需要单独给各个节点发commit/abort信息来让各个节点commit或者abort了。
当系统状态只涉及到flink的内部状态时(flink提供的stateApi所提供的statestore), 如果一个某节点X在回复precommit之后挂了,coordinator还是可以选择commit,因为组成global state的节点X的local state已经完整的存储在remote的datastore里了;但是如果涉及到外部状态,比如sink需要把计算结果存储到一个非flink控制的数据库中去时,flink的sink节点就相当于这个外部数据库的client,需要连接外部数据库并把数据存入外部数据库;要使得外部数据库的状态和flink的状态保持一致,则需要sink把外部数据库的状态引入到flink global state的2PC里,而coordinator在决定commit或者abort的时候,必须通知sink来执行外部状态的commit或者abort,因为coordinator是不知道外部状态到底是什么,也无法简单的用通知sink从不同的globalstate点恢复来代替2PC的commit/abort通知;同时sink收到barrier-n时,sink要保证外部数据库里与barrier-(n-1)到barrier-n之间信息相关的数据更改,处于一种“在任何情况下都一定可以commit成功,但是还没有真的commit,所以外部数据库的消费者不可见这些状态,且可以rollback的,可进可退的状态”;[40]给出了如何用文件实现的一个例子;我这里给出一个如何使用支持transaction的数据库的例子;
首先为了避免产生歧义, 我们定义:
flink-precommit ack为 barrier-n流到各个节点(包括sink), 各个节点完成local snapshot checkpoint后发给coordinator的ack, sink则是完成“某个操作”后发给coordinator的ack, 这个操作需要把外部系统(比如数据库)置于一种, 保证任何情况下都可以服从coordinator的最终决定的状态, 一个既可以commit(如果coordinator最终决定commit), 又可以rollback(如果coordinator决定abort)的状态, 且数据不为外部系统的consumer所见;
定义flink-commit为coordinator收到所有人的pre-commit ack后的的最终commit决定;
定义db-commit就是普通的外部数据库的commit;
当程序开始,sink立刻开一个外部数据库的transaction,当sink收到上游的所有的barrier-1,则立刻db-commit当前transaction然后回复coordinator flink-precommit成功(flink-precommit ack),因为此时如果不db-commit,一旦回复coordinator flink-precommit之后,这个sink挂了,那么外部数据库一般就会自动rollback;此时就算sink在其他机器上重启,我们也丢失了所有要最终flink-commit的数据; 而如果这个sink的crash是发生在coordinator收到所有节点的flink-precommit ack并最终决定flink-commit之后, 所有其他节点(比如另外一个sink)的状态可能都commit了(所以无法简单rollback); 而只有此sink的所有数据都无法恢复, 这就破坏了global consistency;
但是上边我们在flink-precommit阶段就db-commit了外部数据库的transaction; 这时会有两个问题: 第一, 我们暴露了只是应该precommit的数据(这些数据不应被数据库的外部consumer所见); 第二, 如果有一个其他节点不同意commit而发给coordinator abort的决定, 那么coordinator则会决定abort, 所以我们的sink则需要服从rollback的决定, 但是我们已经db-commit了的数据, 而一般数据库都不支持rollback已经commit的数据, 这就造成了问题; 为了解决这两个问题, 这时我们需要设计一个和外部数据库的数据消费者的数据“屏蔽协议”;比如利用一个字段来表示当前数据只是“precommit”,所有的外部数据库的读写者都应该忽略这些数据(而只有当这个字段是committed才能读写);这样当flink的coordinator通知flink-commit时,我们用另外一个外部数据库的tx来把所有涉及到的precommit的数据的这个字段改为committed即可, 这就解决了第一个问题;对于第二个问题来说, 如果最终flink coordinator决定abort, 我们把此字段设为abort并利用一个异步垃圾回收的程序把所有标记为abort的数据清理掉即可;
这样设计的关键是, 即使sink precommit ack之后挂了, 我们要flink-commit的数据也不会丢; 所以其实flink-precommit ack时, sink把数据写在任何其他可以保证数据高可用的地方都行(只要sink fail掉重启之后还能找到它), 未必需要是同一个数据库的同一个表; 如果采取这种策略, 那么在flink-commit时则需要重新把要db-commit的数据从存的地方读出来, 然后重新写入到真正要写的数据库并db-commit;
flink提供了一个TwoPhaseCommitSinkFunction,[40]里有详细描述如何简单的extends这个interface来实现一个可以和flink的global consistency配合的sink节点的逻辑,本文不再赘述;需要注意的一点是,当sink收到coordinator的flink-commit指令之后,运行sink的db-commit逻辑,在外部数据库的db-commit更改完毕(比如把要commit的数据的status的值从precommit改为committed)后,但是flink记住sink已经完成commit之前(flink在跑完sink的commit函数后会记住这个sinki已经commit了, 所以不再重复call sink的commit, 否则flink就会一直重试commit), 此时,一旦sink挂了,那么在另外的机器重启的sink,flink无法得知外部数据库已经commit成功了,所以flink会再次重试commit函数来尝试commit;从而造成重复commit,这也是[40]中提到的commit必须设计为幂等操作的原因。
注意1: 可以使用2PC作为sink的关键是, 你的sink可以保证在ack pre-commit之后, 保证无论任何情况都可以成功commit; 这不是说你的sink所连接的外部系统支持tx就可以的, 需要application设计者根据情况具体设计; [1]的P213页, 就描述了sink是用kafka transaction记录计算结果到kafka,但是即使用了transaction也可能丢数据的一种edge case; 而[41] Kafka 0.11 and newer=>Caveats 里也有提到;
丢失数据的原因就在于, kafka sink的默认实现:FlinkKafkaProducer011, 在precommit的时候没有真的commit数据, 因此当kafka sink fail掉没有及时重启, 一旦kafka tx超时, 所有tx里的数据都会丢失, 而此时如果coordinator已经决定commit就绝不会再重发数据(source也已经commit发出的消息的index),从而kafka sink的此次tx的所有数据永久丢失;
这里提供的DB版本的sink实现思路, 在precommit阶段就commit数据, 来保证“无论如何数据都不会丢”, 但是用app level的flag屏蔽外部可见; 这样做的原因就是为了克服类似kafka sink的这种缺陷.
注意2: 使用2PC Sink的Flink应该是可以应对non-deterministic计算的, 因为一旦failure发生, 所有之前的状态和对sink的写入都会被rollback; 但是这样的话, Flink在sink端就变成了micro-batch模型, batch大小取决于发barrier的频率; 但是即使这样, 由于只有sink需要聚集一个batch才能做一次2PC, 但是中间节点往下游发送计算结果还是即算即发的, 所以比起Spark这种所有中间计算都是micro-batch,micro-batch造成的额外latency会叠加式的增高的模型, 端到端的latency应该还是会要小一些;
利用幂等的sink可以做到实时记录计算结果, 达到最小的end to end latency; 因为sink根本不需要等待barrier, 来一条计算结果就向外部系统commit一条记录就好, 而由幂等保证了就算整个系统开始重算, 在sink端也会表现出每个source端的event只产生了一次效果的结果;
但是幂等是很难克服non-deterministic计算的; 因为non-deterministic计算使得同一个source发出的event引起千变万化的"蝴蝶效应" (比如第一次计算event生成的Key是A, 第二次重算生成的Key是B, 如果下一个节点是partitionByKey, 那么这里的2次计算结果就会发送给了完全不同的下游节点, 考虑几百次不确定计算引起的不同蝴蝶效应, 等计算结果到达各个sink节时, 计算的key和value甚至结果的个数和在sink节点的distribution都完全不同了, 那么sink也就完全无法利用幂等来屏蔽掉同一个event replay所造成的"蝴蝶效应"了)
相比之下, 如果整个流系统的计算都是确定性的, 那么无论在source端replay多少次同一个event, 它所产生的"蝴蝶效应"在sink端也必定相同, 则application设计者则可以很容易设计出幂等操作来屏蔽掉重复的计算结果;
如果业务里无法去除non-determnistic的计算, 那么你只能选择Google Dataflow, KafkaStream,或者Flink+2PCSink; 而只支持幂等的Spark和利用幂等sink的Flink无法支持non-determnistic的业务计算.
Stream Processing with Apache Flink: Fundamentals, Implementation, and Operation of Streaming Applications
Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
Lightweight Asynchronous Snapshots for Distributed Dataflows
Streaming Systems: The What, Where, When, and How of Large-Scale Data Processing
Kafka Streams in Action: Real-time apps and microservices with the Kafka Streams API
The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, Out-of-Order Data Processing
MillWheel: Fault-Tolerant Stream Processing at Internet Scale
Distributed Snapshots: Determining Global States of Distributed Systems (Chandy-Lamport)
State Management in Apache Flink R Consistent Stateful Distributed Stream Processing
Discretized Streams: Fault-Tolerant Streaming Computation at Scale
Continuous Processing in Structured Streaming Design Sketch
Structured Streaming: A Declarative API for Real-Time Applications in Apache Spark
Structured Streaming Programming Guide 2.4.3
Spark Streaming Programming Guide2.4.3
Watermarks, Tables, Event Time, and the Dataflow Model
Kafka Streams’ Take on Watermarks and Triggers
Streams Architecture Kafka
Enabling Exactly Once in Kafka Streams
Transactions in Apache Kafka
Introducing Low-latency Continuous Processing Mode in Structured Streaming in Apache Spark 2.3
State Management in Spark Structured Streaming
Process Large DynamoDB Streams Using Multiple Amazon Kinesis Client Library (KCL) Workers
Big Data: Principles and best practices of scalable realtime data systems
Making Sense of Stream Processing
Consensus on Transaction Commit
Exactly Once Delivery and Transactional Messaging in Kafka=>docs.google.com/documen
End-to-End Arguments in System Design
Transactional Messaging in Kafka
Akka Split Brain Resolver
Unreliable Failure Detectors for Reliable Distributed Systems
The Weakest Failure Detector for Solving Consensus
Exactly once Semantics are Possible: Here’s How Kafka Does it
24/7 Spark Streaming on YARN in Production
Monitoring Back Pressure (flink)
Managing Large State in Apache Flink: An Intro to Incremental Checkpointing
Impossibility of Distributed Consensus with One Faulty Process (AKA, FLP impossibility)
Kubernetes in Action
Akka:Auto-Downing(DO NOT USE)
ZooKeeper: Distributed Process Coordination
An Overview of End-to-End Exactly-Once Processing in Apache Flink
Kafka producers and fault tolerance
本文授权转载自知乎:https://zhuanlan.zhihu.com/p/77677075
1、
2、
3、
4、