vlambda博客
学习文章列表

读书笔记《functional-kotlin》不变性--它很重要

第 9 章函数式编程和反应式编程

到目前为止,我们在过去八章中取得了不错的进展。您已经了解了函数式编程的概念(FP ) 以及一些很棒的 Kotlin 特性,例如协程和委托,它们并不完全来自 FP 理论(实际上,委托来自 OOP 范式),但它们都使我们能够从 FP 中获得更多好处。

这一简短的章节致力于将其他编程原则/范式与 FP 结合起来,以从中获得最佳输出。以下是我们将在本章中介绍的主题列表:

  • 将 FP 与 OOP 相结合
  • 函数式反应式编程
  • RxKotlin 简介

那么,让我们开始吧。

将 FP 与 OOP 相结合


FP 和 OOP both 旧式编程范式,各有优缺点。例如,很难严格遵循没有任何副作用和所有纯函数的 FP,特别是对于 FP 的初学者和复杂的项目要求。但是,对于 OOP 系统,很难避免副作用;此外,OOP 系统通常被称为并发程序的噩梦。

FP 不承认状态,而在现实生活中,状态是无法避免的。

所有这些麻烦都可以通过将 OOP 与 FP 结合使用来避免。最普遍的混合 OOP 和 FP 的风格可以概括为小型的功能,大型的面向对象。这是将 OOP 与 FP 相结合的简单且最有效的想法。这个概念讲的是在你的代码中使用更高层次的OOP,即在模块化架构中,你可以将OOP用于类和接口,而在较低层次使用FP,即在编写方法/函数的同时。

要打破这个概念,请考虑一种结构,您可以像通常使用 OOP 一样编写类和接口,然后在编写函数/方法时,遵循使用纯函数、monad 和不变性的 FP 风格。

如本书前面所述,如果您想要混合使用 OOP 和 FP,我们相信 Kotlin 是最好的语言。

函数式反应式编程


函数式反应式编程的概念是通过将 FP 范式与反应式编程相结合而出现的。

函数响应式编程的定义说它是一个编程 使用 FP 构建块的反应式编程(异步数据流编程)范式(例如,mapreduce , 和 filter)。

所以,让我们从定义反应式编程开始,然后我们将讨论将它们与 FP 结合起来。

反应式编程是现代编程范式,它谈到了变化的传播,也就是说,反应式编程不是将世界表示为一系列状态,而是对行为进行建模。

反应式编程是一种异步编程范式,它围绕数据流和变化的传播展开。简而言之,那些传播所有影响其数据/数据流到所有相关方(例如最终用户、组件和子部分以及其他以某种方式相关的程序)称为反应式程序

反应式编程最好用反应式宣言来定义,如下节所述。

反应式宣言

Reactive Manifesto (http ://www.reactivemanifesto.org) 是一个文档,定义了四个响应式原则,其中如下:

  • 响应式:系统及时 方式。响应式系统专注于提供快速和一致的响应时间,因此它们提供一致的服务质量。
  • 弹性:如果系统面临任何故障,它会保持响应。弹性是通过复制、遏制、隔离和委派实现的。故障包含在每个组件中,将组件相互隔离,因此当一个组件发生故障时,不会影响其他组件或整个系统。
  • 弹性:反应式系统可以对变化做出反应并保持< /a> 在不同的工作负载下响应。反应式系统在商品硬件和软件平台上以具有成本效益的方式实现弹性。
  • 消息驱动:为了建立弹性原则,响应式systems 需要依靠异步消息传递来建立组件之间的边界。

通过实现上述所有四个原则,系统变得可靠且响应迅速,因此是反应性的。

Kotlin 的函数式反应框架

为了编写响应式程序,我们需要一个库。有几个 reactive 编程 libraries 让 Kotlin 在这些方面为我们提供帮助。以下是可用库的列表:

  • RxKotlin
  • Reactor-Kotlin
  • Redux-Kotlin
  • RxKotlin/RxJava 和其他 Reactive Java (ReactiveX) 框架也可以与 Kotlin 一起使用(因为 Kotlin 与 Java 双向 100% 可互操作)

在本书中,我们将重点关注 RxKotlin。

RxKotlin 入门


RxKotlin 是响应式的特定实现受 FP 影响的 Kotlin 编程。它有利于函数组合、避免全局状态和副作用。它依赖于生产者/消费者的观察者模式,具有许多允许组合、调度、节流、转换、错误处理和生命周期管理的操作符。 ReactiveX 框架得到了一个大型社区和 Netflix 的支持。

Reactor-Kotlin 也是基于 FP 的;它被广泛接受并得到 Spring Framework 的支持。 RxKotlin 和 Reactor-Kotlin 有很多相似之处(可能是因为 Reactive Streams 规范)。

下载和设置 RxKotlin

您可以从 GitHub 下载和构建 RxKotlin (https://github.com/ReactiveX/RxKotlin)。它不需要任何其他依赖项。 GitHub Wikipedia 页面上的 documentation 结构良好。以下是从 GitHub 签出项目并运行构建的方法,如下所示:

$ git clone https://github.com/ReactiveX/RxKotlin.git$ cd RxKotlin/$ ./gradlew build

您还可以按照页面上的说明使用 Maven 和 Gradle。

笔记

本书使用的是 RxKotlin 2.2.0 版。

现在,让我们看一下 RxKotlin 的全部内容。我们将从众所周知的事情开始,然后逐渐了解图书馆的秘密。

比较 Pull 机制与 RxJava Push 机制

RxKotlin 围绕代表Observable类型 > 现实生活中的事件和数据系统,用于推送机制,因此它是惰性的,可以以同步和异步两种方式使用。

如果我们从一个处理数据列表的简单示例开始,我们会更容易理解:

fun main(args: Array<String>) { 
    var list:List<Any> = listOf(1, "Two", 3, "Four", "Five", 5.5f) // 1 
    var iterator = list.iterator() // 2 
    while (iterator.hasNext()) { // 3 
        println(iterator.next()) // Prints each element 4 
    } 
} 

输出如下:

读书笔记《functional-kotlin》不变性--它很重要

让我们逐行浏览程序以了解它是如何工作的。

在评论 1 处,我们创建了一个包含七个项目的 listlistAny 类的帮助下包含混合数据类型的数据)。在注释 2 处,我们从 list 创建了 iterator,这样我们可以遍历数据。在注释 3 中,我们创建了一个 while 循环来从 listiterator 的帮助下,然后在注释 4 处,我们打印了它。

这里要注意的是,我们正在从 list 中提取数据,而当前线程被阻塞,直到数据被接收并准备好。例如,考虑从网络调用/数据库查询中获取数据,而不仅仅是一个列表,在这种情况下,线程将被阻塞多长时间。您显然可以为这些操作创建一个单独的线程,但这也会增加复杂性。

想一想,哪种方法更好,让程序等待数据或在数据可用时将数据推送到程序?

ReactiveX 框架(无论是 RxKotlin 还是 RxJava)的构建块是 observables。 Observable 类与 Iterator 相对。它有一个基础集合或计算,可以生成可供消费者使用的值。但不同之处在于消费者不会像迭代器模式那样从生产者那里提取这些值。相反,生产者将值作为通知推送给消费者。

所以,让我们再次举同样的例子,这次是 observable

fun main(args: Array<String>) { 
    var list = listOf(1, "Two", 3, "Four", "Five", 5.5f) // 1 
    var observable = list.toObservable(); 
 
    observable.subscribeBy(  // named arguments for lambda Subscribers 
            onNext = { println(it) }, 
            onError =  { it.printStackTrace() }, 
            onComplete = { println("Done!") } 
    ) 
} 

该程序的输出与上一个相同;它打印列表中的所有项目。不同之处在于它的方法。那么,让我们看看它实际上是如何工作的:

  1. 创建了一个列表(与上一个相同)
  2. Observable 实例由 list 创建
  3. 我们订阅观察者(我们为 lambda 使用命名参数;稍后我们将详细介绍它们)

当我们订阅 observable 变量时,每个数据都会在准备好时被推送到 onNext;当所有数据被推送时,它会调用 onComplete,如果发生任何错误,它会调用 onError

因此,您学习了如何使用 Observable 实例,并且它们与我们非常熟悉的 Iterator 实例非常相似。我们可以使用这些 Observable 实例来构建异步流并将数据更新推送给他们的订阅者(甚至是多个订阅者)。这是响应式编程范式的简单实现。数据正在传播给所有相关方——订阅者。

可观察的


正如我们之前讨论的,在反应式编程中,Observable 有一个基础 计算产生可供消费者使用的值(Observer)。这里最重要的是消费者(Observer)不会在这里拉取值;相反,Observable 将值推送给消费者。所以,我们可以说 Observable interface 是一个基于推送的、可组合的迭代器,它通过一系列操作符将其项目发送到最终的 Observer,最终消费项目。现在让我们按顺序分解这些内容以更好地理解它:

  • Observer 订阅 Observable
  • Observable 开始发射其中包含的项目
  • ObserverObservable 发出的任何项目作出反应

那么,让我们深入研究一下Observable是如何通过它的事件/方法工作的,即onNext, onCompleteonError

可观察的工作原理

正如我们之前所说,一个 Observable 值具有以下三个最important事件/方法:

  • onNextObservable 接口将所有项目一一传递给该方法
  • onComplete:当所有的item都经过onNext方法后,Observable 调用 onComplete 方法
  • onError:当Observable遇到任何错误时,调用onError方法处理错误(如果已定义)

这里需要注意的一点是,我们所说的 Observable 中的 item 可以是任何东西;它被定义为 Observable<T>,其中 T 可以是任何类。甚至可以将数组/列表分配为 Observable

我们来看下图:

读书笔记《functional-kotlin》不变性--它很重要

这是一个代码示例,可以更好地理解它:

fun main(args: Array<String>) { 
 
    val observer = object :Observer<Any>{//1 
    override fun onComplete() {//2 
        println("All Completed") 
    } 
 
        override fun onNext(item: Any) {//3 
            println("Next $item") 
        } 
 
        override fun onError(e: Throwable) {//4 
            println("Error Occured $e") 
        } 
 
        override fun onSubscribe(d: Disposable) {//5 
            println("Subscribed to $d") 
        } 
    } 
 
    val observable = listOf(1, "Two", 3, "Four", "Five", 5.5f).toObservable() //6 
 
    observable.subscribe(observer)//7 
 
    val observableOnList = Observable.just(listOf("One", 2, "Three", "Four", 4.5, "Five", 6.0f), 
            listOf("List with 1 Item"), 
            listOf(1,2,3))//8 
 
 
 
    observableOnList.subscribe(observer)//9 
} 

在前面的示例中,我们声明了 Any 数据类型的观察者实例在评论 1

笔记

在这里,我们利用了 Any 数据类型。在 Kotlin 中,每个类都是 Any 的子类。此外,在 Kotlin 中,一切都是类和对象;没有单独的原始数据类型。

Observer 接口声明了四个方法。注释 2 中的 onComplete() 方法在 Observable 被调用时被调用完成所有项目,没有任何错误。在注释 3 处,我们定义了 onNext(item: Any) 函数,该函数将被 可观察值 对于它必须发出的每个项目。在该方法中,我们将数据打印到控制台。在注释 4 处,我们定义了 onError(e: Throwable) 方法,在 Observable 接口面临错误。在注释 5 处,onSubscribe(d: Disposable) 方法将在 Observer 时被调用 订阅 Observable。在评论 6 中,我们从一个列表 (val observable ) 并订阅 observable value 并在评论 7 。在注释 8 处,我们再次创建了 observable (val observableOnList),它将列表作为项目保存。

程序的输出如下:

读书笔记《functional-kotlin》不变性--它很重要

因此,正如您在输出中看到的那样,对于第一次订阅(注释 7),当我们订阅 observable  ;value,它调用 onSubscribe 方法,然后 Observable property 开始发射项目,如 observer 开始在 onNext 方法上接收它们并打印它们。当Observable属性发出所有item时,调用onComplete方法表示所有item都发出成功.和第二个一样,只是这里每一项都是一个列表。

随着我们对 Observables 有了一些了解,您现在可以学习一些方法来为 Observable 创建 Observable 工厂方法。

Observable.create 方法


在任何时候,您都可以使用Observable.create 方法。此方法将 ObservableEmitter<T> 接口的实例作为观察源。看看下面的代码示例:

fun main(args: Array<String>) { 
 
    val observer: Observer<String> = object : Observer<String> { 
        override fun onComplete() { 
            println("All Completed") 
        } 
 
        override fun onNext(item: String) { 
            println("Next $item") 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error Occured => ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
            println("New Subscription ") 
        } 
    }//Create Observer 
 
    val observable:Observable<String> = Observable.create<String> {//1 
        it.onNext("Emitted 1") 
        it.onNext("Emitted 2") 
        it.onNext("Emitted 3") 
        it.onNext("Emitted 4") 
        it.onComplete() 
    } 
 
    observable.subscribe(observer) 
 
    val observable2:Observable<String> = Observable.create<String> {//2 
        it.onNext("Emitted 1") 
        it.onNext("Emitted 2") 
        it.onError(Exception("My Exception")) 
    } 
 
    observable2.subscribe(observer) 
} 

首先,我们创建了一个 Observer 接口的实例作为前面的示例。我不会详细说明 observer 值,因为我们已经在前面的示例中看到了概述,本章稍后会详细介绍。在评论 1 处,我们使用 Observable.create Observable 值代码>方法。我们在 onNext 方法的帮助下从 Observable 值发出了四个字符串,然后用 onComplete 方法。在注释 2 处,我们几乎做了同样的事情,除了这里,我们没有调用 onComplete,而是调用了 onError 带有自定义 Exception 函数。

这是程序的输出:

读书笔记《functional-kotlin》不变性--它很重要

Observable.create 方法很有用,尤其是当您使用 working自定义数据结构,并希望控制发出哪些值。您还可以从 不同的 线程向观察者发出值。

笔记

Observable contract (http://reactivex.io/documentation/contract.html) 声明 Observables 必须连续(而不是并行)向观察者发出通知.他们可能会从不同的线程发出这些通知,但通知之间必须有正式的先发生关系。

Observable.from 方法


Observable.from 方法相对Observable.create 方法。您可以借助 from 方法从几乎所有 Kotlin 结构中创建 Observable 实例。

笔记

在 RxKotlin 1 中,您将拥有 Observale.from 作为方法;但是,从 RxKotlin 2.0(与 RxJava2.0 一样)开始,运算符重载已使用后缀重命名,例如 fromArrayfromIterablefromFuture

我们来看看下面的代码:

fun main(args: Array<String>) { 
 
    val observer: Observer<String> = object : Observer<String> { 
        override fun onComplete() { 
            println("Completed") 
        } 
 
        override fun onNext(item: String) { 
            println("Received-> $item") 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error Occured => ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
            println("Subscription") 
        } 
    }//Create Observer 
 
    val list = listOf("Str 1","Str 2","Str 3","Str 4") 
    val observableFromIterable: Observable<String> = Observable.fromIterable(list)//1 
    observableFromIterable.subscribe(observer) 
 
 
    val callable = object : Callable<String> { 
        override fun call(): String { 
            return "I'm From Callable" 
        } 
 
    } 
    val observableFromCallable:Observable<String> = Observable.fromCallable(callable)//2 
    observableFromCallable.subscribe(observer) 
 
    val future:Future<String> = object : Future<String> { 
        val retStr = "I'm from Future" 
 
        override fun get() = retStr 
 
        override fun get(timeout: Long, unit: TimeUnit?)  = retStr 
 
        override fun isDone(): Boolean = true 
 
        override fun isCancelled(): Boolean = false 
 
        override fun cancel(mayInterruptIfRunning: Boolean): Boolean = false 
 
    } 
    val observableFromFuture:Observable<String> = Observable.fromFuture(future)//3 
    observableFromFuture.subscribe(observer) 
} 

在注释 1 中,我们使用 Observable.fromIterable 方法来创建 Observable来自 Iterable 实例(此处为 list)。在评论 2。我们调用 Observable.fromCallable 方法从 Callable 实例创建 Observable ,我们在评论 3 中做了同样的事情,我们在其中 称为 code class="literal">Observable.fromFuture 方法从 Future 实例派生 Observable

这是输出:

读书笔记《functional-kotlin》不变性--它很重要

迭代器 .toObservable


感谢 Kotlin 的扩展 functions,你可以转任何 Iterable 实例,如 list,到Observable 不费吹灰之力。我们已经在 第1章中使用过这个方法, Kotlin – 数据类型、对象和类,但再看看这个:

fun main(args: Array<String>) { 
    val observer: Observer<String> = object : Observer<String> { 
        override fun onComplete() { 
            println("Completed") 
        } 
 
        override fun onNext(item: String) { 
            println("Received-> $item") 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error Occured => ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
            println("Subscription") 
        } 
    }//Create Observer 
    val list:List<String> = listOf("Str 1","Str 2","Str 3","Str 4") 
 
    val observable: Observable<String> = list.toObservable() 
 
    observable.subscribe(observer) 
 
} 

输出如下:

读书笔记《functional-kotlin》不变性--它很重要

那么,您是不是很想研究 toObservable 方法?我们开始做吧。您可以在 RxKotlin 包提供的 observable.kt 文件中找到此方法:

fun <T : Any> Iterator<T>.toObservable(): Observable<T> = toIterable().toObservable() 
fun <T : Any> Iterable<T>.toObservable(): Observable<T> = Observable.fromIterable(this) 
fun <T : Any> Sequence<T>.toObservable(): Observable<T> = asIterable().toObservable() 
 
fun <T : Any> Iterable<Observable<out T>>.merge(): Observable<T> = Observable.merge(this.toObservable()) 
fun <T : Any> Iterable<Observable<out T>>.mergeDelayError(): Observable<T> = Observable.mergeDelayError(this.toObservable()) 

因此,它在内部使用了 Observable.from 方法,再次感谢 Kotlin 的扩展功能。

订阅者——观察者界面


在 RxKotlin 1.x 中,Subscriber 运算符 essentially 变成了 Observer 输入 RxKotlin 2.x。 RxKotlin 1.x 中有一个 Observer 类型,但是 Subscriber 值是你传递给 subscribe() 方法,它实现了Observer。在 RxJava 2.x 中,Subscriber 运算符仅在谈论 Flowables 时存在。 

正如你在本章前面的examples中看到的,一个Observer  type是一个接口,里面有四个方法,分别是onNext(item:T), onError(error:Throwable)onComplete()onSubscribe(d:Disposable)。如前所述,当我们将 Observable 连接到Observer时,它会在观察者 输入并调用它们。以下是对以下四种方法的简要说明:

  • onNextObservable调用Observer的这个方法来传递每一个一项一项
  • onComplete:当 Observable 想要表示已完成将项目传递给 onNext 方法,它调用ObserveronComplete方法
  • onError:当Observable遇到任何错误时,调用onError方法如果在 Observer  类型中定义,则处理错误,否则抛出异常
  • onSubscribe:每当有新的 Observable 订阅 Observer 时都会调用此方法

订阅和处置


所以,我们有 Observable (应该被观察的东西)和我们有 Observer type (应该是观察到),现在呢?我们如何连接它们? ObservableObserver 就像输入设备(无论是键盘还是鼠标)和计算机;我们需要一些东西来连接它们(即使是无线输入设备也有一些连接通道,无论是蓝牙还是 W​​i-Fi)。

subscribe 运算符通过 connecting 服务于媒体的 目的 Observable Observer的接口 。我们可以传递一到三个方法(onNextonCompleteonError ) 到 subscribe 运算符,或者我们可以将 Observer 接口的实例传递给 subscribe 运算符以获取 Observable 接口与 Observer连接。

所以,现在让我们看一个例子:

fun main(args: Array<String>) { 
    val observable = Observable.range(1,5)//1 
 
    observable.subscribe({//2 
        //onNext method 
        println("Next-> $it") 
    },{ 
        //onError Method 
        println("Error=> ${it.message}") 
    },{ 
        //onComplete Method 
        println("Done") 
    }) 
 
    val observer: Observer<Int> = object : Observer<Int> {//3 
    override fun onComplete() { 
        println("All Completed") 
    } 
 
        override fun onNext(item: Int) { 
            println("Next-> $item") 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error Occurred=> ${e.message}") 
        } 
 
        override fun onSubscribe(d: Disposable) { 
            println("New Subscription ") 
        } 
    } 
 
    observable.subscribe(observer) 
} 

在这个例子中,我们创建了一个 Observable 实例(在注释 1 处)并使用了两次不同的重载 订阅 运算符。在注释 2 中,我们将三个方法作为参数传递给 subscribe 方法。第一个参数是onNext方法,第二个是onError方法,最后一个是onComplete。在注释 2 处,我们传递了一个 Observer 接口的实例。

输出很容易预测,所以我们跳过它。

所以,我们有了订阅的概念,现在可以做。如果您想在订阅一段时间后停止排放怎么办?一定有办法吧?所以,让我们检查一下。

还记得 ObserveronSubscribe 方法吗?该方法有一个我们尚未讨论的参数。订阅时,如果您传递方法而不是 Observer 实例,则 subscribe 运算符将返回 Disposable,或者如果你使用Observer的实例,那么你会得到Disposable<的实例/code> 在 onSubscribe 方法的参数中。

您可以使用 Disposable 接口的实例在任何给定时间停止排放。让我们看一个例子:

fun main(args: Array<String>) { 
 
    val observale = Observable.interval(100, TimeUnit.MILLISECONDS)//1 
    val observer = object : Observer<Long> { 
 
        lateinit var disposable: Disposable//2 
 
        override fun onSubscribe(d: Disposable) { 
            disposable = d//3 
        } 
 
        override fun onNext(item: Long) { 
            println("Received $item") 
            if (item >= 10 && !disposable.isDisposed) {//4 
                disposable.dispose()//5 
                println("Disposed") 
            } 
        } 
 
        override fun onError(e: Throwable) { 
            println("Error ${e.message}") 
        } 
 
        override fun onComplete() { 
            println("Complete") 
        } 
 
    } 
    runBlocking { 
        observale.subscribe(observer) 
        delay(1500)//6 
    } 
} 

在这里,我们使用了 Observable.interval 工厂方法。该方法采用两个参数描述间隔周期和时间单位;然后它从零开始依次发出整数。使用 interval 创建的 Observable 永远不会完成,也永远不会停止,直到您处理掉它们,或者程序停止执行。我认为它非常适合这种情况,因为我们想在这里中途停止 Observable

因此,在此示例中,在注释 1 处,我们创建了 ObservableObservable。 interval 工厂方法,将在每个 100 毫秒间隔后发出一个整数。

在评论 2 处,我已声明 lateinit var弃用的 Disposable type (lateinit 表示变量将在稍后的时间点初始化)。在注释 3 中,在 onSubscribe 方法中,我们将接收到的参数值赋给 一次性变量。

我们打算在序列到达10后停止执行,即10发出后,立即停止发出.为了实现这一点,我们在 onNext 方法中放置了一个检查,我们正在检查发射项目的值。我们检查它是否等于或大于10,如果发射还没有停止(disposed),那么我们就dispose(comment 5 )。

这是输出:

读书笔记《functional-kotlin》不变性--它很重要

从输出中,我们可以看到调用 disposable.dispose() 方法后没有发出整数,尽管执行等待了 500 毫秒以上(100*10 = 1000 毫秒将序列打印到 10,我们使用 1500 调用延迟方法,因此发出 10)。

如果你想知道 Disposable 接口,那么定义如下:

interface Disposable { 
  /** 
 * Dispose the resource, the operation should be idempotent. 
 */ 
  fun dispose() 
  /** 
 * Returns true if this resource has been disposed. 
 * @return true if this resource has been disposed 
 */ 
  val isDisposed:Boolean 
} 

它有一个属性表示发射已被通知停止(处置)和一个方法通知发射停止(处置)。

概括


在本章中,您学习了将 FP 概念与 OOP 和反应式编程相结合。我们甚至讨论了 RxKotlin 并介绍了 RxKotlin 的设置和基本用法。

下一章是关于更高级的 FP 概念——单子、函子和应用程序,以及如何用 Kotlin 实现它们。单子、函子和应用程序是一些必须了解的概念,通常被称为 FP 的构建块。因此,如果您真的愿意学习 FP,请不要跳过下一章。现在翻页。