Dart 中的多线程 与 Future
1:Dart中的异步执行
1.1:在开始之前,我们先要了解什么是阻塞与非阻塞?
阻塞:阻塞的系统调用是指,当进行系统调用时,除非出错(被信号打断也视为出错),进程将会一直陷入内核态直到调用完成。
非阻塞:非阻塞的系统调用是指无论I/O操作成功与否,调用都会立即返回。
在《Unix网络编程》一书中提到了五种IO模型,分别是:阻塞IO、非阻塞IO、多路复用IO、信号驱动IO以及异步IO。
关于以上五种IO模型的介绍
1.2:Dart中是如何处理阻塞的
之前有 android
和 iOS
开发经验的小伙伴都知道什么是异步执行;在移动端都存在多线程这个概念,当我们需要处理一段耗时的代码,就需要开启一个子线程去执行这段代码,防止阻塞主线程,导致界面卡顿。
如果是前端开发的小伙伴就知道,在 js
中,是没有多线程这个概念的,当我们需要去处理耗时操作,我们可以这样写:
new Promise((resolve, reject) =>{ // 网络请求等耗时操作
}).then((success) => { //成功 console.log(success);
}).catch((error) => { // 失败 console.log(error);
});
通过 Promise
对象我们可以异步处理一段阻塞的代码。
与 js
类似,Dart
也是单线程编程语言,也存在一个特定的方式去处理需要异步执行的代码,如下:
void fut0() {
final fut0 = Future(() { // 网络请求等耗时操作
}).then((value) => {
});
}
到这里为止,我们知道在Dart
中可以使用Future
对象来处理阻塞的代码。但是具体是如何处理的?
我们知道在操作系统中,提供很多底层的I/O
接口,这些接口模型可能是阻塞或者是非阻塞,亦或者是1.1中提到的其他几种模型。
对于不同的I/O
模型,Dart
提供了两种不同的处理方式。
在这里,我们先提出以下三种情况,通过对每个情况的分析,来了解异步处理的真正方式
第一种方式:如果我们调用的方法,在操作系统中存在异步
I/O
,则直接交给系统去执行;比如说,网络请求,Socket
本身提供了select
模型可以异步查询;而文件IO
,操作系统也提供了基于事件的回调机制,我们只需要监听到系统执行的回调就行了。
类似下面的例子:
void fut0() {
final fut0 = Future(() { // 网络请求等耗时操作
// socket 本身提供了异步查询
}).then((value) => {
});
}
第二种方式:不是所有的方法系统都有异步
I/O
,比如我们需要执行数据解析或者复杂的运算,这可能也很耗时;但是这些系统并没有提供异步I/0
的操作,我们没办法直接交给系统去执行。那Dart
是怎么将这些耗时的操作去异步执行的?第三种方式:针对于第二种方式,并且我希望这是一个并发执行的操作
1.3:针对第二种方式处理 -> Dart 中的事件循环与消息队列
事件循环
(Event Loop)
:在Dart
中,存在isolate
的概念,通常我们的代码都运行在一个isolate
的独立环境中,在isolate
环境中,存在一个事件循环,它在不停干一件事情:从事件队列中取出事件并处理它。
while(true) { event = event_queue.first() //取出事件
handleEvent(event) //处理事件
drop(event) //从队列中移除}
类似于iOS的runLoop
,android的Looper
,当我们点击屏幕,或者网络请求时,都会产生一个event
事件,这些事件会依次进入事件循环中被处理。
在 一个isolate
环境中,除了事件循环 (Event Loop)
,还存在两个消息队列。
event queue
事件队列:用来处理外部的事件,如果IO、点击、绘制、计时器(timer)和不同 isolate 之间的消息事件等。microtask queue
微任务队列:处理来自于Dart内部的任务,适合用来不会特别耗时或紧急的任务,微任务队列的处理优先级比事件队列的高,如果微任务处理比较耗时,会导致事件堆积,应用响应缓慢。
import 'dart:async';
main() { // 默认添加到事件队列 new Future(() => print('beautiful')); // 收到提交到微任务队列
Future.microtask(() => print('hi'));
}
打印:222111Exited
当我们程序启动时,默认会创建一个主的isolate(隔离)
(相当于一个运行环境),启动事件循环,事件会按照FIFO的顺序依次执行,优先处理 microtask queue
中的任务,然后处理 event queue
中的任务。
按照第二种方式,我们执行如下任务:
void fut0() { print('1');
final fut0 = Future(() { for (var i=0; i< 10000; i++) { for (var j=0; j<10000; j++) {
}
} return 'xxxxxx';
}).then((value) => { print(value)
});
Future.microtask(() => print('3')); print('2');
}
其中执行了一个耗时的操作,最后输出:
1
2
3
xxxxxx
事件按照FIFO的方式,fut0
中的print('1')和print('2')
方法会优先执行,然后在处理microtask queue
中的任务输出 3,最后处理 event queue
中的任务输出 xxxxxx
到这里,我们可以知道,一个 isolate(隔离)
相当于一个线程,因为事件处理会按照顺序执行,一个 isolate(隔离)
无法实现多线程并行的方式
1.4:针对第三种方式处理(多线程并行) -> Dart 中的 isolate(隔离)
到这里,我们需要重新去了解 Dart
,在 1.2 中,我们提到 Dart
是单线程编程语言,这是因为我们在上层写代码的时候,不需要考虑并发问题,不需要对你的状态上锁这些,但其实它是由多个线程配合实现的单线程环境,那么 Dart
是如何实现多线程功能的?
在 1.3 中,我们提到 isolate隔离
,默认情况下,在一个isolate隔离
中的代码都是按照顺序执行的,所有无法实现多线程,那么我们是否可以创建多个isolate隔离
来实现多线程?
让我们先来了解一下
isolate隔离
是什么?
isolate:英文翻译为隔离,它是类似于线程(thread)
但不共享内存的独立运行的worker
,类似于一个进程
我们可以查看文档中对isolate
的解释;大致翻译如下:
一个
Isolate对象
,是对一个isolate隔离
的引用,当我们创建一个新的Isolate 对象
时,如果成功会生成一个新的isolate隔离
(开辟一个新的独立的内存空间),在新的isolate隔离
中,运行的代码会隔离在自己的事件循环中。按照这样,这样我们可以创建多个isolate隔离
实现并发,因为每个isolate隔离
是相互独立的。Isolate对象
允许被其他isolate
控制、监听它所代表的isolate
的事件循环,例如当这个isolate
发生未捕获错误时,可以暂停(pause)
此isolate
或获取(addErrorListener)
错误信息。按照这样isolate隔离
是可以相互通信的。代码默认运行在一个
main isolate隔离
中。
以下是网上找到的例子:这里创建了一个新的isolate隔离
,然后在 main isolate隔离
去监听执行结果
import 'dart:isolate';
main(List<String> args) async { print('test1');
await start(); for (int i=0; i< 100; i++) { print('testxxx$i');
} print('test3');
}
Isolate isolate;
start() async { print('test5'); //创建接收端口,用来接收子线程消息
ReceivePort receivePort = ReceivePort(); //创建并发Isolate,并传入主线程发送端口
isolate = await Isolate.spawn(entryPoint, receivePort.sendPort); //监听子线程消息
receivePort.listen((data) { print('Data:$data');
});
}//并发IsolateentryPoint(SendPort sendPort) { for (var i=0; i< 3000; i++) { print(i);
}
sendPort.send('xxxxx');
}
在执行之前,我们先猜想一下,有以下几种结果:
1:如果 await start(); 返回的是一个普通Future,那么输出顺序:
test1 -> test5 -> testxxx1 ->testxxx99 -> test3 -> Future中的输出
2:先执行 start() async
中的代码,输出顺序:
test1 -> test5 -> test1 ->test2999 -> testxxx1 ->testxxx99 -> test3
3:以上两种都是按照顺序执行的,那么是否会出现并行?
test1 -> test5 -> ?(交错输出) -> test3
执行结果:
test1test5012...
testxxx0
testxxx1
testxxx2
testxxx3
testxxx4
...269426952696testxxx44
testxxx45
...2901testxxx93
testxxx94
testxxx95
testxxx96
testxxx97
testxxx98
testxxx99
test3
...29982999Data:xxxxx
我们可以通过打印输出观察到,确实是第三种输出,输出的顺序是不固定的,也就是说 await Isolate.spawn(entryPoint, receivePort.sendPort);
创建了一个新的isolate隔离
,新的isolate隔离
与main isolate隔离
同时执行。
结论:Dart
可以通过isolate隔离
来实现多线程并发执行
(在测试中,上面循环如果少于3000,无法产生我们预想的结果,原因未知)
通常情况下,我们的功能需求通过第二种方式中就可以完成,不需要新创建isolate隔离
来实现
2:Future常见的创建方法
Future
提供了多种不同的 factory
方法,通过不同的方法,可以创建多种功能的Future
。
2.1:最基础的工厂方法
void futu1() {
Future(() { for (var i=0; i< 10; i++) { print(i);
} return 'xxx';
}).then((value) => { print(value)
});
}
2.2:Future.microtask 将事件添加到 microtask queue
队列中
void futu2() {
Future.microtask(() => { for (var i=0; i< 10; i++) { print(i)
}
}).then((value) => { print(value)
});
}
2.3:Future.sync,future中的任务同步执行
void futu3() { print('1');
Future.sync(() => { for (var i=0; i< 10; i++) { print(i)
}
}).then((value) => {
}); print('3');
}
输出:
1
test0
test1
test2
test3
test4
test5
test6
test7
test8
test9
3
2.4:Future.value()
Future<String> futu4() { return Future.value('3');
}
2.5:Future.delayed 延时执行操作
void futu5() {
Future.delayed(new Duration(seconds: 2)).then((value) => { print('22')
});
}
2.6:Future.wait[xxx] 等待 [xxx]中的任务都执行完之后,才执行
void futu6() { print('11');
Future.wait([Future.delayed(new Duration(seconds: 2)).then((value) => { print('22')
}), Future(() { print('33');
})]).then((value) => { print('44')
}); print('55');
}
输出:
11
55
33
22
44
2.7:Future.any[xxx] 只要 [xxx]中的任务有一个执行完成,就执行
void futu7() { print('11');
Future.any([Future.delayed(new Duration(seconds: 2)).then((value) => { print('22')
}), Future(() { print('33');
})]).then((value) => { print('44')
}); print('55');
}
输出:
11
55
33
44
22
3:Future常见用法
3.1:获取 future 的执行结果
使用
then
获取future
的返回值
void futu1() {
Future(() { return 'xxx';
}).then((value) => { print(value)
});
}
使用
async
和await
来获取返回值
futu1() async { var value = await Future(() { for (var i=0; i< 10; i++) {
print(i);
} return 'xxx';
});
print(value);
}
3.2:使用 catchError 拦截future中的错误
void futu2() {
Future(() => { throw new Exception("3")
}).then((value) => { print(value)
}).catchError((error) { print(error);
});
}
3.3:使用 whenComplete 监听 future 执行结束
void futu2() {
Future(() => { throw new Exception("3")
}).then((value) => { print(value)
}).catchError((error) { print(error);
}).whenComplete(() => { print('complete')
});
}
3.4:使用 timeout 判断 future 执行超时
void futu2() {
Future.delayed(Duration(seconds: 3), () { return 'xxx';
}).timeout(Duration(seconds: 2)).then((value) => { print(value)
}).catchError((error) { print(error);
}).whenComplete(() => { print('complete')
});
}
输出:
TimeoutException after 0:00:02.000000: Future not completedcomplete
4:FutureOr
FutureOr<T>
:表示Future<T>
或T
的值的类型
获取 FutureOr<T>
的值,需要先判断类型,通常使用如下判断:
if (value is Future<T>) { var result = await value;
print(result);
}else {
print(value);
}
参考:
Dart 异步编程:
阻塞IO与非阻塞IO:
搞懂Dart异步并封装Isolate:
Dart中的Isolate:
https://blog.csdn.net/joye123/article/details/102913497
扫描二维码
了解更多信息