收藏|Flink比Spark好在哪?
1 Flink介绍
Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台。和 Spark 类似,两者都希望提供一个统一功能的计算平台给用户,都在尝试建立一个统一的平台以运行批量,流式,交互式,图处理,机器学习等应用。
1.1部署模式
Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。
Standalone
YARN
Mesos
Cloud
1.2整合支持
Flink支持消费kafka的数据;
支持HBase,Cassandra, ElasticSearch
支持与Alluxio的整合
支持RabbitMQ
1.3 API支持
对Streaming数据类应用,提供DataStream API
对批处理类应用,提供DataSet API(支持Java/Scala)
对流处理和批处理,都支持Table API
支持双流join
1.4 Libraries支持
支持机器学习(FlinkML)
支持图分析(Gelly)
支持关系数据处理(Table)
支持复杂事件处理(CEP)
1.5 Flink on YARN
Flink提供两种Yarn的部署方式Yarn Setup:
Start a long-running Flink cluster on YARN
通过命令yarn-session.sh来实现,本质上是在yarn集群上启动一个flink集群。
由yarn预先给flink集群分配若干个container给flink使用,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务。
只有一个Flink界面,可以从Yarn的ApplicationMaster链接进入。
使用bin/flink run命令发布任务时,本质上是使用Flink自带的调度,与普通的在Flink集群上发布任务并没有不同。不同的任务可能在一个TaskManager中,也即是在一个JVM进程中,无法实现资源隔离。
Run a Flink job on YARN
通过命令bin/flink run -m yarn-cluster实现,一次只发布一个任务,本质上给每个flink任务启动了一个集群。
yarn不事先给flink分配container,而是在任务发布时,启动JobManager(对应Yarn的AM)和TaskManager,如果一个任务指定了n个TaksManager(-yn n),则会启动n+1个Container,其中一个是JobManager。
发布m个应用,则有m个Flink界面,对比方式一,同样发布m个应用,会多出m-1个JobManager的。
发布任务时,实际上是使用了Yarn的调用。不同的任务不可能在一个Container(JVM)中,也即是实现了资源隔离。
以第一种启动方式为例,其主要启动流程如下:
首先我们通过下面的命令行启动flink on yarn的集群
这里将产生总共五个进程:
1个FlinkYarnSessionCli ---> Yarn Client
1个YarnApplicationMasterRunner ---> AM + JobManager
3个YarnTaskManager --> TaskManager
即一个客户端+4个container,1个container启动AM,3个container启动TaskManager。
yarn-session.sh支持的参数:
一个Flink环境在YARN上的启动流程:
FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则上传一些flink的jar和配置文件到HDFS,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。
从这个启动过程中可以看出,在每次启动Flink on YARN之前,需要指定启动多少个TaskManager,每个taskManager分配的资源是固定的,也就是说这个资源量从taskManager出生到死亡,资源情况一直是这么多,不管它所承载的作业需求资源情况,这样在作业需要更多资源的时候,没有更多的资源分配给对应的作业,相反,当一个作业仅需要很少的资源就能够运行的时候,仍然分配的是那些固定的资源,造成资源的浪费。
用户实现的Flink程序是由Stream和Transformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组Stream和Transformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator。
下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:
FlinkKafkaConsumer是一个Source Operator,map、keyBy、timeWindow、apply是Transformation Operator,RollingSink是一个Sink Operator。
1.6 CEP(Complex event processing)
Flink CEP 是一套极具通用性、易于使用的实时流式事件处理方案。作为 Flink 的原生组件,省去了第三方库与 Flink 配合使用时可能会导致的各种问题。但其功能现阶段看来还比较基础,不能表达复杂的业务场景,同时它不能够做到动态更新。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// (Event, timestamp)
DataStream<Event> input = env.fromElements(
Tuple2.of(new Event(1, "start", 1.0), 5L),
Tuple2.of(new Event(2, "middle", 2.0), 1L),
Tuple2.of(new Event(3, "end", 3.0), 3L),
Tuple2.of(new Event(4, "end", 4.0), 10L),
Tuple2.of(new Event(5, "middle", 6.0), 7L),
Tuple2.of(new Event(6, "middle", 5.0), 7L),
// last element for high final watermark
Tuple2.of(new Event(7, "middle", 5.0), 100L)
).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() {
具体的业务逻辑
Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("start");
}
}).followedByAny("middle").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("middle");
}
}).followedByAny("end").where(new SimpleCondition<Event>() {
@Override
public boolean filter(Event value) throws Exception {
return value.getName().equals("end");
}
});
DataStream<String> result = CEP.pattern(input, pattern, comparator).select(
new PatternSelectFunction<Event, String>() {
@Override
public String select(Map<String, List<Event>> pattern) {
StringBuilder builder = new StringBuilder();
builder.append(pattern.get("start").get(0).getId()).append(",")
.append(pattern.get("middle").get(0).getId()).append(",")
.append(pattern.get("end").get(0).getId());
return builder.toString();
}
}
);
从例子代码中可以看到,patterns需要用java代码写,需要编译,很冗长很麻烦,没法动态配置;需要可配置,或提供一种DSL;再者,对于一个流同时只能设置一个pattern,比如对于不同的用户实例想配置不同的pattern,就没法支持;需要支持按key设置pattern。
1.7 Flink目前存在的一些问题
在实时计算中有这么一个普遍的逻辑:业务逻辑中以一个流式数据源与几个相关的配置表进行join操作,而配置表并不是一成不变的,会定期的进行数据更新,可以看成一个缓慢变化的流。这种join环境存在以下几个尚未解决的问题:
1.对元数据库的读压力;如果分析程序有1000并发,是否需要读1000次;
2.读维表数据不能拖慢主数据流的throughput,每秒千万条数据量;
3.动态维表更新问题和一致性问题;元数据是不断变化的,如何把更新同步到各个并发上;
4.冷启动问题,如何保证主数据流流过的时候,维表数据已经ready,否则会出现数据无法处理;
5.超大维表数据会导致流量抖动和频繁gc,比如几十万条的实例数据,可能上百兆。
在Flink社区,对该问题也进行了关注
https://issues.apache.org/jira/browse/FLINK-6131
https://issues.apache.org/jira/browse/FLINK-2320
https://issues.apache.org/jira/browse/FLINK-3514
当然在生产环境上也有相应的解决方案:
使用redis来做cache,只用一个job,负责从元数据库同步数据到redis,这样就解决1,3
然后所有的并发都从redis直接查询需要的元数据,这样就解决4;对于2,在并发上做local cache,只有第一次需要真正查询redis,后续定期异步更新就好,不会影响到主数据流;对于5,因为现在不需要一下全量的读取维表数据到内存,用到的时候才去读,分摊了负载,也可以得到缓解。
这个方案也有一定的弊端,增加了架构的外部依赖,要额外保障外部redis和同步job的稳定性。
2 Flink vs Spark
2.1 框架
Spark把streaming看成是更快的批处理,而Flink把批处理看成streaming的special case。这里面的思路决定了各自的方向,其中两者的差异点有如下这些:
实时 vs 近实时的角度:Flink提供了基于每个事件的流式处理机制,所以可以被认为是一个真正的流式计;而Spark,不是基于事件的粒度,而是用小批量来模拟流式,也就是多个事件的集合。所以Spark被认为是近实时的处理系统。
Spark streaming 是更快的批处理,而Flink Batch是有限数据的流式计算。
2.1.1 流式计算和批处理API
Spark对于流式计算和批处理,都是基于RDD的抽象。这样很方便将两种计算方式合并表示。而Flink将流式计算和批处理分别抽象出来DataStream和DataSet两种API,这一点上Flink相对于spark来说是一个糟糕的设计。
2.2 社区活跃度对比
Spark 2.3 继续向更快、更易用、更智能的目标迈进,引入了低延迟的持续处理能力和流到流的连接,让 Structured Streaming 达到了一个里程碑式的高度。
3 提交一个Flink作业
启动flink服务
./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048
在yarn监控界面上可以看到该作业的执行状态
并验证Wordcount例子
./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048
./examples/batch/WordCount.jar
在client端可以看到log: