vlambda博客
学习文章列表

Dart 中的多线程 与 Future






1:Dart中的异步执行

1.1:在开始之前,我们先要了解什么是阻塞与非阻塞?

阻塞:阻塞的系统调用是指,当进行系统调用时,除非出错(被信号打断也视为出错),进程将会一直陷入内核态直到调用完成。

非阻塞:非阻塞的系统调用是指无论I/O操作成功与否,调用都会立即返回。

在《Unix网络编程》一书中提到了五种IO模型,分别是:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO以及异步IO。

关于以上五种IO模型的介绍

1.2:Dart中是如何处理阻塞的

之前有 android 和 iOS 开发经验的小伙伴都知道什么是异步执行;在移动端都存在多线程这个概念,当我们需要处理一段耗时的代码,就需要开启一个子线程去执行这段代码,防止阻塞主线程,导致界面卡顿。
如果是前端开发的小伙伴就知道,在 
js 中,是没有多线程这个概念的,当我们需要去处理耗时操作,我们可以这样写:

new Promise((resolve, reject) =>{    // 网络请求等耗时操作
}).then((success) => { //成功   console.log(success);
}).catch((error) => { // 失败   console.log(error);
});

通过 Promise 对象我们可以异步处理一段阻塞的代码。
与 
js 类似,Dart 也是单线程编程语言,也存在一个特定的方式去处理需要异步执行的代码,如下:

void fut0() {
 final fut0 = Future(() {    // 网络请求等耗时操作
 }).then((value) => {

 });
}

到这里为止,我们知道在Dart中可以使用Future 对象来处理阻塞的代码。但是具体是如何处理的?

我们知道在操作系统中,提供很多底层的I/O接口,这些接口模型可能是阻塞或者是非阻塞,亦或者是1.1中提到的其他几种模型。

对于不同的I/O模型,Dart 提供了两种不同的处理方式。
在这里,我们先提出以下三种情况,通过对每个情况的分析,来了解异步处理的真正方式

  1. 第一种方式:如果我们调用的方法,在操作系统中存在异步I/O ,则直接交给系统去执行;比如说,网络请求,Socket 本身提供了 select 模型可以异步查询;而文件IO,操作系统也提供了基于事件的回调机制,我们只需要监听到系统执行的回调就行了。
    类似下面的例子:

void fut0() {
 final fut0 = Future(() {    // 网络请求等耗时操作
   // socket 本身提供了异步查询
 }).then((value) => {

 });  
}

  1. 第二种方式:不是所有的方法系统都有异步I/O,比如我们需要执行数据解析或者复杂的运算,这可能也很耗时;但是这些系统并没有提供异步I/0的操作,我们没办法直接交给系统去执行。那Dart是怎么将这些耗时的操作去异步执行的?

  2. 第三种方式:针对于第二种方式,并且我希望这是一个并发执行的操作

1.3:针对第二种方式处理 -> Dart 中的事件循环与消息队列

  1. 事件循环 (Event Loop):在Dart中,存在isolate的概念,通常我们的代码都运行在一个isolate的独立环境中,在isolate环境中,存在一个事件循环,它在不停干一件事情:从事件队列中取出事件并处理它。

