
Reading Notes on Elasticsearch 7.0 Cookbook, Fourth Edition: Big Data Integration

Big Data Integration

Elasticsearch has become a common component in big data architectures because it provides the following features:

  • It allows you to search on massive amounts of data in a very fast way
  • For common aggregation operations, it provides real-time analytics on big data
  • It's easier to use an Elasticsearch aggregation than a Spark one
  • If you need to move on to a fast data solution, starting from a subset of documents returned by a query is faster than doing a full rescan of all your data

The most common big data software for processing data is now Apache Spark (http://spark.apache.org/), which is considered an evolution of the now-obsolete Hadoop MapReduce that moves the processing from disk to memory.

In this chapter, we will see how to integrate Elasticsearch with Spark, both for writing and for reading data. Finally, we will see how to use Apache Pig to write data in Elasticsearch in a simple way.

In this chapter, we will cover the following recipes:

  • Installing Apache Spark
  • Indexing data using Apache Spark
  • Indexing data with meta using Apache Spark
  • Reading data with Apache Spark
  • Reading data using Spark SQL
  • Indexing data with Apache Pig
  • Using Elasticsearch with Alpakka
  • Using Elasticsearch with MongoDB

Installing Apache Spark

To use Apache Spark, we need to install it. The process is very easy because its requirements are not those of traditional Hadoop, which needs Apache ZooKeeper and Hadoop HDFS.

Apache Spark is able to work in a standalone node installation, in a way that is similar to Elasticsearch.

Getting ready

You need a Java virtual machine installed; generally, version 8.x or above is used.

How to do it...

To install Apache Spark, we will perform the following steps:

  1. Download a binary distribution from https://spark.apache.org/downloads.html. For a generic usage, I would suggest that you download a standard version using the following request:
wget https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
  2. Now, we can extract the Spark distribution using tar, as follows:
tar xfvz spark-2.4.0-bin-hadoop2.7.tgz
  3. Now, we can test whether Apache Spark is working by running one of the bundled examples, as follows:
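The exact command was lost in this extract; the standard SparkPi example bundled with the Spark distribution is assumed here:

./bin/run-example SparkPi 10

The output will be similar to the following: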
2019-02-09 13:56:11 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
 2019-02-09 13:56:12 INFO SparkContext:54 - Running Spark version 2.4.0
 2019-02-09 13:56:12 INFO SparkContext:54 - Submitted application: Spark Pi
 ... truncated...
 2019-02-09 13:56:13 INFO DAGScheduler:54 - ResultStage 0 (reduce at SparkPi.scala:38) finished in 0.408 s
 2019-02-09 13:56:13 INFO DAGScheduler:54 - Job 0 finished: reduce at SparkPi.scala:38, took 0.445820 s
 Pi is roughly 3.139915699578498
 2019-02-09 13:56:13 INFO AbstractConnector:318 - Stopped Spark@788ba63e{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
 2019-02-09 13:56:13 INFO SparkUI:54 - Stopped Spark web UI at http://192.168.1.121:4040
 2019-02-09 13:56:13 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
 2019-02-09 13:56:13 INFO MemoryStore:54 - MemoryStore cleared
 ... truncated...
 2019-02-09 13:56:13 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/0h/fkvg8wz54d30g1_9b9k3_7zr0000gn/T/spark-fac6580a-7cb6-48ae-8739-3675365dbcd4
 2019-02-09 13:56:13 INFO ShutdownHookManager:54 - Deleting directory /private/var/folders/0h/fkvg8wz54d30g1_9b9k3_7zr0000gn/T/spark-8fc0bf8e-0d30-440e-9965-72b77ba22c1d

How it works...

Apache Spark is very easy to install as a standalone node. Like Elasticsearch, it only requires a Java virtual machine installed on the system. The installation process is straightforward: you just need to extract the archive to obtain a complete working installation.

In the previous steps, we also tested that the Spark installation is working correctly. Spark is written in Scala, and the default binaries target version 2.11.x. Major Scala versions are not binary compatible, so you need to take care that Spark and Elasticsearch Hadoop use the same one.

The simplified steps that are executed when a Spark job runs are as follows:

  1. The Spark environment is initialized
  2. Spark MemoryStore and BlockManager masters are initialized
  3. A SparkContext for the execution is initialized
  4. SparkUI is activated at http://0.0.0.0:4040
  5. The job is taken
  6. An execution graph, a Directed Acyclic Graph (DAG), is created for the job
  7. Every vertex in the DAG is a stage, and a stage is split into tasks that are executed in parallel
  8. After executing the stages and tasks, the processing ends
  9. The result is returned
  10. The SparkContext is stopped
  11. The Spark system is shut down

There's more...

One of the most powerful tools in Spark is its shell (the Spark shell). It allows you to enter commands and have them executed directly on the Spark cluster. To access the Spark shell, you need to invoke it with ./bin/spark-shell.

When it is invoked, the output will be similar to the following:

2019-02-09 14:00:02 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Setting default log level to "WARN".
 To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
 Spark context Web UI available at http://192.168.1.121:4040
 Spark context available as 'sc' (master = local[*], app id = local-1549717207051).
 Spark session available as 'spark'.
 Welcome to
 ____ __
 / __/__ ___ _____/ /__
 _\ \/ _ \/ _ `/ __/ '_/
 /___/ .__/\_,_/_/ /_/\_\ version 2.4.0
 /_/

Using Scala version 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 11.0.2)
 Type in expressions to have them evaluated.
 Type :help for more information.

scala>

Now, you can enter the commands to be executed on the cluster.
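For example, a trivial expression can be evaluated directly at the prompt to check that everything is wired up (a minimal sketch, not taken from the book):

scala> sc.parallelize(1 to 100).sum()  // distributes the range and sums it, returning 5050.0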

Indexing data using Apache Spark

Now that we have installed Apache Spark, we can configure it to work with Elasticsearch and write some data into it.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need a working installation of Apache Spark.

How to do it...

To configure Apache Spark to communicate with Elasticsearch, we will perform the following steps:

  1. We need to download the Elasticsearch Spark JAR, as follows:
wget -c https://artifacts.elastic.co/downloads/elasticsearch-hadoop/elasticsearch-hadoop-7.0.0-alpha2.zip
unzip elasticsearch-hadoop-7.0.0-alpha2.zip
  2. A quick way to access Elasticsearch from the Spark shell is to copy the required Elasticsearch Hadoop JAR into Spark's jars directory. The file that must be copied is elasticsearch-spark-20_2.11-7.0.0.jar.

The Scala versions used by Apache Spark and Elasticsearch Spark must match!

To store data in Elasticsearch using Apache Spark, we will perform the following steps:

  1. Start the Spark shell by running the following command:
./bin/spark-shell
  2. Apply the Elasticsearch configuration, as follows:
val conf = sc.getConf
conf.setAppName("ESImport")
conf.set("es.index.auto.create", "true")
  3. We will import Elasticsearch Spark implicits, as follows:
import org.elasticsearch.spark._
  4. We will create two documents to be indexed, as follows:
val numbers = Map("one" -> 1, "two" -> 2, "three" -> 3)
val airports = Map("arrival" -> "Otopeni", "SFO" -> "San Fran")
  5. Now, we can create a Resilient Distributed Dataset (RDD) and save the documents in Elasticsearch, as follows:
sc.makeRDD(Seq(numbers, airports)).saveToEs("spark/docs")

How it works...

Storing documents in Elasticsearch via Spark is quite easy. After the Spark shell has started, an sc variable containing the SparkContext is available in the shell context. If we need to pass values to the underlying Elasticsearch configuration, we have to set them in the Spark configuration.

Several configurations can be set; the following are the most commonly used ones:

  • es.index.auto.create: This is used to create indices if they do not exist
  • es.nodes: This is used to define a list of nodes to connect with (default localhost)
  • es.port: This is used to define the HTTP Elasticsearch port to connect with (default 9200)
  • es.ingest.pipeline: This is used to define an ingest pipeline to be used (default none)
  • es.mapping.id: This is used to define a field to extract the ID value (default none)
  • es.mapping.parent: This is used to define a field to extract the parent value (default none)

A simple document can be defined as a Map[String, AnyRef], and it can be indexed via a Resilient Distributed Dataset (RDD), a special Spark abstraction over a distributed collection.

Via the implicits available in org.elasticsearch.spark, the RDD gains a new method called saveToEs that allows you to define the index (or index/type pair) to be used for indexing.
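As a minimal sketch (the node address and port are illustrative assumptions for a local setup), the same configuration keys can also be passed per call instead of being set on the global Spark configuration:

import org.elasticsearch.spark._

val docs = Seq(Map("one" -> 1), Map("two" -> 2))
// es.nodes and es.port override the defaults only for this write
sc.makeRDD(docs).saveToEs("spark/docs", Map("es.nodes" -> "127.0.0.1", "es.port" -> "9200"))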


Indexing data with meta using Apache Spark

Using a simple map to ingest data is only suitable for simple jobs. The best practice in Spark is to use case classes, so that you have fast serialization and can manage complex type checking. During indexing, providing custom IDs can be very handy. In this recipe, we will see how to address these issues.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need a working installation of Apache Spark.

How to do it...

To store data in Elasticsearch using Apache Spark, we will perform the following steps:

  1. Start the Spark shell by running the following command:
./bin/spark-shell
  2. We will import the required classes, as follows:
import org.apache.spark.SparkContext
import org.elasticsearch.spark.rdd.EsSpark
  3. We will create a case class Person, as follows:
case class Person(username:String, name:String, age:Int)
  4. We will create two documents that are to be indexed, as follows:
val persons = Seq(Person("bob", "Bob",19), Person("susan","Susan",21))
  5. Now, we can create an RDD, as follows:
val rdd=sc.makeRDD(persons)
  6. We can index them using EsSpark, as follows:
EsSpark.saveToEs(rdd, "spark2/persons", Map("es.mapping.id" -> "username"))
  7. In Elasticsearch, the indexed data will be as follows:
{
  ... truncated ...
  "hits" : {
    "total" : {
      "value" : 2,
      "relation" : "eq"
    },
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "spark2",
        "_type" : "persons",
        "_id" : "bob",
        "_score" : 1.0,
        "_source" : {
          "username" : "bob",
          "name" : "Bob",
          "age" : 19
        }
      },
      {
        "_index" : "spark2",
        "_type" : "persons",
        "_id" : "susan",
        "_score" : 1.0,
        "_source" : {
          "username" : "susan",
          "name" : "Susan",
          "age" : 21
        }
      }
    ]
  }
}

How it works...

To speed up computation in Apache Spark, case classes are used to better model the domain objects that we work with during job processing. They have fast serializers and deserializers and can easily be converted to JSON and back. By using case classes, the data is strongly typed and well modeled.

In the preceding example, we created a Person class that models a standard person (nested case classes are managed automatically). Now that we have instantiated some Person objects, we need to create the Spark RDD that will be saved in Elasticsearch.

In this example, we used a special class called EsSpark that provides helpers for passing the metadata to be used for indexing. In our case, we provided the information about how to extract the ID from the document using Map("es.mapping.id" -> "username").

There's more...

Often, the ID is not a field of the object; rather, it's a more complex value computed from the document. In this case, you can create an RDD of (ID, document) tuples to be indexed.

For the following example, we can define a function on Person that performs the ID computation:

import org.elasticsearch.spark._
case class Person(username:String, name:String, age:Int) {
  def id=this.username+this.age
}

Then, we can use it to compute our new RDD, as follows:

val persons = Seq(Person("bob", "Bob",19),Person("susan", "Susan",21))
val personIds=persons.map(p => p.id -> p)
val rdd=sc.makeRDD(personIds)

Now, we can index them, as follows:

rdd.saveToEsWithMeta("spark3/person_id")

In this case, the stored documents will be as follows:

{
  ... truncated ...
    "hits" : [
      {
        "_index" : "spark3",
        "_type" : "person_id",
        "_id" : "susan21",
        "_score" : 1.0,
        "_source" : {
          "username" : "susan",
          "name" : "Susan",
          "age" : 21
        }
      },
      {
        "_index" : "spark3",
        "_type" : "person_id",
        "_id" : "bob19",
        "_score" : 1.0,
        "_source" : {
          "username" : "bob",
          "name" : "Bob",
          "age" : 19
        }
      }
    ]
  }
}

Reading data with Apache Spark

In Spark, you can read data from many sources but, in general, with NoSQL data stores such as HBase, Accumulo, and Cassandra, you have a limited query subset, and you often need to scan all the data to read only what you need. Using Elasticsearch, you can retrieve only the subset of documents that matches your Elasticsearch query.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need a working installation of Apache Spark, as well as the data that we indexed in the previous recipe.

How to do it...

To read data from Elasticsearch via Apache Spark, we will perform the following steps:

  1. Start the Spark shell by running the following command:
./bin/spark-shell
  2. Import the required classes, as follows:
import org.elasticsearch.spark._
  3. Now, we can create an RDD by reading data from Elasticsearch, as follows:
val rdd=sc.esRDD("spark2/persons")
  4. We can watch the fetched values using the following command:
rdd.collect.foreach(println)
  5. The result will be as follows:
(bob,Map(username -> bob, name -> Bob, age -> 19))
(susan,Map(username -> susan, name -> Susan, age -> 21))

How it works...

The Elastic team has done a great job of allowing data to be read from Elasticsearch with a simple API.

You just need to import the implicits that extend the standard RDD with the esRDD method, which allows data to be retrieved from Elasticsearch.

The esRDD method accepts the following parameters:

  • resource: This is generally an index/type pair.
  • query: This is a query that is used to filter the results. It's in the query args format (an optional string).
  • config: This contains extra configurations to be provided to Elasticsearch  (an optional Map[String,String]).

The returned value is a collection of tuples made up of the document ID and a Map of the document fields.
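As a minimal sketch (running against the spark2/persons data indexed earlier; the filter and node address are illustrative assumptions), a query and an extra configuration map can be passed like this:

import org.elasticsearch.spark._

// query in query-args format, plus an optional configuration map
val adults = sc.esRDD("spark2/persons", "?q=age:21", Map("es.nodes" -> "127.0.0.1"))
adults.collect.foreach(println)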

Reading data using Spark SQL

Spark SQL is the Spark module for structured data processing. It provides a programming abstraction called DataFrames and can also act as a distributed SQL query engine. The Elasticsearch Spark integration allows us to read data using SQL queries.

Spark SQL works with structured data; in other words, all the entries are expected to have the same structure (the same number of fields, of the same type and with the same names). Using unstructured data (documents with different structures) is not supported and will cause problems.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need a working installation of Apache Spark, as well as the data that we indexed in the Indexing data using Apache Spark recipe of this chapter.

How to do it...

To read data from Elasticsearch using Apache Spark SQL and DataFrames, we will perform the following steps:

  1. Start the Spark shell by running the following command:
./bin/spark-shell
  2. We will create a DataFrame in the org.elasticsearch.spark.sql format and load data from spark3/person_id, as follows:
val df = spark.read.format("org.elasticsearch.spark.sql").load("spark3/person_id")
  3. If we want to check the schema, we can inspect it using printSchema, as follows:
df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- username: string (nullable = true)
  4. We can watch the fetched values, as follows:
df.filter(df("age").gt(20)).collect.foreach(println)
[21,Susan,susan]

To read data from Elasticsearch via SQL queries using Apache Spark SQL, we will perform the following steps:

  1. Start the Spark shell by running the following command:
./bin/spark-shell
  2. We will create a view for reading data from spark3/person_id, as follows:
spark.sql("CREATE TEMPORARY VIEW persons USING org.elasticsearch.spark.sql OPTIONS (resource 'spark3/person_id', scroll_size '2000')" )
  3. We can now execute a SQL query against the previously created view, as follows:
val over20 = spark.sql("SELECT * FROM persons WHERE age >= 20")
  4. We can watch the fetched values, as follows:
over20.collect.foreach(println)
[21,Susan,susan]

How it works...

The core of data management in Spark is the DataFrame, which allows you to collect values from different data stores.

You can use SQL query capabilities on top of a DataFrame and, depending on the driver that is used (org.elasticsearch.spark.sql in our case), the query can be pushed down at the driver level (that is, executed as a native query in Elasticsearch). For example, in the preceding case, the query is converted into a boolean filter with a range that is executed natively by Elasticsearch.

The Elasticsearch Spark driver is able to infer the read information from the mapping and to manage the data store as a standard SQL data store. The SQL approach is very powerful and allows you to reuse very common SQL expertise.
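As a minimal sketch (the node address is an illustrative assumption), connection settings such as es.nodes and es.port can also be passed as reader options instead of being set globally:

val df = spark.read
  .format("org.elasticsearch.spark.sql")
  .option("es.nodes", "127.0.0.1")  // settings are forwarded to the Elasticsearch connector
  .option("es.port", "9200")
  .load("spark3/person_id")

// this filter is pushed down and executed natively by Elasticsearch
df.filter(df("age").gt(20)).show()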

A good approach when using Elasticsearch with Spark is to use notebooks: interactive web-based interfaces that speed up the testing phase of application prototypes. The most famous ones are Spark Notebook, available at http://spark-notebook.io, and Apache Zeppelin, available at https://zeppelin.apache.org.

Indexing data with Apache Pig

Apache Pig (https://pig.apache.org/) is a tool that is frequently used to store and manipulate data in data stores. It can be very handy if you need to import some comma-separated values (CSV) data into Elasticsearch in a very fast way.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started, as well as a working Apache Pig installation.

How to do it...

We want to read a CSV file and write its data into Elasticsearch. To do this, we will perform the following steps:

  1. We will download a CSV dataset from the GeoNames site to get all the GeoNames locations of Great Britain. We can quickly download and unzip it, as follows:
wget http://download.geonames.org/export/dump/GB.zip
unzip GB.zip
  2. We can write es.pig, which contains the Pig commands to be executed, as follows:
REGISTER /Users/alberto/elasticsearch/elasticsearch-hadoop-7.0.0/dist/elasticsearch-hadoop-pig-7.0.0.jar;


SET pig.noSplitCombination TRUE;


DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();


-- launch the Map/Reduce job with 5 reducers

SET default_parallel 5;


--load the GB.txt file

geonames= LOAD 'GB.txt' using PigStorage('\t') AS
(geonameid:int,name:chararray,asciiname:chararray,
alternatenames:chararray,latitude:double,longitude:double,
feature_class:chararray,feature_code:chararray,
country_code:chararray,cc2:chararray,admin1_code:chararray,
admin2_code:chararray,admin3_code:chararray,
admin4_code:chararray,population:int,elevation:int,
dem:chararray,timezone:chararray,modification_date:chararray);


STORE geonames INTO 'geoname/gb' USING EsStorage();
  3. Now, execute the pig command, as follows:
pig -x local es.pig

The output will be similar to the following:

2019-02-10 14:51:12,258 INFO [main] pig.ExecTypeProvider (ExecTypeProvider.java:selectExecType(41)) - Trying ExecType : LOCAL
2019-02-10 14:51:12,259 INFO [main] pig.ExecTypeProvider (ExecTypeProvider.java:selectExecType(43)) - Picked LOCAL as the ExecType
2019-02-10 14:51:12,283 [main] INFO org.apache.pig.Main - Apache Pig version 0.17.0 (r1797386) compiled Jun 02 2017, 15:41:58
... truncated ...
2019-02-10 14:51:13,693 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.builtin.PigStorage - Using PigTextInputFormat
2019-02-10 14:51:13,696 [LocalJobRunner Map Task Executor #0] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigRecordReader - Current split being processed file:/Users/alberto/Projects/elasticsearch-7.x-cookbook/ch17/GB.txt:0+7793280
... truncated ...
2019-02-10 14:51:18,586 [main] INFO org.apache.pig.tools.pigstats.mapreduce.SimplePigStats - Script Statistics:

HadoopVersion PigVersion UserId StartedAt FinishedAt Features
2.7.3 0.17.0 alberto 2019-02-10 14:51:13 2019-02-10 14:51:18 UNKNOWN

Success!

Job Stats (time in seconds):
JobId Maps Reduces MaxMapTime MinMapTime AvgMapTime MedianMapTime MaxReduceTime MinReduceTime AvgReduceTime MedianReducetime Alias Feature Outputs
job_local1289431064_0001 1 0 n/a n/a n/a n/a 0 0 0 0 geonames MAP_ONLY geoname/gb,

Input(s):
Successfully read 63131 records from: "file:///Users/alberto/Projects/elasticsearch-7.x-cookbook/ch17/GB.txt"

Output(s):
Successfully stored 63131 records in: "geoname/gb"

Counters:
Total records written : 63131
Total bytes written : 0
Spillable Memory Manager spill count : 0
Total bags proactively spilled: 0
Total records proactively spilled: 0

Job DAG:
job_local1289431064_0001


2019-02-10 14:51:18,587 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2019-02-10 14:51:18,588 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2019-02-10 14:51:18,588 [main] INFO org.apache.hadoop.metrics.jvm.JvmMetrics - Cannot initialize JVM Metrics with processName=JobTracker, sessionId= - already initialized
2019-02-10 14:51:18,591 [main] WARN org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Encountered Warning TOO_LARGE_FOR_INT 1 time(s).
2019-02-10 14:51:18,591 [main] INFO org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher - Success!
2019-02-10 14:51:18,604 [main] INFO org.apache.pig.Main - Pig script completed in 6 seconds and 565 milliseconds (6565 ms)

After a few seconds, all the CSV data will be indexed in Elasticsearch.

How it works...

Apache Pig is a very handy tool. With just a few lines of code, it is able to read, transform, and store data in different data stores. It has a shell, but it's very common to write a Pig script that contains all the commands to be executed.

To use Elasticsearch with Apache Pig, you need to register the library that contains EsStorage. This is done with the REGISTER command; the JAR location depends on your installation, as follows:

REGISTER /Users/alberto/elasticsearch/elasticsearch-hadoop-7.0.0/dist/elasticsearch-hadoop-pig-7.0.0.jar;

By default, Pig splits the data into chunks and combines them before sending the data to Elasticsearch. To keep maximum parallelism, you need to disable this behavior with SET pig.noSplitCombination TRUE.

To avoid having to type the full path of EsStorage, we define the following shortcut:

DEFINE EsStorage org.elasticsearch.hadoop.pig.EsStorage();

By default, Pig's parallelism is set to 1. If we want to speed up the process, we need to increase this value, as follows:

-- launch the Map/Reduce job with 5 reducers

SET default_parallel 5;

Reading a CSV file in Pig is quite simple; we define the file to be loaded with PigStorage and the field separator, as well as the format of the fields, as follows:

--load the GB.txt file

geonames= LOAD 'GB.txt' using PigStorage('\t') AS
(geonameid:int,name:chararray,asciiname:chararray,
alternatenames:chararray,latitude:double,longitude:double,
feature_class:chararray,feature_code:chararray,
country_code:chararray,cc2:chararray,admin1_code:chararray,
admin2_code:chararray,admin3_code:chararray,
admin4_code:chararray,population:int,elevation:int,
dem:chararray,timezone:chararray,modification_date:chararray);

After the CSV file has been read, the rows are indexed in Elasticsearch as objects, as follows:

STORE geonames INTO 'geoname/gb' USING EsStorage();
As you can see, all the complexity of using Pig lies in managing the format of the input and output. The main advantage of Apache Pig is the ability to load different datasets, join them, and store them in just a few lines of code.

Using Elasticsearch with Alpakka

The Alpakka project (https://doc.akka.io/docs/alpakka/current/index.html) is based on Reactive Streams and Akka (https://akka.io/).

Reactive Streams are based on components; the most important ones are the Source (used to read data from different sources) and the Sink (used to write data into storage).

Alpakka supports Sources and Sinks for many data stores, and Elasticsearch is one of them.

In this recipe, we will go through a common scenario: reading a CSV file and ingesting it into Elasticsearch.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch17/alpakka directory, and the referenced class is CSVToES.

How to do it...

We will create a simple pipeline that reads a CSV file and writes its records to Elasticsearch. To do this, we will perform the following steps:

  1. Add the alpakka dependencies to build.sbt:
"com.github.pathikrit" %% "better-files" % "3.7.1",
"com.lightbend.akka" %% "akka-stream-alpakka-csv" % "1.0.0",
"com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0.0",
  2. Then, initialize the Akka system:
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val executor = actorSystem.dispatcher

  3. Now we can initialize the Elasticsearch client; in this case, we use the default REST client:
import org.elasticsearch.client.RestClient
implicit val client: RestClient = RestClient.builder(new HttpHost("0.0.0.0", 9200)).build()
  4. The data will be stored in the Iris case class. We will also create a Scala implicit to allow encoding and decoding it in JSON. All of this is done in a few lines of code:
final case class Iris(label: String, f1: Double, f2: Double, f3: Double, f4: Double)
import spray.json._
import DefaultJsonProtocol._
implicit val format: JsonFormat[Iris] = jsonFormat5(Iris)
  5. Before initializing the sink, we define some policies to manage back pressure and retry logic:
val sinkSettings =
   ElasticsearchWriteSettings()
     .withBufferSize(1000)
     .withVersionType("internal")
     .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))
  6. Now we can create the pipeline: the source reads a CSV file and, for every line, we create an Iris index message, which is ingested into the iris-alpakka index using an Elasticsearch Flow:
val graph = Source.single(ByteString(Resource.getAsString("com/packtpub/iris.csv")))
   .via(CsvParsing.lineScanner())
   .drop(1) // skip the first line of the file
   .map(values => WriteMessage.createIndexMessage[Iris](
     Iris(
       values(4).utf8String,
       values.head.utf8String.toDouble,
       values(1).utf8String.toDouble,
       values(2).utf8String.toDouble,
       values(3).utf8String.toDouble)))
   .via(
     ElasticsearchFlow.create[Iris](
       "iris-alpakka",
       "_doc",
       settings = sinkSettings
     ))
   .runWith(Sink.ignore)
  7. The preceding pipeline is executed asynchronously; we need to wait for it to finish processing the items, and then we need to close all the used resources:
val finish=Await.result(graph, Duration.Inf)
 client.close()
 actorSystem.terminate()
 Await.result(actorSystem.whenTerminated, Duration.Inf)

How it works...

Alpakka is one of the most commonly used tools for building modern ETL (Extract, Transform, Load) pipelines, and it has many convenient features:

  • Back pressure: If a data store is under high load, it automatically reduces the throughput.
  • Modular approach: A user can replace Source and Sink to read and write to other different data stores without massive code refactoring.
  • Numerous operators: A plethora of operators (map, flatMap, filter, groupBy, mapAsync, and so on) are available to build complex pipelines.
  • Low memory footprint: It doesn't load all the data in memory as Apache Spark does; instead, it streams the data from the Source to the Sink. It can easily be dockerized and deployed on a Kubernetes cluster to scale your ETL.

The Elasticsearch Source and Sink use the official Elasticsearch REST client. As you can see from the preceding code, it takes just one line to initialize it:

implicit val client: RestClient = RestClient.builder(new HttpHost("0.0.0.0", 9200)).build()

The client variable must be implicit so that it is automatically passed to the Elasticsearch Source, Sink, or Flow constructors.

By default, serializing and deserializing case classes to and from JSON is supported using spray.json, with the Scala macro jsonFormatN (where N is the number of fields):

implicit val format: JsonFormat[Iris] = jsonFormat5(Iris)

Also in this case, the variable should be implicit so that it can be passed automatically to the methods that require it.

We can customize the parameters that are used for writing in Elasticsearch using ElasticsearchWriteSettings:

ElasticsearchWriteSettings()
   .withBufferSize(1000)
   .withVersionType("internal")
   .withRetryLogic(RetryAtFixedRate(maxRetries = 5, retryInterval = 1.second))

The most commonly used methods of this class are as follows:

  • withBufferSize(size: Int): This sets the number of items to be used for a single bulk operation.
  • withVersionType(vType: String): This sets the type of record versioning in Elasticsearch.
  • withRetryLogic(logic: RetryLogic): This sets the retry policy. RetryLogic is a Scala trait that can be extended to provide different implementations; the following ones are already available by default:
    • RetryNever: Never retry.
    • RetryAtFixedRate(maxRetries: Int, retryInterval: scala.concurrent.duration.FiniteDuration): This allows maxRetries retries at a fixed interval (retryInterval).

The Elasticsearch Sink or Flow only accepts WriteMessage[T, PT], where T is the type of the message and PT is an optional PassThrough type (used, for example, to carry a Kafka offset and commit it after the Elasticsearch write response).

WriteMessage has helpers to create the most commonly used messages, namely the following:

  • createIndexMessage[T](source: T): This is used to create an index action
  • createIndexMessage[T](id: String, source: T): This is used to create an index action with provided id
  • createCreateMessage[T](id: String, source: T): This is used to build a create action
  • createUpdateMessage[T](id: String, source: T): This is used to create an update action
  • createUpsertMessage[T](id: String, source: T): This is used to create an upsert action (it tries to update the document; if the document doesn't exist, it creates a new one)
  • createDeleteMessage[T](id: String): This is used to create a delete action

To create these messages, the most common practice is to use a map function to convert your values into the required WriteMessage type.

After having created the WriteMessages, we can create a Sink to write the records in Elasticsearch. The parameters required by this Sink are as follows:

  • indexName: String: The index to be used.
  • typeName: String: The mapping name (usually _doc in Elasticsearch 7.x). This will probably be removed in a future release of Alpakka Elasticsearch.
  • settings: ElasticsearchWriteSettings (optional): The write settings that we discussed previously.
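As a minimal sketch (irisRecords is a hypothetical sequence of Iris values; the implicit RestClient and JSON format defined earlier in this recipe are assumed to be in scope, and imports are omitted as in the recipe code), messages with explicit IDs can be built in a map stage and wired to such a Sink, mirroring the ElasticsearchFlow.create call used earlier:

// hypothetical input data; the label is reused as the document ID purely for illustration
val irisRecords = List(Iris("setosa", 5.1, 3.5, 1.4, 0.2))

val done = Source(irisRecords)
   .map(iris => WriteMessage.createIndexMessage(iris.label, iris))
   .runWith(
     ElasticsearchSink.create[Iris](
       indexName = "iris-alpakka",
       typeName = "_doc",
       settings = sinkSettings
     ))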

In this short introduction, we have only scratched the surface of Akka and Alpakka, but it should be easy to see how powerful this system is when it comes to orchestrating both simple and complex ingestion jobs.


Using Elasticsearch with MongoDB

MongoDB (https://www.mongodb.com/) is one of the most popular document data stores, due to the simplicity of its installation and the large community that uses it.

In many architectures, it's common to use Elasticsearch as the search or query layer and MongoDB as the more secure data stage. In this recipe, we will see how simple it is to use Alpakka to read a query stream from Elasticsearch and write it into MongoDB.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

A local installation of MongoDB is required to run the example.

The code for this recipe can be found in the ch17/alpakka directory, and the referenced class is ESToMongoDB. We will read the index that was created in the previous recipe.

How to do it...

We will create a simple pipeline that reads data from Elasticsearch and writes the records to MongoDB. To do this, we will perform the following steps:

  1. We need to add the alpakka-mongodb dependencies to build.sbt:
"org.mongodb.scala" %% "mongo-scala-bson" % "2.4.2",
"com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "1.0.0",
  2. The first step is to initialize the Akka system:
implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
implicit val executor = actorSystem.dispatcher
  3. Now we can initialize the Elasticsearch client; in this case, we use the default RestClient:
import org.elasticsearch.client.RestClient
implicit val client: RestClient = RestClient.builder(new HttpHost("0.0.0.0", 9200)).build()

  4. The data will be stored in the Iris case class. We also create a Scala implicit to allow encoding and decoding it in JSON, as well as a codec for MongoDB. All of this is done in a few lines of code:
final case class Iris(label: String, f1: Double, f2: Double, f3: Double, f4: Double)
 import spray.json._
 import DefaultJsonProtocol._
 implicit val format: JsonFormat[Iris] = jsonFormat5(Iris)

val codecRegistry = fromRegistries(fromProviders(classOf[Iris]), DEFAULT_CODEC_REGISTRY)
  5. Now we can create an irisCollection that will store our data in MongoDB:
private val mongo = MongoClients.create("mongodb://localhost:27017")
 private val db = mongo.getDatabase("es-to-mongo")
 val irisCollection = db
   .getCollection("iris", classOf[Iris])
   .withCodecRegistry(codecRegistry)
  6. Finally, we can create the pipeline: the source will be an ElasticsearchSource, and all the records will be ingested into MongoDB using a MongoSink:
val graph =
   ElasticsearchSource
     .typed[Iris](
     indexName = "iris-alpakka",
     typeName = "_doc",
     query = """{"match_all": {}}"""
   )
     .map(_.source) // we want only the source
     .grouped(100) // bulk insert of 100
     .runWith(MongoSink.insertMany[Iris](irisCollection))
  7. The preceding pipeline is executed asynchronously; we need to wait for it to finish processing the items, and then close all the used resources:
val finish=Await.result(graph, Duration.Inf)
 client.close()
 actorSystem.terminate()
 Await.result(actorSystem.whenTerminated, Duration.Inf)

How it works...

Using a MongoDB Source and Sink in a pipeline is very simple. In the preceding code, we used an ElasticsearchSource that, given an index (or index/typeName pair) and a query, is able to generate a stream of typed items:

ElasticsearchSource
     .typed[Iris](
     indexName = "iris-alpakka",
     typeName = "_doc",
     query = """{"match_all": {}}"""
   )

The returned type is a Source[ReadResult[T], NotUsed], where T is our type (that is, Iris in this example).

A ReadResult[T] is a wrapper object that contains the following:

  • id:String: This is the Elasticsearch ID
  • source:T: This is the source part of the document converted to object T
  • version:Option[Long]: This is an optional version number
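As a minimal sketch (reusing the ElasticsearchSource call from the recipe), these fields can be used, for example, to keep the Elasticsearch ID alongside the converted object:

// each element emitted by the source is a ReadResult[Iris]
val idsAndSources = ElasticsearchSource
   .typed[Iris](
     indexName = "iris-alpakka",
     typeName = "_doc",
     query = """{"match_all": {}}"""
   )
   .map(hit => hit.id -> hit.source)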

To write to MongoDB, we need to create a connection, select a database, and get a collection:

private val mongo = MongoClients.create("mongodb://localhost:27017")
 private val db = mongo.getDatabase("es-to-mongo")
 val irisCollection = db
   .getCollection("iris", classOf[Iris])
   .withCodecRegistry(codecRegistry)

In this case, we defined irisCollection as a collection of type Iris, and we provided a codec to manage the data marshalling (conversion).

The codecRegistry is built using Scala macros:

import org.bson.codecs.configuration.CodecRegistries.{fromProviders, fromRegistries}
 import org.mongodb.scala.bson.codecs.DEFAULT_CODEC_REGISTRY
 import org.mongodb.scala.bson.codecs.Macros._
val codecRegistry = fromRegistries(fromProviders(classOf[Iris]), DEFAULT_CODEC_REGISTRY)

To speed up writing, we chose to execute bulk writes of 100 elements in MongoDB, so we first group the stream into batches of 100 elements using the following:

.grouped(100) // bulk insert of 100

We then write the results into the collection using the MongoSink:

.runWith(MongoSink.insertMany[Iris](irisCollection))

In the case of streaming, I suggest always writing in bulk, both in Elasticsearch and in MongoDB.

In the preceding examples, we saw how to use the Elasticsearch Source and Sink, as well as the MongoDB Sink; it's easy to see how different Sources and Sinks can be combined to build your own pipelines.
