vlambda博客
学习文章列表

初探响应式编程框架Combine构建的分工协作体系(一)

Combine 是苹果公司于 WWDC2019 推出的响应式编程框架,它的出现,使得代码的组织结构可以更加紧凑强健,非常有利于表述繁复的程序逻辑,尤其是当一个程序任务包含了大量的异步处理步骤时,与非响应式编程范式相比,其优势显著。

在讨论响应式编程时,一般是采用弹珠图 (Marble Diagrams)的形式来配合理解它是怎样运作的,但是我个人在学习这一概念的过程中,发现弹珠图有时候并不够好用,理解其中一些知识要点时依然困难,最终我逐渐形成了一套自己的独特理解方法,因此本文会尝试使用一个新的效用模型来配合讨论,这些效用模型是用来向我自己解析疑点难点的重要工具,希望对读者诸君也能有所帮助。

一: 由静态到动态

我们生活在数据的海洋之上,但大多数时候只能偶尔捧起一掬,而无法时刻关注所有的汹涌澎湃。而如何对待那些时刻涌动的数据变化,极大地影响到我们的编程质量和效率。

先由一个简单的例子说起:

class NumbersController { var numbers = [3,6,9,7,4,2] var avg: Float?
func caculateAverage() { guard numbers.count > 0 else {return}
let sum = numbers.reduce(0, +) avg = Float(sum)/Float(numbers.count) print(avg!) }}
let ctr = NumbersController()ctr.caculateAverage()ctr.numbers.append(5)ctr.numbers.append(8)
//输出:5.1666665

在第一次运行caculateAverage方法后,再对numbers进行了两次元素添加,不会再次触发caculateAverage方法,对avg的数值毫无影响。也就是说,每当numbers发生改变时,如果没有及时再运行caculateAverage方法,和数组关系密切的变量avg包含的会是老旧的错误数据。这种根据某个时间点的数据处理一些逻辑任务,处理完毕后对后续的数据变化不敏感的事例,可以理解为 单次静态  的响应。很明显,这个机制就好像一台不带动力的机器,推它一次只运转一次然后它就停下来了,因此它的缺点很明显,很可能因为开发者的失误导致使用的数据是老旧数据,并不能反映最新的数据变化。

变量numbers可能会随时发生变化,如果把一次变化称为一个事件(Event),把对应的数据称为一个元素(Element),这样一个可能持续输出元素的源头称为事件源(Event Source)。很明显,我们需要一套能持续响应事件源的机制。我们希望能接入数据的汪洋,而不是捧起一掬就匆匆上了岸边;我们不希望忽视这种随时间流逝而后续产生的数据变化,把时间这个维度也添加到考量中;我们希望针对事件源的动态变化特性,设立持续性的关注和响应,对它输出的每一个元素敏感,一遍又一遍地对其做出响应,这可以理解为 持续动态  的响应。

一般来说,事件源输出一个元素后,需要经过一系列环节的处理才能得到最终适合使用的数据,这样势必形成一条处理链,处理链将成为持续动态响应机制的标志性形态,而极为注重效率的现代工业化大生产,已经为我们建立这样一个持续动态响应机制提供了一个极佳的参考,那就是生产流水线。

二: 流水线

The Combine framework provides a declarative Swift API for processing values over time.

Combine 正是参考生产流水线(往后简称流水线)的形式建立起来的新型响应式编程框架。一条Combine流水线由一系列工序 单向有序 地连接在一起组成,末端的工序向 上游 声明所期望的元素数量后激活流水线(有限个或无限个),开端的事件源开始持续动态地向 下游 输出元素(每一轮输出一个元素,直到满足指定的输出数量,或者全部元素输出完毕),该元素经过中间系列工序处理后,形成满足需要的数据到达末端的工序供开发者使用。

在Combine框架里,Publisher 协议定义了对一道工序输出的规范要求,Subscriber 协议则定义了对一道工序输入的规范要求,当一道工序的输出规范和另一道工序的输入规范兼容时,两道工序便可以连接在一起成为流水线的一部分。作为开端工序的事件源,通常仅采纳 Publisher 协议,所以又将其称为Publisher;末端工序通常仅采纳 Subscriber 协议,所以又将其称为Subscriber;而中间的系列工序,既需要接收上一道工序的输出元素作为自己的输入,也需要输出元素到下一道工序,所以同时采纳 SubscriberPublisher 两个协议,这样它的身份既是Subscriber又是Publisher,这时将它称作Operator。

也就是说,除了作为末端工序的Subscriber外,其余工序都有一个Publisher的身份;除了作为开端工序的Publisher外,其余工序都有一个Subscriber的身份。