while(true) {   event = event_queue.first() //取出事件
  handleEvent(event) //处理事件
  drop(event) //从队列中移除}

类似于iOS的runLoopandroid的Looper,当我们点击屏幕,或者网络请求时,都会产生一个event事件,这些事件会依次进入事件循环中被处理。

在 一个isolate环境中,除了事件循环 (Event Loop),还存在两个消息队列。

  1. event queue事件队列:用来处理外部的事件,如果IO、点击、绘制、计时器(timer)和不同 isolate 之间的消息事件等。

  2. microtask queue微任务队列:处理来自于Dart内部的任务,适合用来不会特别耗时或紧急的任务,微任务队列的处理优先级比事件队列的高,如果微任务处理比较耗时,会导致事件堆积,应用响应缓慢。



import 'dart:async';

main() {    // 默认添加到事件队列  new Future(() => print('beautiful'));  // 收到提交到微任务队列
 Future.microtask(() => print('hi'));
}
打印:222111Exited

当我们程序启动时,默认会创建一个主的isolate(隔离)(相当于一个运行环境),启动事件循环,事件会按照FIFO的顺序依次执行,优先处理 microtask queue中的任务,然后处理 event queue中的任务。

按照第二种方式,我们执行如下任务:

void fut0() {  print('1');
 final fut0 = Future(() {    for (var i=0; i< 10000; i++) {      for (var j=0; j<10000; j++) {
       
     }
   }    return 'xxxxxx';
 }).then((value) => {    print(value)
 });
 Future.microtask(() => print('3'));  print('2');
}

其中执行了一个耗时的操作,最后输出:

1
2
3
xxxxxx

事件按照FIFO的方式,fut0 中的print('1')和print('2')方法会优先执行,然后在处理microtask queue中的任务输出 3,最后处理 event queue中的任务输出 xxxxxx
到这里,我们可以知道,一个 
isolate(隔离) 相当于一个线程,因为事件处理会按照顺序执行,一个 isolate(隔离) 无法实现多线程并行的方式

1.4:针对第三种方式处理(多线程并行) -> Dart 中的 isolate(隔离)

到这里,我们需要重新去了解 Dart,在 1.2 中,我们提到 Dart 是单线程编程语言,这是因为我们在上层写代码的时候,不需要考虑并发问题,不需要对你的状态上锁这些,但其实它是由多个线程配合实现的单线程环境,那么 Dart 是如何实现多线程功能的?

在 1.3 中,我们提到 isolate隔离,默认情况下,在一个isolate隔离中的代码都是按照顺序执行的,所有无法实现多线程,那么我们是否可以创建多个isolate隔离来实现多线程?

  • 让我们先来了解一下isolate隔离是什么?
    isolate:英文翻译为隔离,它是类似于线程
    (thread)但不共享内存的独立运行的worker ,类似于一个进程
    我们可以查看文档中对 
    isolate的解释;大致翻译如下:

  1. 一个Isolate对象,是对一个 isolate隔离的引用,当我们创建一个新的Isolate 对象时,如果成功会生成一个新的isolate隔离(开辟一个新的独立的内存空间),在新的isolate隔离中,运行的代码会隔离在自己的事件循环中。按照这样,这样我们可以创建多个isolate隔离实现并发,因为每个isolate隔离是相互独立的。

  2. Isolate对象 允许被其他isolate控制、监听它所代表的isolate的事件循环,例如当这个isolate发生未捕获错误时,可以暂停(pause)isolate或获取(addErrorListener)错误信息。按照这样 isolate隔离是可以相互通信的。

  3. 代码默认运行在一个 main isolate隔离 中。

以下是网上找到的例子:这里创建了一个新的isolate隔离,然后在 main isolate隔离 去监听执行结果

import 'dart:isolate';
main(List<String> args) async {  print('test1');
 await start();  for (int i=0; i< 100; i++) {    print('testxxx$i');
 }  print('test3');
}
Isolate isolate;
start() async {  print('test5');  //创建接收端口,用来接收子线程消息
 ReceivePort receivePort = ReceivePort();  //创建并发Isolate,并传入主线程发送端口
 isolate = await Isolate.spawn(entryPoint, receivePort.sendPort);  //监听子线程消息
receivePort.listen((data) {   print('Data:$data');
 });
}//并发IsolateentryPoint(SendPort sendPort) {  for (var i=0; i< 3000; i++) {    print(i);
 }
 sendPort.send('xxxxx');
}

在执行之前,我们先猜想一下,有以下几种结果:
1:如果 await start(); 返回的是一个普通Future,那么输出顺序:

test1 -> test5 -> testxxx1 ->testxxx99 -> test3 -> Future中的输出

2:先执行 start() async中的代码,输出顺序:

test1 -> test5 -> test1 ->test2999 -> testxxx1 ->testxxx99 -> test3

3:以上两种都是按照顺序执行的,那么是否会出现并行?

test1 -> test5  -> ?(交错输出) -> test3

执行结果:

test1test5012...
testxxx0
testxxx1
testxxx2
testxxx3
testxxx4
...269426952696testxxx44
testxxx45
...2901testxxx93
testxxx94
testxxx95
testxxx96
testxxx97
testxxx98
testxxx99
test3
...29982999Data:xxxxx

