Spark on Elasticsearch一致性问题的探索
作者导读:
Spark与Elasticsearch(es)的结合,是近年来大数据解决方案很火热的一个话题。一个是出色的分布式计算引擎,另一个是出色的搜索引擎。近年来,越来越多的成熟方案落地到行业产品中,包括我们耳熟能详的Spark+ES+HBase日志分析平台。
目前,华为云数据湖探索(DLI)服务已全面支持Spark/Flink跨源访问Elasticsearch。而之前在实现过程中也遇到过很多场景化问题,本文将挑选其中比较经典的分布式一致性问题进行探讨。
01
分布式一致性问题
图1 Spark task失败时向es写入了部分数据
图2 task重试成功后上一次写入的部分数据成为脏数据
解决方案
写覆盖
步骤一判断当前index中是否有数据
步骤二清空当前index中的数据
步骤三向index中写入数据
图3 使用overwrite模式,task重试时覆盖上一次数据
val dfWriter = sparkSession.createDataFrame(rdd, schema)
//
// 写入数据至es
//dfWriter.write
.format("es")
.option("es.resource", resource)
.option("es.nodes", nodes)
.mode(SaveMode.Overwrite)
.save()
// 插入数据至es
sparkSession.sql("insert overwrite table es_table values(1, 'John'),(2, 'Bob')")
02
最终一致性
图 Spark追加数据写入es
图4 用overwrite写入会将原先正确的数据覆盖掉
Elasticsearch |
关系型数据库 |
Index |
Database |
Type |
Table |
Document |
Row |
Field |
Column |
create table es_table(id int, name string) using es options(
'es.nodes' 'localhost:9200',
'es.resource'
'/mytest/anytype','es.mapping.id' 'id')")
图5 在插入数据时将主键设为doc_id,利用幂等插入来实现最终一致性
03
总结
图6 es使用bulk接口进行数据写入
04
扩展阅读:Elasticsearch Datasource
简介
功能描述
//
// 初始化设置
//
// 设置es的/index/type(es 6.x版本不支持同一个index中存在多个type,7.x版本不支持设置type)
val resource = "/mytest/anytype";
val nodes = "localhost:9200"
// 构造数据
val schema = StructType(Seq(StructField("id", IntegerType, false), StructField("name", StringType, false)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row(1, "John"),Row(2,"Bob")))
val dfWriter = sparkSession.createDataFrame(rdd, schema)
//
// 写入数据至es
//
dfWriter.write
.format("es")
.option("es.resource", resource)
.option("es.nodes", nodes)
.mode(SaveMode.Append)
.save()
//
// 从es读取数据
//
val dfReader = sparkSession.read.format("es").option("es.resource",resource).option("es.nodes", nodes).load()
dfReader.show()
// 创建一张关联es /index/type的Spark临时表,该表并不存放实际数据
val sparkSession = SparkSession.builder().getOrCreate()
sparkSession.sql("create table es_table(id int, name string) using es options(
'es.nodes' 'localhost:9200',
'es.resource' '/mytest/anytype')")
// 插入数据至es
sparkSession.sql("insert into es_table values(1, 'John'),(2, 'Bob')")
// 从es中读取数据
val dataFrame = sparkSession.sql("select * from es_table")
dataFrame.show()
-END-
-内容精选,欢迎品鉴-