初探响应式编程框架Combine构建的分工协作体系(一)

一个元素从上游流向一道工序作为输入,经过该工序处理后,它的值可能发生改变甚至可能转变为完全不同类型的数据,然后再作为该工序的输出继续往下游流动。

初探响应式编程框架Combine构建的分工协作体系(一)

更为重要的是,这样一条流水线它的每一道工序既可以容许同步(synchronously)的输出也可以容许异步(asynchronously)的输出,在异步网络操作日益增长的今天,赋予了它极大的运用优势。

2.1 同步输出

例子里用到的工序,暂时不明白它具体的效用也没关系,后文将会对它们有详细的讨论,现在只要对一个个工序如何组成一个流水线形成一个直观感性的认识即可。

先来看一个同步输出的例子,假设需要打造一条依次处理一个整数数组里所有元素的流水线,首先需要依据数组打造一个事件源,它会在流水线激活后以同步的方式依次输出这些整数元素,这也是流水线的开端工序:

var numbers = [0,1,2,3,4,5,6]numbers.publisher

接下来我们添加一道工序,将小于3的元素过滤掉,只有大于等于3的元素才会继续往下处理,这时使用名为 .filter 的工序:

var numbers = [0,1,2,3,4,5,6]numbers.publisher .filter{ i -> Bool in return i >= 3 }

接着将每一个元素放大到两倍,这时使用名为 .map 的工序:

var numbers = [0,1,2,3,4,5,6]numbers.publisher .filter{ i -> Bool in return i >= 3 } .map { i -> Int in let n = i*2 print("map: \(i) -> \(n)") return n }

然后使用.collect工序收集完所有的元素后重组成一个新的数组:

var numbers = [0,1,2,3,4,5,6]numbers.publisher .filter{ i -> Bool in return i >= 3 } .map { i -> Int in let n = i*2 print("map: \(i) -> \(n)") return n } .collect()

到目前为止,流水线开始一步步成型,但它还没有被激活,暂时不会有任何数据流动,这时我们最后接入名为 .sink 的工序,这将会产生一个Subscriber激活流水线,这也是我们使用最终数据的地方。另外,在实际项目中通常还需要使用一个变量来保存该流水线才可以正常使用。

var storage = Set<AnyCancellable>()var numbers = [0,1,2,3,4,5,6]numbers.publisher .filter{ i -> Bool in return i >= 3 } .map { i -> Int in let n = i*2 print("map: \(i) -> \(n)") return n } .collect() .sink { completion in print(completion) } receiveValue: { array in print("received: \(array)") } .store(in: &storage)
/*输出:map: 3 -> 6map: 4 -> 8map: 5 -> 10map: 6 -> 12received: [6, 8, 10, 12]finished */

这条流水线的示意图如下:

初探响应式编程框架Combine构建的分工协作体系(一)

2.2 异步输出

最适合代表异步输出的莫过于使用URLSession从网络获取数据:

