vlambda博客
学习文章列表

收藏|Flink比Spark好在哪?

1 Flink介绍

Flink 是一个面向分布式数据流处理和批量数据处理的开源计算平台。和 Spark 类似,两者都希望提供一个统一功能的计算平台给用户,都在尝试建立一个统一的平台以运行批量,流式,交互式,图处理,机器学习等应用。

收藏|Flink比Spark好在哪?

1.1部署模式

Flink 集群的部署,本身不依赖 Hadoop 集群,如果用到 HDFS 或是 HBase 中的存储数据,就需要选择对应的 Hadoop 版本。

收藏|Flink比Spark好在哪?

  • Standalone

  • YARN

  • Mesos

  • Cloud

1.2整合支持

  1. Flink支持消费kafka的数据;

  2. 支持HBase,Cassandra, ElasticSearch

  3. 支持与Alluxio的整合

  4. 支持RabbitMQ

1.3 API支持

  • Streaming数据类应用,提供DataStream API

  • 对批处理类应用,提供DataSet API(支持Java/Scala

  • 对流处理和批处理,都支持Table API

  • 支持双流join

1.4 Libraries支持

  • 支持机器学习(FlinkML

  • 支持图分析(Gelly

  • 支持关系数据处理(Table

  • 支持复杂事件处理(CEP

1.5 Flink on YARN

收藏|Flink比Spark好在哪?

Flink提供两种Yarn的部署方式Yarn Setup

Start a long-running Flink cluster on YARN

  • 通过命令yarn-session.sh来实现,本质上是在yarn集群上启动一个flink集群。

  • yarn预先给flink集群分配若干个containerflink使用,在yarn的界面上只能看到一个Flink session with X TaskManagers的任务。

  • 只有一个Flink界面,可以从YarnApplicationMaster链接进入。

  • 使bin/flink run命令发布任务时,本质上是使用Flink自带的调度,与普通的在Flink集群上发布任务并没有不同。不同的任务可能在一个TaskManager中,也即是在一个JVM进程中,无法实现资源隔离。

Run a Flink job on YARN

  • 通过命令bin/flink run -m yarn-cluster实现,一次只发布一个任务,本质上给每个flink任务启动了一个集群。

  • yarn不事先给flink分配container,而是在任务发布时,启动JobManager(对应YarnAM)和TaskManager,如果一个任务指定了nTaksManager(-yn n),则会启动n+1Container,其中一个是JobManager

  • 发布m个应用,则有mFlink界面,对比方式一,同样发布m个应用,会多出m-1JobManager的。

  • 发布任务时,实际上是使用了Yarn的调用。不同的任务不可能在一个ContainerJVM)中,也即是实现了资源隔离。

以第一种启动方式为例,其主要启动流程如下:

首先我们通过下面的命令行启动flink on yarn的集群
这里将产生总共五个进程:

  • 1FlinkYarnSessionCli ---> Yarn Client

  • 1YarnApplicationMasterRunner ---> AM + JobManager

  • 3YarnTaskManager --> TaskManager

即一个客户端+4container1container启动AM3container启动TaskManager

yarn-session.sh支持的参数:

收藏|Flink比Spark好在哪?

一个Flink环境在YARN上的启动流程:

  1. FlinkYarnSessionCli 启动的过程中首先会检查Yarn上有没有足够的资源去启动所需要的container,如果有,则上传一些flinkjar和配置文件到HDFS,这里主要是启动AM进程和TaskManager进程的相关依赖jar包和配置文件。

      从这个启动过程中可以看出,在每次启动Flink on YARN之前,需要指定启动多少个TaskManager,每个taskManager分配的资源是固定的,也就是说这个资源量从taskManager出生到死亡,资源情况一直是这么多,不管它所承载的作业需求资源情况,这样在作业需要更多资源的时候,没有更多的资源分配给对应的作业,相反,当一个作业仅需要很少的资源就能够运行的时候,仍然分配的是那些固定的资源,造成资源的浪费。

用户实现的Flink程序是由StreamTransformation这两个基本构建块组成,其中Stream是一个中间结果数据,而Transformation是一个操作,它对一个或多个输入Stream进行计算处理,输出一个或多个结果Stream。当一个Flink程序被执行的时候,它会被映射为Streaming Dataflow。一个Streaming Dataflow是由一组StreamTransformation Operator组成,它类似于一个DAG图,在启动的时候从一个或多个Source Operator开始,结束于一个或多个Sink Operator

下面是一个由Flink程序映射为Streaming Dataflow的示意图,如下所示:

收藏|Flink比Spark好在哪?

FlinkKafkaConsumer是一个Source OperatormapkeyBytimeWindowapplyTransformation OperatorRollingSink是一个Sink Operator

1.6 CEP(Complex event processing)

Flink CEP 是一套极具通用性、易于使用的实时流式事件处理方案。作为 Flink 的原生组件,省去了第三方库与 Flink 配合使用时可能会导致的各种问题。但其功能现阶段看来还比较基础,不能表达复杂的业务场景,同时它不能够做到动态更新。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);  // (Event, timestamp)  DataStream<Event> input = env.fromElements(  Tuple2.of(new Event(1, "start", 1.0), 5L),  Tuple2.of(new Event(2, "middle", 2.0), 1L),  Tuple2.of(new Event(3, "end", 3.0), 3L),  Tuple2.of(new Event(4, "end", 4.0), 10L),  Tuple2.of(new Event(5, "middle", 6.0), 7L),  Tuple2.of(new Event(6, "middle", 5.0), 7L),  // last element for high final watermark  Tuple2.of(new Event(7, "middle", 5.0), 100L)         ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event, Long>>() { 

具体的业务逻辑

Pattern<Event, ? extends Event> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {   @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("start");  }  }).followedByAny("middle").where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("middle");  }  }).followedByAny("end").where(new SimpleCondition<Event>() {  @Override  public boolean filter(Event value) throws Exception {  return value.getName().equals("end");  }  });   DataStream<String> result = CEP.pattern(input, pattern, comparator).select(  new PatternSelectFunction<Event, String>() {   @Override  public String select(Map<String, List<Event>> pattern) {  StringBuilder builder = new StringBuilder();   builder.append(pattern.get("start").get(0).getId()).append(",")  .append(pattern.get("middle").get(0).getId()).append(",")  .append(pattern.get("end").get(0).getId());   return builder.toString();  }  }  ); 



