从 RxJS 到 Flink:如何处理数据流?
阿里妹导读:前端开发的本质是什么?响应式编程相对于 MVVM 或者 Redux 有什么优点?响应式编程的思想是否可以应用到后端开发中?本文以一个新闻网站为例,阐述在前端开发中如何使用响应式编程思想;再以计算电商平台双11每小时成交额为例,分享同样的思想在实时计算中的相同与不同之处。
文末福利:5本开发者职场电子书免费下载。
View = reactionFn(Event)
用户执行页面动作,例如 click, mousemove 等事件。
远程服务端与本地的数据交互,例如 fetch, websocket。
本地的异步事件,例如 setTimeout, setInterval async_event。
View = reactionFn(UserEvent | Timer | Remote API)
单击刷新:单击 Button 刷新数据。
勾选刷新:勾选 Checkbox 时自动刷新,否则停止自动刷新。
下拉刷新:当用户从屏幕顶端下拉时刷新数据。
单击刷新:click -> fetch
勾选刷新:change -> (setInterval + clearInterval) -> fetch
下拉刷新:(touchstart + touchmove + touchend) -> fetch news_app
State 只能用于描述中间状态,而不能描述中间过程。
Action 与 Event 的关系并非一一对应导致 State 难以追踪实际变化来源。
在计算中,响应式编程或反应式编程(英语:Reactive programming)是一种面向数据流和变化传播的声明式编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
点击按钮 -> 触发刷新事件 -> 发送请求 -> 更新视图
勾选自动刷新
手指触摸屏幕
自动刷新间隔 -> 触发刷新事件 -> 发送请求 -> 更新视图
手指在屏幕上下滑
自动刷新间隔 -> 触发刷新事件 -> 发送请求 -> 更新视图
手指在屏幕上停止滑动 -> 触发下拉刷新事件 -> 发送请求 -> 更新视图
自动刷新间隔 -> 触发刷新事件 -> 发送请求 -> 更新视图
关闭自动刷新
定义源数据流
组合/转换数据流
消费数据流并更新视图
涉及 click 数据流。
click$ = fromEvent<MouseEvent>(document.querySelector('button'), 'click');
涉及 change 数据流。
change$ = fromEvent(document.querySelector('input'), 'change');
涉及 touchstart , touchmove 与 touchend 三个数据流。
touchstart$ = fromEvent<TouchEvent>(document, 'touchstart');
touchend$ = fromEvent<TouchEvent>(document, 'touchend');
touchmove$ = fromEvent<TouchEvent>(document, 'touchmove');
= interval(5000);
'https://randomapi.azurewebsites.net/api/users'); = fromFetch(
clickRefresh$ = this.click$.pipe(debounceTime(300));
autoRefresh$ = change$.pipe(
switchMap(enabled => (enabled ? interval$ : EMPTY))
);
pullRefresh$ = touchstart$.pipe(
switchMap(touchStartEvent =>
touchmove$.pipe(
map(touchMoveEvent => touchMoveEvent.touches[0].pageY - touchStartEvent.touches[0].pageY),
takeUntil(touchend$)
)
),
filter(position => position >= 300),
take(1),
repeat()
);
refresh$ = merge(clickRefresh$, autoRefresh$, pullRefresh$));
<div *ngFor="let user of view$ | async">
</div>
View = reactionFn(UserEvent | Timer | Remote API)
1)描述源数据流
与事件UserEvent | Timer | Remote API 对应,在 RxJS 中对应函数分别是:
UserEvent: fromEvent
Timer: interval, timer
Remote API: fromFetch, webSocket
2)组合转换数据流
与响应函数(reactionFn)对应,在 RxJS 中对应的部分方法是:
COMBINING: merge, combineLatest, zip
MAPPING: map
FILTERING: filter
REDUCING: reduce, max, count, scan
TAKING: take, takeWhile
SKIPPING: skip, skipWhile, takeLast, last
TIME: delay, debounceTime, throttleTime
3)消费数据流更新视图
与 View 对应,在 RxJS 及 Angular 中可以使用:
subscribe
async pipe
描述事件发生的本身,而非计算过程或者中间状态。
提供了组合和转换数据流的方法,这也意味着我们获得了复用持续变化数据的方法。
由于所有数据流均由层层组合与转换获得,这也就意味着我们可以精确追踪事件及数据变化的来源。
Action 是 EventStream 的简化。
State 是 Stream 在某个时刻的对应。
The question is: do you really need Redux if you already use Rx? Maybe not. It's not hard to re-implement Redux in Rx. Some say it's a two-liner using Rx.scan() method. It may very well be!
所有事件 -- 找到 --> 相关事件 -- 做出 --> 响应
源数据流 -- 转换 --> 中间数据流 -- 订阅 --> 消费数据流
事件驱动型应用 事件驱动型应用从一个或多个事件流提取数据,并根据到来的事件触发计算、状态更新或其他外部动作。场景包括基于规则的报警,异常检测,反欺诈等等。
数据分析应用 数据分析任务需要从原始数据中提取有价值的信息和指标。例如双十一成交额计算,网络质量监测等等。
数据管道(ETL)应用 提取-转换-加载(ETL)是一种在存储系统之间进行数据转换和迁移的常用方法。ETL 作业通常会周期性地触发,将数据从事务型数据库拷贝到分析型数据库或数据仓库。
用户下单数据流 -- 转换 --> 每小时成交数据流 -- 订阅 --> 写入数据库
源数据流 -- 转换 --> 中间数据流 -- 订阅 --> 消费数据流
无限等下去:late event 可能在传输过程中丢失,window2 窗口永远没有数据产出。
等待时间太短:late event 还没有到来,计算结果错误。
在理想情况,在一个持久通道中缓冲数据。
当数据产生的速度高于中间节点处理能力,或者超过了下游数据的消费能力时,速度较慢的接收器会在队列的缓冲作用耗尽后立即降低发送器的速度。更形象的比喻是,在数据流流速变慢时,将整个管道从水槽“回压”到水源,并对水源进行节流,以便将速度调整到最慢的部分,从而达到稳定状态。
源数据流 -- 转换 --> 中间数据流 -- 订阅 --> 消费数据流
相关链接
[1]https://github.com/vthinkxie/ng-pull-refresh
开发者如何在职场稳步上升?很大程度决定于对技术和行业的掌握程度。本次精选5本超人气职场发展类电子书,从入门到晋升,从面试到“心法”,给你的职业生涯全方位的帮助!
点击“阅读原文”,快去下载吧~