var storage = Set<AnyCancellable>()let url = URL(string:"https://jsonplaceholder.typicode.com/posts/1")!URLSession.shared.dataTaskPublisher(for: url) .map { tuple -> Data in //tuple的类型为(data:Data,response:URLResponse) return tuple.data } .sink { completion in print(completion) } receiveValue: { data in print(data.description) } .store(in: &storage)
/*输出:292 bytesfinished */

这是一个极为简单的流水线,它的事件源是一个异步输出的Publisher,可以看到,无论同步还是异步的输出,在Combine框架里形成了统一无区别的处理方式。

初探响应式编程框架Combine构建的分工协作体系(一)

2.3 元素流动的两个渠道和三种形态

在任何两道工序间流动的元素,有必要明确它可能存在的形态是怎样的。这可以从上文例子中的.sink工序得到提示:

 .sink { completion in print(completion) } receiveValue: { data in print(data.description) }


流水线中元素的流动实际上分为两个渠道,一个渠道用于流水线正常工作阶段的数据流动,另外一个渠道专门用于传递正常结束和异常结束这两个事件。为方便行文,前者称为上渠,后者称为下渠。

于下渠流动的正常结束和异常结束这两个事件,它们都归属于同一个枚举类型Subscribers.Completion,正常结束由case .finished代表,异常结束由case .failure(Failure)代表,其中Failure是采纳Error协议的一个泛型,代表异常情况下所产生的错误。唯有当这两个事件流通于下渠时,才代表着结束事件,而当其流通于上渠时,只能作为普通数据。

于上渠流动的元素可以是各种类型的数据,哪怕这个元素的内容是case .finished或case .failure(Failure),只要它是在上渠内流动,那它就不作为结束事件,而是作为普通的数据。

那么可以根据这个情况将元素分为三种形态:

  • 一是Value,代表正常工作阶段流动的数据,流通于上渠;

  • 二是case .failure(Failure),流通于下渠时,代表异常结束事件的发生;

  • 三是case .finished,流通于下渠时,代表正常结束事件的发生。

2.4 Publisher

了解了元素流动的两个渠道和三种形态,现在我们先来看一下几种Publisher可能的输出形式。

一种情形是Publisher可以输出不限制数量的Value,如Notification center publisher, Timer publisher, Key-Value observing publisher等系统API提供的Publisher,此类Publisher通过上渠往下游一轮又一轮地输出Value,并且处理过程中不会发生异常结束事件,下渠不会有任何输出,会持续运行直到开发者手动解散这条流水线。它的输出形式示意图如下(图中输出元素先后顺序由右到左,下同):

初探响应式编程框架Combine构建的分工协作体系(一)

第二种情形是Publisher需要输出的Value个数有限,并且处理过程中不会发生异常结束事件,如系统API提供的Sequence publisher,此类Publisher通过上渠全部输出完毕后,再通过下渠输出正常结束事件.finished并解散自身。

初探响应式编程框架Combine构建的分工协作体系(一)

第三种情形是,不管Publisher需要输出的Value个数是有限还是无限,Publisher在处理过程中可能发生异常结束事件,如系统API提供的URLSession data task publisher,这时Publisher通过下渠输出异常结束事件 .failure(Failure),放弃后续的元素输出并提前解散自身。

初探响应式编程框架Combine构建的分工协作体系(一)

由此可见,一个Publisher的输出特征是由其输出的Value类型和异常产生的错误类型决定的:

  • 如果其输出Value类型为String(其他类型可类推),不会有输出错误,那么其输出特征记为<String,Never>;

  • 如果其输出Value类型为String(其他类型可类推),可能输出错误类型为URLError(其他错误类型可类推),那么其输出特征记为<String,URLError>;

无论是正常结束还是异常结束,该Publisher一旦通过下渠输出任何一个结束事件,其命运都是解散自身,无法逆转。

2.5 Operator

Combine框架提供了诸多拥有不同特色功能的Operator,可以按照其特色功能是用于处理哪个渠道的元素分为两大类:

  1. 特色功能用于处理上渠流动元素的Operator。

    此类工序的特点是:

    • 当输入元素来自于上渠时,会启用自身特色的处理功能处理该Value。

    • 当输入元素来自于下渠时,无论内容为.failure(Failure)还是.finished,会将该元素继续通过下渠往更下游输出的同时,对自身发出解散指令。

  2. 特色功能用于处理下渠流动元素的Operator。

    此类工序的特点是:

    • 当输入元素来自于下渠且内容为.failure(Failure)时,会启用自身特色的处理功能处理该Failure;

    • 当输入元素来自于下渠且内容为.finished,会将该元素继续通过下渠往更下游输出的同时,对自身发出解散指令;

    • 当输入元素来自于上渠时,仅仅会将该Value继续通过上渠往更下游输出而没有多余的处理动作。

一道工序接收的输入必然是上一道工序的输出,而一个Operator同时拥有Publisher和Subscriber双重身份,那么它便同时拥有输入特征及输出特征,输入的元素经过其处理后可能发生改变,导致输出特征与输入特征的不同。它可以下图的形式体现(图中类型仅为示例,其他类型可类推,后文配图循此例):



因此,流水线有了进一步的演化:


显然,开端事件源Publisher输出的每一个元素,流经不同的工序后很可能以不同的形态存在,但无论其如何变化都只能是以下三种形态中的之一:Value, .failure(Failure), .finished

2.6 Subscriber

Combine框架通过系统API仅提供了两个Subscriber,除了上文例子中的Sink,还有一个是AssignSink的使用方法上文已经熟悉,它是一个通用性强的Subscriber,而Assign则专门用于对一个对象的Property进行赋值,并且它要求其相邻上游工序的输出特性的Failure类型必须为Never。一般来说,这两个Subscriber足够使用,有特殊需求的话开发者也可以定制自己的Subscriber。

2.7 章节小结

现在我们对Combine带来的流水线有了直观的认识,看到这里,有读者也许会有疑问:使用原来熟练的手段也能做同样的事情,大费周章学习了一堆新概念新术语后,用一种不同的方法重新解决一遍问题,好像没有这个必要?答案隐藏在Combine提供的多达数十种具备不同分工处理能力的Operator里,它提供的是成体系的分工协作能力,通过这种小而专的专业化分工,再组合成强健的流水线去解决复杂的问题,正是Combine框架最为值得称道的地方。相信看完接下来的章节,你会得到更加明确的答案。