读书笔记《functional-kotlin》函数式编程与反应式编程
到目前为止,我们在过去八章中取得了不错的进展。您已经了解了函数式编程的概念(FP ) 以及一些很棒的 Kotlin 特性,例如协程和委托,它们并不完全来自 FP 理论(实际上,委托来自 OOP 范式),但它们都使我们能够从 FP 中获得更多好处。
这一简短的章节致力于将其他编程原则/范式与 FP 结合起来,以从中获得最佳输出。以下是我们将在本章中介绍的主题列表:
- Combining FP with OOP
- Functional reactive programming
- Introduction to RxKotlin
那么,让我们开始吧。
FP 和 OOP both 旧式编程范式,各有优缺点。例如,很难严格遵循没有任何副作用和所有纯函数的 FP,特别是对于 FP 的初学者和复杂的项目要求。但是,对于 OOP 系统,很难避免副作用;此外,OOP 系统通常被称为并发程序的噩梦。
FP 不承认状态,而在现实生活中,状态是无法避免的。
所有这些麻烦都可以通过将 OOP 与 FP 结合使用来避免。最普遍的混合 OOP 和 FP 的风格可以概括为小型的功能,大型的面向对象。这是将 OOP 与 FP 相结合的简单且最有效的想法。这个概念讲的是在你的代码中使用更高层次的OOP,即在模块化架构中,你可以将OOP用于类和接口,而在较低层次使用FP,即在编写方法/函数的同时。
要打破这个概念,请考虑一种结构,您可以像通常使用 OOP 一样编写类和接口,然后在编写函数/方法时,遵循使用纯函数、monad 和不变性的 FP 风格。
如本书前面所述,如果您想要混合使用 OOP 和 FP,我们相信 Kotlin 是最好的语言。
函数式反应式编程的概念是通过将 FP 范式与反应式编程相结合而出现的。
函数响应式编程的定义说它是一个编程 使用 FP 构建块的反应式编程(异步数据流编程)范式(例如,map
、reduce
, 和 filter
)。
所以,让我们从定义反应式编程开始,然后我们将讨论将它们与 FP 结合起来。
反应式编程是现代编程范式,它谈到了变化的传播,也就是说,反应式编程不是将世界表示为一系列状态,而是对行为进行建模。
反应式编程是一种异步编程范式,它围绕数据流和变化的传播展开。简而言之,那些传播所有影响其数据/数据流到所有相关方(例如最终用户、组件和子部分以及其他以某种方式相关的程序)称为反应式程序。
反应式编程最好用反应式宣言来定义,如下节所述。
Reactive Manifesto (http ://www.reactivemanifesto.org) 是一个文档,定义了四个响应式原则,其中是如下:
- Responsive: The system responds in a timely manner. Responsive systems focus on providing rapid and consistent response times, so they deliver a consistent quality of service.
- Resilient: In case the system faces any failure, it stays responsive. Resilience is achieved by replication, containment, isolation, and delegation. Failures are contained within each component, isolating components from each other, so when failure occurs in a component, it will not affect the other components or the system as a whole.
- Elastic: Reactive systems can react to changes and stays responsive under varying workloads. Reactive systems achieve elasticity in a cost-effective way on commodity hardware and software platforms.
- Message-driven: In order to establish the resilient principle, reactive systems need to establish a boundary between components by relying on asynchronous message-passing.
通过实现上述所有四个原则,系统变得可靠且响应迅速,因此是反应性的。
RxKotlin 是响应式的特定实现受 FP 影响的 Kotlin 编程。它有利于函数组合、避免全局状态和副作用。它依赖于生产者/消费者的观察者模式,具有许多允许组合、调度、节流、转换、错误处理和生命周期管理的操作符。 ReactiveX 框架得到了一个大型社区和 Netflix 的支持。
Reactor-Kotlin 也是基于 FP 的;它被广泛接受并得到 Spring Framework 的支持。 RxKotlin 和 Reactor-Kotlin 有很多相似之处(可能是因为 Reactive Streams 规范)。
您可以从 GitHub 下载和构建 RxKotlin (https://github.com/ReactiveX/RxKotlin)。它不需要任何其他依赖项。 GitHub Wikipedia 页面上的 documentation 结构良好。以下是从 GitHub 签出项目并运行构建的方法,如下所示:
您还可以按照页面上的说明使用 Maven 和 Gradle。
现在,让我们看一下 RxKotlin 的全部内容。我们将从众所周知的事情开始,然后逐渐了解图书馆的秘密。
RxKotlin 围绕代表Observable类型 > 现实生活中的事件和数据系统,用于推送机制,因此它是惰性的,可以以同步和异步两种方式使用。
如果我们从一个处理数据列表的简单示例开始,我们会更容易理解:
输出如下:
让我们逐行浏览程序以了解它是如何工作的。
在评论 1
处,我们创建了一个包含七个项目的 list
(list
在 Any
类的帮助下包含混合数据类型的数据)。在注释 2
处,我们从 list
创建了 iterator
,这样我们可以遍历数据。在注释 3
中,我们创建了一个 while
循环来从 list
在 iterator
的帮助下,然后在注释 4
处,我们打印了它。
这里要注意的是,我们正在从 list
中提取数据,而当前线程被阻塞,直到数据被接收并准备好。例如,考虑从网络调用/数据库查询中获取数据,而不仅仅是一个列表,在这种情况下,线程将被阻塞多长时间。您显然可以为这些操作创建一个单独的线程,但这也会增加复杂性。
想一想,哪种方法更好,让程序等待数据或在数据可用时将数据推送到程序?
ReactiveX 框架(无论是 RxKotlin 还是 RxJava)的构建块是 observables。 Observable
类与 Iterator 相对。它有一个基础集合或计算,可以生成可供消费者使用的值。但不同之处在于消费者不会像迭代器模式那样从生产者那里提取这些值。相反,生产者将值作为通知推送给消费者。
所以,让我们再次举同样的例子,这次是 observable
:
该程序的输出与上一个相同;它打印列表中的所有项目。不同之处在于它的方法。那么,让我们看看它实际上是如何工作的:
- Created a list (the same as the previous one)
- An
Observable
instance is created by thelist
- We subscribe to the observer (we're using named arguments for lambda; we will cover them in detail later)
当我们订阅 observable
变量时,每个数据都会在准备好时被推送到 onNext
;当所有数据被推送时,它会调用 onComplete
,如果发生任何错误,它会调用 onError
。
因此,您学习了如何使用 Observable
实例,并且它们与我们非常熟悉的 Iterator 实例非常相似。我们可以使用这些 Observable
实例来构建异步流并将数据更新推送给他们的订阅者(甚至是多个订阅者)。这是响应式编程范式的简单实现。数据正在传播给所有相关方——订阅者。
如前所述,在反应式编程中,Observable
有一个基础 计算产生可供消费者使用的值(Observer
)。这里最重要的是消费者(Observer
)不会在这里拉取值;相反,Observable
将值推送给消费者。因此,我们可以说 Observable
interface 是一个基于推送的、可组合的迭代器,它通过一系列操作符将其项目发送到最终的
Observer
subscribes toObservable
Observable
starts emitting the items that it has in itObserver
reacts to whatever item theObservable
emits
那么,让我们深入研究一下Observable
是如何通过它的事件/方法工作的,即onNext
, onComplete
和 onError
。
正如我们之前所说,一个 Observable
值具有以下三个最important事件/方法:
onNext
: TheObservable
interface passes all the items one by one to this methodonComplete
: When all the items have gone through theonNext
method, theObservable
calls theonComplete
methodonError
: When theObservable
faces any error, it calls theonError
method to deal with the error, if defined
这里需要注意的一点是,我们所说的 Observable
中的 item 可以是任何东西;它被定义为 Observable<T>
,其中 T
可以是任何类。甚至可以将数组/列表分配为 Observable
。
我们来看下图:
这是一个代码示例,可以更好地理解它:
在前面的示例中,我们声明了 Any
数据类型的观察者实例在评论 1
。
Observer
接口声明了四个方法。注释 2
中的 onComplete()
方法在 Observable
被调用时被调用完成所有项目,没有任何错误。在注释 3
处,我们定义了 onNext(item: Any)
函数,该函数将被 可观察
值 对于它必须发出的每个项目。在该方法中,我们将数据打印到控制台。在注释 4
处,我们定义了 onError(e: Throwable)
方法,在 Observable
接口面临错误。在注释 5
处,onSubscribe(d: Disposable)
方法将在 Observer 时被调用
订阅 Observable
。在评论 6
中,我们从一个列表 (val observable
) 并订阅 observable
value 并在评论 7
。在注释
8
处,我们再次创建了 observable
(val observableOnList
),它将列表作为项目保存。
程序的输出如下:
因此,正如您在输出中看到的那样,对于第一次订阅(注释 7
),当我们订阅 observable
;value,它调用 onSubscribe
方法,然后 Observable
property 开始发射项目,如 observer
开始在 onNext
方法上接收它们并打印它们。当Observable
属性发出所有item时,调用onComplete
方法表示所有item都发出成功.和第二个一样,只是这里每一项都是一个列表。
随着我们对 Observables
有了一些了解,您现在可以学习一些方法来为 Observable
创建 Observable 工厂方法。
在任何时候,您都可以使用Observable.create
方法。此方法将 ObservableEmitter<T>
接口的实例作为观察源。看看下面的代码示例:
首先,我们创建了一个 Observer
接口的实例作为前面的示例。我不会详细说明 observer
值,因为我们已经在前面的示例中看到了概述,本章稍后会详细介绍。在评论 1
处,我们使用 Observable.create
Observable
值代码>方法。我们在 onNext
方法的帮助下从 Observable
值发出了四个字符串,然后用 onComplete
方法。在注释 2
处,我们几乎做了同样的事情,除了这里,我们没有调用 onComplete
,而是调用了 onError
带有自定义 Exception
函数。
这是程序的输出:
Observable.create
方法很有用,尤其是当您使用 working自定义数据结构,并希望控制发出哪些值。您还可以从 不同的 线程向观察者发出值。
Note
Observable contract (http://reactivex.io/documentation/contract.html) 声明 Observables 必须连续(而不是并行)向观察者发出通知.他们可能会从不同的线程发出这些通知,但通知之间必须有正式的先发生关系。
Observable.from
方法相对比Observable.create
方法。您可以借助 from 方法从几乎所有 Kotlin 结构中创建 Observable
实例。
Note
在 RxKotlin 1 中,您将拥有 Observale.from
作为方法;但是,从 RxKotlin 2.0(与 RxJava2.0 一样)开始,运算符重载已使用后缀重命名,例如 fromArray
、fromIterable
和 fromFuture
。
我们来看看下面的代码:
在注释 1
中,我们使用 Observable.fromIterable
方法来创建 Observable
来自 Iterable
实例(此处为 list
)。在评论 2
。我们调用 Observable.fromCallable
方法从 Callable
实例创建 Observable
,我们在评论 3
中做了同样的事情,我们在其中 称为 code class="literal">Observable.fromFuture 方法从 Future
实例派生 Observable
。
这是输出:
感谢 Kotlin 的扩展 functions,你可以转任何 Iterable
实例,如 list
,到Observable
不费力气。我们已经在 第1章中使用过这种方法, ;Kotlin – 数据类型、对象和类,但再看看这个:
输出如下:
那么,您是不是很想研究 toObservable
方法?我们开始做吧。您可以在 RxKotlin
包提供的 observable.kt
文件中找到此方法:
因此,它在内部使用了 Observable.from
方法,再次感谢 Kotlin 的扩展功能。
在 RxKotlin 1.x 中,Subscriber
运算符 essentially 变成了 Observer
输入 RxKotlin 2.x。 RxKotlin 1.x 中有一个 Observer
类型,但是 Subscriber
值是你传递给 subscribe()
方法,它实现了Observer
。在 RxJava 2.x 中,Subscriber
运算符仅在谈论 Flowables
时存在。
正如你在本章前面的examples中看到的,一个Observer
type是一个接口,里面有四个方法,分别是onNext(item:T)
, onError(error:Throwable)
、onComplete()
和 onSubscribe(d:Disposable)
。如前所述,当我们将 Observable
连接到Observer
时,它会在观察者
输入并调用它们。以下是对以下四种方法的简要说明:
onNext
: TheObservable
calls this method ofObserver
to pass each of the items one by oneonComplete
: When theObservable
wants to denote that it's done with passing items to theonNext
method, it calls theonComplete
method ofObserver
onError
: WhenObservable
faces any error, it calls theonError
method to deal with the error if defined in theObserver
type, otherwise it throws the exceptiononSubscribe
: This method is called whenever a newObservable
subscribes toObserver
所以,我们有 Observable
(应该被观察的东西)和我们有 Observer
type (应该是观察到),现在呢?我们如何连接它们? Observable
和 Observer
就像输入设备(无论是键盘还是鼠标)和计算机;我们需要一些东西来连接它们(即使是无线输入设备也有一些连接通道,无论是蓝牙还是 Wi-Fi)。
subscribe
运算符通过 connecting 服务于媒体的 目的 Observable
Observer的接口
。我们可以传递一到三个方法(
onNext
、onComplete
和 onError
) 到 subscribe
运算符,或者我们可以将 Observer
接口的实例传递给 subscribe 运算符以获取 Observable
接口与 Observer
连接。
所以,现在让我们看一个例子:
在这个例子中,我们创建了一个 Observable
实例(在注释 1
处)并使用了两次不同的重载 订阅
运算符。在注释 2
中,我们将三个方法作为参数传递给 subscribe
方法。第一个参数是onNext
方法,第二个是onError
方法,最后一个是onComplete
。在注释 2
处,我们传递了一个 Observer
接口的实例。
输出很容易预测,所以我们跳过它。
所以,我们有了订阅的概念,现在可以做。如果您想在订阅一段时间后停止排放怎么办?一定有办法吧?所以,让我们检查一下。
还记得 Observer
的 onSubscribe
方法吗?该方法有一个我们尚未讨论的参数。订阅时,如果您传递方法而不是 Observer
实例,则 subscribe
运算符将返回 Disposable
,或者如果你使用Observer
的实例,那么你会得到Disposable<的实例/code> 在
onSubscribe
方法的参数中。
您可以使用 Disposable
接口的实例在任何给定时间停止排放。让我们看一个例子:
在这里,我们使用了 Observable.interval
工厂方法。该方法采用两个参数描述间隔周期和时间单位;然后它从零开始依次发出整数。使用 interval
创建的 Observable
永远不会完成,也永远不会停止,直到您处理掉它们,或者程序停止执行。我认为它非常适合这种情况,因为我们想在这里中途停止 Observable
。
因此,在此示例中,在注释 1
处,我们创建了 Observable
和 Observable。 interval
工厂方法,将在每个 100
毫秒间隔后发出一个整数。
在评论 2
处,我已声明 lateinit var弃用
的 Disposable
type (lateinit
表示变量将在稍后的时间点初始化)。在注释 3
中,在 onSubscribe
方法中,我们将接收到的参数值赋给 一次性
变量。
我们打算在序列到达10
后停止执行,即10
发出后,立即停止发出.为了实现这一点,我们在 onNext
方法中放置了一个检查,我们正在检查发射项目的值。我们检查它是否等于或大于10
,如果发射还没有停止(disposed),那么我们就dispose(comment 5
)。
这是输出:
从输出中,我们可以看到调用 disposable.dispose()
方法后没有发出整数,尽管执行等待了 500 毫秒以上(100*10 = 1000 毫秒打印序列直到 10
,我们用 1500
调用延迟方法,因此发出 10
)。
如果你想知道 Disposable
接口,那么定义如下:
它有一个属性表示发射已被通知停止(处置)和一个方法通知发射停止(处置)。