「大数据」(八十三)Spark之Streaming实时流
【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[83]篇文章,欢迎阅读和收藏】
1 基本概念
Spark Streaming 是 Spark 核心 API 的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括 Kafk 、 Flume 、 Twitter 、 ZeroMQ 、 Kinesis 以及 TCP sockets ,从数据源获取数据之后,可以使用诸如 map 、 reduce 、 join 和 window 等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“ One Stack rule them all ”的基础上,还可以使用 Spark 的其他子框架,如集群学习、图计算等,对流数据进行处理。
2 原理介绍
2.1 Spark Streaming 处理的数据流图
Spark Streaming 的基本原理是将输入数据流以时间片(秒级)为单位进行拆分 ,
然后以类似批处理的方式处理每个时间片数据
首先, Spark Streaming 把实时输入数据流以时间片Δ t (如 1 秒)为单位切分成块。Spark Streaming 会把每块数据作为一个 RDD ,并使用 RDD 操作处理每一小块数据。每个块都会生成一个 Spark Job 处理,最终结果也返回多块。
2.2 SparkStreaming 支持 的业务场景
目前而言 SparkStreaming 主要支持以下三种业务场景 :
1. 无状态操作:只关注当前的 DStream 中的实时数据,例如 只对当前 DStream 中的数据做正确性校验
2. 有状态操作:对有状态的 DStream 进行操作时 , 需要依赖之前的数据 例如 统计网站各个模块总的访问量
3. 窗口操作 : 对指定时间段范围内的 DStream 数据进行操作,例如 需要统计一天之内网站各个模块的访问数量
2.3 SparkStreaming 支持的操作
Discretized Stream 是 Spark Streaming 的基础抽象,代表持续性的数据流和经过各种 Spark 原语操作后的结果数据流。在内部实现上, DStream 由连续的序列化 RDD 来表示。支持的操作主要包含以下几种 :
1. Action
当某个 Output Operations 原语被调用时, stream 才会开始真正的计算过程。现阶段支持的 Output 方式有以下几种
print()
foreachRDD(func)
saveAsObjectFiles(prefix, [suffix])
saveAsTextFiles(prefix, [suffix])
saveAsHadoopFiles(prefix, [suffix])
2. 常规 RDD 的 Transformation 操作
对常规 RDD 使用的 transformation 操作,在 DStream 上都适用
3. 有状态的 Transformation
UpdateStateByKey: 使用该方法主要是使用目前的 DStream 数据来更新历史数据
4. 窗口的 Transformation
Window Operations 有点类似于 Storm 中的 State ,可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。
主要支持的操作有:
0. window(windowLength, slideInterval)
1. countByWindow(windowLength, slideInterval)
2. reduceByWindow(func, windowLength, slideInterval)
3. reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks])
4. reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks])
5. countByValueAndWindow(windowLength, slideInterval, [numTasks])
2.4 Spark Streaming 优化
监控手段:
一般来说,使用 Spark 自带的 Web UI 就能满足大部分的监控需求。对于 Spark Streaming 来说,以下两个度量指标尤为重要(在 Batch Processing Statistics 标签下):
Processing Time :处理每个 batch 的时间
Scheduling Delay: 每个 batch 在队列中等待前一个 batch 完成处理所等待的时间
若 Processing Time 的值一直大于 Scheduling Delay ,或者 Scheduling Delay 的值持续增长,代表系统已经无法处理这样大的数据输入量了,这时就需要考虑各种优化方法来增强系统的负载。
优化方式 :
1. 利用集群资源,减少处理每个批次的数据的时间
a. 控制 reduce 数量,太多的 reducer, 造成很多的小任务 , 以此产生很多启动任务的开销。太少的 reducer, 任务执行很慢 !
b. 序列化:包含输入数据序列化、 RDD 序列化、 TASK 序列化
2. 在 Standalone 及 coarse-grained 模式下的任务启动要比 fine-grained 省时
3. 给每个批次的数据量的设定一个合适的大小,原则 : 要来得及消化流进系统的数据
4. 内存调优
a. 清理缓存的 RDD
b. 在 spark.cleaner.ttl 之前缓存的 RDD 都会被清除掉
c. 设置 spark.streaming.unpersis, 系统为你分忧
d. 使用并发垃圾收集器