我们可以通过打印输出观察到,确实是第三种输出,输出的顺序是不固定的,也就是说 await Isolate.spawn(entryPoint, receivePort.sendPort); 创建了一个新的isolate隔离,新的isolate隔离main isolate隔离同时执行。

结论:Dart可以通过isolate隔离来实现多线程并发执行
(在测试中,上面循环如果少于3000,无法产生我们预想的结果,原因未知)

通常情况下,我们的功能需求通过第二种方式中就可以完成,不需要新创建isolate隔离来实现

2:Future常见的创建方法

Future 提供了多种不同的 factory 方法,通过不同的方法,可以创建多种功能的Future

2.1:最基础的工厂方法

void futu1() {
 Future(() {    for (var i=0; i< 10; i++) {      print(i);
   }    return 'xxx';
 }).then((value) => {    print(value)
 });
}

2.2:Future.microtask 将事件添加到 microtask queue队列中

void futu2() {
 Future.microtask(() => {    for (var i=0; i< 10; i++) {      print(i)
   }
 }).then((value) => {    print(value)
 });
}

2.3:Future.sync,future中的任务同步执行

void futu3() {  print('1');
 Future.sync(() => {    for (var i=0; i< 10; i++) {      print(i)
   }
 }).then((value) => {
   
 });  print('3');
}

输出:

1
test0
test1
test2
test3
test4
test5
test6
test7
test8
test9
3

2.4:Future.value()

Future<String> futu4() {  return Future.value('3');
}

2.5:Future.delayed 延时执行操作

void futu5() {
 Future.delayed(new Duration(seconds: 2)).then((value) => {    print('22')
 });
}

2.6:Future.wait[xxx] 等待 [xxx]中的任务都执行完之后,才执行

void futu6() {  print('11');
 Future.wait([Future.delayed(new Duration(seconds: 2)).then((value) => {    print('22')
 }), Future(() {    print('33');
 })]).then((value) => {    print('44')
 });  print('55');
}

输出:

11
55
33
22
44

2.7:Future.any[xxx] 只要 [xxx]中的任务有一个执行完成,就执行

void futu7() {  print('11');
 Future.any([Future.delayed(new Duration(seconds: 2)).then((value) => {    print('22')
 }), Future(() {    print('33');
 })]).then((value) => {    print('44')
 });  print('55');
}

输出:

11
55
33
44
22

3:Future常见用法

3.1:获取 future 的执行结果
  1. 使用then获取future的返回值

void futu1() {
 Future(() {    return 'xxx';
 }).then((value) => {    print(value)
 });
}

  1. 使用asyncawait来获取返回值

futu1() async {  var value = await Future(() {    for (var i=0; i< 10; i++) {
     print(i);
   }    return 'xxx';
 });
 print(value);
}

3.2:使用 catchError 拦截future中的错误

void futu2() {
 Future(() => {    throw new Exception("3")
 }).then((value) => {    print(value)
 }).catchError((error) {    print(error);
 });
}

3.3:使用 whenComplete 监听 future 执行结束

void futu2() {
 Future(() => {    throw new Exception("3")
 }).then((value) => {    print(value)
 }).catchError((error) {    print(error);
 }).whenComplete(() => {    print('complete')
 });
}

3.4:使用 timeout 判断 future 执行超时

void futu2() {
 Future.delayed(Duration(seconds: 3), () {    return 'xxx';
 }).timeout(Duration(seconds: 2)).then((value) => {    print(value)
 }).catchError((error) {    print(error);
 }).whenComplete(() => {    print('complete')
 });
}

输出:

TimeoutException after 0:00:02.000000: Future not completedcomplete

4:FutureOr

FutureOr<T>:表示Future<T>T的值的类型
获取 
FutureOr<T>的值,需要先判断类型,通常使用如下判断:

if (value is Future<T>) {    var result = await value;
   print(result);
}else {
   print(value);
}

参考:
Dart 异步编程:

https://zhuanlan.zhihu.com/p/102671345

阻塞IO与非阻塞IO:

https://blog.csdn.net/qq_34638435/article/details/81878301

搞懂Dart异步并封装Isolate:

https://www.colabug.com/2020/0819/7638836/

Dart中的Isolate:

https://blog.csdn.net/joye123/article/details/102913497

Dart 中的多线程 与 Future

扫描二维码

了解更多信息