从例子代码中可以看到,patterns需要用java代码写,需要编译,很冗长很麻烦,没法动态配置;需要可配置,或提供一种DSL再者,对于一个流同时只能设置一个pattern,比如对于不同的用户实例想配置不同的pattern,就没法支持;需要支持按key设置pattern

1.7 Flink目前存在的一些问题

在实时计算中有这么一个普遍的逻辑:业务逻辑中以一个流式数据源与几个相关的配置表进行join操作,而配置表并不是一成不变的,会定期的进行数据更新,可以看成一个缓慢变化的流。这种join环境存在以下几个尚未解决的问题:

1.对元数据库的读压力;如果分析程序有1000并发,是否需要读1000次;

2.读维表数据不能拖慢主数据流的throughput,每秒千万条数据量;

3.动态维表更新问题和一致性问题;元数据是不断变化的,如何把更新同步到各个并发上;

4.冷启动问题,如何保证主数据流流过的时候,维表数据已经ready,否则会出现数据无法处理;

5.超大维表数据会导致流量抖动和频繁gc,比如几十万条的实例数据,可能上百兆。

Flink社区,对该问题也进行了关注

https://issues.apache.org/jira/browse/FLINK-6131

收藏|Flink比Spark好在哪?

https://issues.apache.org/jira/browse/FLINK-2320

收藏|Flink比Spark好在哪?

https://issues.apache.org/jira/browse/FLINK-3514

收藏|Flink比Spark好在哪?

当然在生产环境上也有相应的解决方案:

使用redis来做cache,只用一个job,负责从元数据库同步数据到redis,这样就解决13

然后所有的并发都从redis直接查询需要的元数据,这样就解决4;对于2,在并发上做local cache,只有第一次需要真正查询redis,后续定期异步更新就好,不会影响到主数据流;对于5,因为现在不需要一下全量的读取维表数据到内存,用到的时候才去读,分摊了负载,也可以得到缓解。

这个方案也有一定的弊端,增加了架构的外部依赖,要额外保障外部redis和同步job的稳定性。

2 Flink vs Spark

2.1 框架

Sparkstreaming看成是更快的批处理,而Flink把批处理看成streamingspecial case。这里面的思路决定了各自的方向,其中两者的差异点有如下这些:
实时 vs 近实时的角度:Flink提供了基于每个事件的流式处理机制,所以可以被认为是一个真正的流式计;而Spark,不是基于事件的粒度,而是用小批量来模拟流式,也就是多个事件的集合。所以Spark被认为是近实时的处理系统。
  Spark streaming 是更快的批处理,而Flink Batch是有限数据的流式计算。

2.1.1 流式计算和批处理API

Spark对于流式计算和批处理,都是基于RDD的抽象。这样很方便将两种计算方式合并表示。而Flink将流式计算和批处理分别抽象出来DataStreamDataSet两种API,这一点上Flink相对于spark来说是一个糟糕的设计。

2.2 社区活跃度对比

收藏|Flink比Spark好在哪?

收藏|Flink比Spark好在哪?

Spark 2.3 继续向更快、更易用、更智能的目标迈进,引入了低延迟的持续处理能力和流到流的连接,让 Structured Streaming 达到了一个里程碑式的高度。

3 提交一个Flink作业

启动flink服务

./bin/yarn-session.sh -n 4 -jm 2048 -tm 2048

收藏|Flink比Spark好在哪?

yarn监控界面上可以看到该作业的执行状态

收藏|Flink比Spark好在哪?

并验证Wordcount例子

./bin/flink run -m yarn-cluster -yn 4 -yjm 2048 -ytm 2048 ./examples/batch/WordCount.jar


client端可以看到log

收藏|Flink比Spark好在哪?

收藏|Flink比Spark好在哪?

历史好文推荐






点个 在看 ,我会高兴一整天