Reading notes on Elasticsearch 7.0 Cookbook, Fourth Edition: Scala Integration

Scala Integration

Scala is becoming one of the most used languages in big data scenarios. The language provides many facilities for managing data, such as immutability and functional programming.

In Scala, you can simply use the Java libraries that we saw in the previous chapter, but they do not scale well in Scala code, because they provide neither type safety (many of these libraries take JSON as a string) nor easy-to-use asynchronous programming.

In this chapter, we will see how to use Elasticsearch in Scala with elastic4s, a mature library. Its main features are as follows:

  • Type-safe, concise DSL
  • Integrates with standard Scala futures
  • Uses the Scala collections library over Java collections
  • Returns Option where the Java methods would return null
  • Uses Scala durations instead of strings/longs for time values
  • Uses typeclass for marshalling and unmarshalling classes to/from Elasticsearch documents, and is backed by Jackson, Circe, Json4s, and PlayJson implementations
  • Provides reactive-streams implementation
  • Provides embedded nodes and testkit sub-projects, which are ideal for your tests

In this chapter, we will mainly look at examples of using the standard elastic4s DSL, along with some helpers, such as the circe extension, for easily marshalling/unmarshalling documents into classes.

In this chapter, we will cover the following recipes:

  • Creating a client in Scala
  • Managing indices
  • Managing mappings
  • Managing documents
  • Executing a standard search
  • Executing a search with aggregations
  • Integrating with DeepLearning.scala

Creating a client in Scala

The first step in using elastic4s is creating a connection client to call Elasticsearch. Similar to the Java case, the connection client can be either a native client or an HTTP one.

In this recipe, we will initialize an HTTP client, because it can be placed behind a proxy/balancer to increase the high availability of the solution, which is good practice.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the chapter_14/elastic4s_sample directory, and the reference file is ClientSample.scala.

How to do it...

To create an Elasticsearch client and create/search documents, we will perform the following steps:

  1. The first step is to add the elastic4s library to the build.sbt configuration via the following code:
libraryDependencies ++= {
  val elastic4sV = "7.0.0"
  val scalaTestV = "3.0.5"
  val Log4jVersion = "2.11.1"
  Seq(
    "com.sksamuel.elastic4s" %% "elastic4s-core" % elastic4sV,
    "com.sksamuel.elastic4s" %% "elastic4s-circe" % elastic4sV,
    // for the http client
    "com.sksamuel.elastic4s" %% "elastic4s-http" % elastic4sV,

    // if you want to use reactive streams
    "com.sksamuel.elastic4s" %% "elastic4s-http-streams" % elastic4sV,

    // testing
    "com.sksamuel.elastic4s" %% "elastic4s-testkit" % elastic4sV % "test",
    "com.sksamuel.elastic4s" %% "elastic4s-embedded" % elastic4sV % "test",
    
    "org.apache.logging.log4j" % "log4j-api" % Log4jVersion,
    "org.apache.logging.log4j" % "log4j-core" % Log4jVersion,
    "org.apache.logging.log4j" % "log4j-1.2-api" % Log4jVersion,
    "org.scalatest" %% "scalatest" % scalaTestV % "test"
  )
}

resolvers ++= Seq(
  Resolver.sonatypeRepo("releases"),
  Resolver.jcenterRepo
)
  2. To use the library, we need to import client classes and implicits:
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}
  3. Now, we can initialize the client by providing an Elasticsearch URI:
object ClientSample extends App {
  val client = ElasticClient(ElasticProperties("http://127.0.0.1:9200"))
  4. To index a document, we execute indexInto with the document in the following way:
client.execute {
  indexInto("bands" / "artists") fields "name" -> "coldplay"
}.await

Thread.sleep(2000)  //to be sure that the record is indexed
  5. Now, we can search for the document we indexed earlier:
// now we can search for the document we indexed earlier
val resp = client.execute {
  search("bands") query "coldplay"
}.await
println(resp)

If the document is available, the result will be something like the following:

RichSearchResponse({"took":2,"timed_out":false,"_shards":{"total":5,"successful":5,"failed":0},"hits":{"total":1,"max_score":0.2876821,"hits":[{"_index":"bands","_type":"artists","_id":"AViBXXEWXe9IuvJzw-HT","_score":0.2876821,"_source":{"name":"coldplay"}}]}})

How it works...

Elastic4s hides a lot of the boilerplate needed to initialize an Elasticsearch client.

The simplest way to define a connection to Elasticsearch is via ElasticProperties, which allows you to provide the following (a short connection-string sketch follows the list):

  • Multiple server endpoints, separated by commas (that is, http(s)://host:port,host:port(/prefix)?querystring)
  • The other settings to be provided to the client via a Map[String,String] (that is, ?cluster.name=elasticsearch)
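For example, a minimal sketch of a connection string with two nodes and one extra option, based on the format described above (the second host and the option value are illustrative):

// A minimal sketch: two nodes plus an option in the query string (values are illustrative)
val props = ElasticProperties("http://127.0.0.1:9200,127.0.0.1:9201?cluster.name=elasticsearch")
val multiNodeClient = ElasticClient(props)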

Once ElasticProperties is defined, you can create the ElasticClient that will be used for every Elasticsearch call.

You can initialize ElasticClient in several ways. We recommend the following:

  • Via ElasticProperties, which accepts a string similar to a JDBC connection string. It is very handy because you can store it as a simple string in your application configuration file:
val client = ElasticClient(ElasticProperties("http://127.0.0.1:9200"))
  • By providing a Rest client:
import org.apache.http.HttpHost
import org.elasticsearch.client.RestClient

val restClient = RestClient.builder(
    new HttpHost("localhost", 9200, "http"),
    new HttpHost("localhost", 9201, "http")).build()
val client = ElasticClient.fromRestClient(restClient)

See also

Managing indices

Now that we have a client, the first thing we need to do is create a custom index with a mapping optimized for it. Elastic4s provides a powerful DSL for performing this kind of operation.

In this recipe, we will create a custom mapping using the domain-specific language (DSL) developed by the author of elastic4s. The syntax is modeled on the Elasticsearch JSON one, so it is very natural and easy to use.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch14/elastic4s_sample directory, and the reference file is IndicesExample.

How to do it...

The Elasticsearch client maps all the index operations under the client's admin.indices object.

Here, you will find all the index operations (create, delete, exists, open, close, forceMerge, and so on).

The following code retrieves the client and executes the main operations on indices:

  1. We need to import the required classes:
import com.sksamuel.elastic4s.http.ElasticDsl._
  2. We define an IndicesExample object that manages the index operations:
object IndicesExample extends App with ElasticSearchClientTrait{
  3. We check if the index exists. If true, we delete it:
  val indexName="test"
  if(client.execute{ indexExists(indexName)}.await.result.isExists){
    client.execute{ deleteIndex(indexName)}.await
  }
  4. We create an index, including a mapping:
client.execute{
  createIndex(indexName) shards 1 replicas 0 mappings (
    mapping("_doc") as (
      textField("name").termVector("with_positions_offsets").stored(true),
      keywordField("tag")
    )
    )
}.await

Thread.sleep(2000)
  5. We can optimize the index to reduce the number of segments:
client.execute(forceMerge(indexName)).await
  6. We close an index as follows:
client.execute(closeIndex(indexName)).await
  7. We open an index as follows:
client.execute(openIndex(indexName)).await
  8. We delete an index as follows:
client.execute(deleteIndex(indexName)).await
  9. We close the client to clean up the resources as follows:
client.close()

How it works...

The Elasticsearch domain-specific language (DSL) provided by elastic4s is very simple to use. It models the standard Elasticsearch functionality in a more natural, easy-to-use way. It is also strongly typed, so it prevents common errors such as typos or changes in value types.

To simplify the code in these examples, we created a trait, ElasticSearchClientTrait, that contains the code used to initialize the client; a minimal sketch of such a trait is shown below.
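The trait itself is not shown in this excerpt; the following sketch assumes it simply wires up the HTTP client from the previous recipe (the actual ElasticSearchClientTrait in the book's code repository may differ):

import com.sksamuel.elastic4s.http.{ElasticClient, ElasticProperties}

// A minimal sketch of the helper trait used by the examples (actual implementation may differ)
trait ElasticSearchClientTrait {
  lazy val client: ElasticClient =
    ElasticClient(ElasticProperties("http://127.0.0.1:9200"))
}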

All the API calls in elastic4s are asynchronous, so they return a Future. To materialize the result, we append .await to the call.
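Because the calls return standard Scala futures, you can also compose them without blocking; a minimal sketch (the blocking .await is used in these recipes only for brevity):

import scala.concurrent.ExecutionContext.Implicits.global

// A minimal non-blocking sketch: map over the returned Future instead of awaiting it
val existsFuture = client.execute(indexExists("test")).map(_.result.isExists)
existsFuture.foreach(exists => println(s"index exists: $exists"))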

Under the hood, elastic4s uses the standard Java Elasticsearch client, but wraps it in a DSL so that the methods and parameters have the same meaning as in the standard Elasticsearch documentation.

In the code, we have put a delay of 2 seconds (Thread.sleep(2000)) to avoid acting on indices too quickly, because their shard allocations are asynchronous and they require some milliseconds to be ready. The best practice is not to rely on a hack like this, but to poll an index's state before performing further operations, and only to perform those operations when it goes green.
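A minimal sketch of such a polling loop, assuming the clusterHealth() call exposed by the elastic4s DSL (the exact call and response field names may vary slightly across versions):

// A hedged sketch: poll the index health instead of sleeping (assumes clusterHealth() from the DSL)
def waitForGreen(index: String, retries: Int = 30): Unit = {
  val status = client.execute(clusterHealth(index)).await.result.status
  if (status.toLowerCase != "green" && retries > 0) {
    Thread.sleep(500)
    waitForGreen(index, retries - 1)
  }
}

waitForGreen(indexName)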

See also

In Chapter 3, Basic Operations, refer to the Creating an index recipe for details about index creation, the Deleting an index recipe for details about index deletion, and the Opening/closing an index recipe, which describes the open/close index APIs.

Managing mappings

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch14/elastic4s_sample directory, and the referenced class is MappingExample.

How to do it...

In the following code, we add a _doc mapping to the myindex index via the client:

  1. We need to import the required classes:
package com.packtpub

import com.sksamuel.elastic4s.http.ElasticDsl._
  2. We define a class to contain our code and to initialize the client and the index:
object MappingExample extends App with ElasticSearchClientTrait {
  val indexName = "myindex"
  if (client.execute { indexExists(indexName) }.await.result.isExists) {
    client.execute { deleteIndex(indexName) }.await
  }
  3. We create the index by providing the _doc mapping:
client.execute {
  createIndex(indexName) shards 1 replicas 0 mappings (
    mapping("_doc") as (
      textField("name").termVector("with_positions_offsets").stored(true)
    )
  )
}.await
Thread.sleep(2000)
  4. We add another field in the mapping via a putMapping call:
client.execute {
  putMapping(indexName / "_doc").as(
    keywordField("tag")
  )
}.await
  5. We can now retrieve our mapping to test it:
val myMapping = client
  .execute {
    getMapping(indexName / "_doc")
  }
  .await
  .result
  6. From the mapping, we extract the tag field:
val tagMapping = myMapping.seq.head
println(tagMapping)
  7. We remove the index using the following command:
client.execute(deleteIndex(indexName)).await
  8. Now, we can close the client to free up resources:
//we need to close the client to free resources
client.close()

How it works...

Before executing mapping operations, the client must be available.

We can include a mapping during index creation via the mappings method of the createIndex builder:

createIndex(indexName) shards 1 replicas 0 mappings (
  mapping("_doc") as (
    textField("name").termVector("with_positions_offsets").stored(true)
  )
)
The elastic4s DSL provides a strongly typed definition of the mapping fields.

If we forget to add a field to the mapping, or if we need to add a new one during the lifecycle of our application, we can call putMapping with the new field or with a complete new type mapping:

putMapping(indexName / "_doc").as(
  keywordField("tag")
)

In this way, if the type exists, it is updated; otherwise, it is created. To check that our index types are stored in the mapping, we need to retrieve them from the cluster state; the method we have already seen for this is getMapping:

val myMapping = client
  .execute {
    getMapping(indexName / "_doc")
  }
  .await
  .result

The returned mapping object is a list of IndexMappings elements:

case class IndexMappings(index: String, mappings: Map[String, Map[String, Any]])

To access our mapping, we take the first result:

val tagMapping = myMapping.seq.head
println(tagMapping)

See also

You can refer to the following recipes, which are related to this one, for further details:

  • The Putting a mapping in an index recipe in Chapter 3, Basic Operations, for more details about the Put Mapping API
  • The Getting a mapping recipe in Chapter 3, Basic Operations, for more details about the Get Mapping API

Managing documents

The APIs used to manage documents (index, delete, and update) are the most important ones after search. In this recipe, we will see how to use them.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch14/elastic4s_sample directory, and the referenced class is DocumentExample.

How to do it...

To manage documents, we will perform the following steps:

  1. We'll need to import the required classes to execute all the document CRUD operations:
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.circe._
  2. We need to create the client and ensure that the index and mapping exist:
object DocumentExample extends App with ElasticSearchClientTrait {
  val indexName = "myindex"

  ensureIndexMapping(indexName)
  3. Now, we can store a document in Elasticsearch via the indexInto call:
client.execute {
  indexInto(indexName) id "0" fields (
    "name" -> "brown",
    "tag" -> List("nice", "simple")
  )
}.await
  4. We can retrieve the stored document via the get call:
val bwn = client.execute {
  get("0") from indexName
}.await

println(bwn.result.sourceAsString)
  5. We can update the stored document via the update call using a script in Painless:
client.execute {
  update("0").in(indexName).script("ctx._source.name = 'red'")
}.await
  6. We can check if our update was applied:
val red = client.execute {
  get("0") from indexName
}.await

println(red.result.sourceAsString)
  7. The console output result will be as follows:
{"name":"brown","tag":["nice","simple"]}
{"name":"red","tag":["nice","simple"]}
After an update operation, the document version is always incremented by 1 if the document is reindexed with the new changes.

How it works...

Before executing document operations, the client and the index must be available, and the document mapping should be created (the mapping is optional, because it can be inferred from the indexed document).

To index a document, elastic4s allows us to provide the document content in several ways, for example via the following (a short sketch of the Map variant follows the list):

  • fields:
    • A sequence of tuples (String, Any), as in the preceding example
    • Map[String, Any]
    • An Iterable[(String, Any)]
  • doc/source:
    • A string
    • A typeclass that derives Indexable[T]
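For instance, a minimal sketch of the Map[String, Any] variant (the id and field values here are illustrative):

// A minimal sketch: providing the fields as a Map (id and values are illustrative)
client.execute {
  indexInto(indexName) id "1" fields Map(
    "name" -> "blue",
    "tag" -> List("nice")
  )
}.await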

Obviously, it is possible to add all the parameters that we saw in Chapter 3, Basic Operations, such as parent, routing, and so on.

The return value, IndexResponse, is the object returned by the call.

To retrieve a document, we need to know its index/id; the method is get. It requires the id, and the index provided via the from method, as we saw in Chapter 3, Basic Operations. In the preceding example, the call was as follows:

val bwn = client.execute {
  get("0") from indexName
}.await

The return type, GetResponse, contains all the request and document information (source, version, index, type, and id), if the document exists.

To update a document, we need to know its index/id and provide a script or a document to use for the update. The client method is update. In the preceding example, we used a script:

client.execute {
  update("0").in(indexName).script("ctx._source.name = 'red'")
}.await

The script code must be a string. If no script language is defined, the default one, Painless, is used. A document-based update is also possible, as sketched below.
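A hedged sketch of the document-based variant, assuming the doc(...) builder on the update request (the field value is illustrative):

// A hedged sketch: a partial-document update instead of a script (assumes the doc(...) builder)
client.execute {
  update("0").in(indexName).doc("name" -> "green")
}.await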

The returned response contains information about the execution and the new version value, which can be used to manage concurrency.

To delete a document (without needing to execute a query), we must know its index/id, and we can use the delete client method to create a delete request. In the preceding code, we used the following:

client.execute {
  delete("0") from indexName
}.await

The delete request accepts all the parameters that we saw in the Deleting a document recipe in Chapter 3, Basic Operations, such as those controlling routing and versioning.

There's more...

Scala programmers like typeclasses, automatic marshalling/unmarshalling of case classes, and strong data-type management. For this, elastic4s provides extra support for the common JSON serialization libraries, such as the following:

  • Circe (https://circe.github.io/circe/). To use this library, you need to add the following dependency:
"com.sksamuel.elastic4s" %% "elastic4s-circe" % elastic4sV
  • Jackson. To use this library, you need to add the following dependency:
"com.sksamuel.elastic4s" %% "elastic4s-jackson" % elastic4sV
  • Json4s (http://json4s.org/). To use this library, you need to add the following dependency:
"com.sksamuel.elastic4s" %% "elastic4s-json4s" % elastic4sV

For example, if you want to use Circe, perform the following steps:

  1. You need to import the circe implicits:
import com.sksamuel.elastic4s.circe._
import io.circe.generic.auto._
import com.sksamuel.elastic4s.Indexable
  2. You need to define the case classes that need to be serialized/deserialized:
case class Place(id: Int, name: String)
case class Cafe(name: String, place: Place)
  3. You need to force the implicit serializer:
implicitly[Indexable[Cafe]]
  4. Now, you can index the case classes directly:
val cafe = Cafe("nespresso", Place(20, "Milan"))

client.execute {
  indexInto(indexName).id(cafe.name).source(cafe)
}.await

See also

In the preceding recipe, we used all the CRUD operations on documents. For more details about these operations, refer to the following:

  • The Indexing a document recipe in Chapter 3, Basic Operations
  • The Getting a document recipe in Chapter 3, Basic Operations, on retrieving a stored document
  • The Deleting a document recipe in Chapter 3, Basic Operations
  • The Updating a document recipe in Chapter 3, Basic Operations

Executing a standard search

Searching is obviously the most common operation in Elasticsearch. Elastic4s provides a query DSL that brings type-safe query definitions to Scala. One of the most common advantages of this feature is that, as Elasticsearch evolves, deprecations and compilation breaks will surface in the Scala code that goes through elastic4s, prompting you to update your code.

In this recipe, we will see how to execute a search, retrieve the results, and convert them into typed domain objects (classes) without having to write a serializer/deserializer for our data.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch14/elastic4s_sample directory, and the referenced class is QueryExample.

How to do it...

To execute a standard query, we will perform the following steps:

  1. We need to import the classes and implicits that are required to index and search the data:
import com.sksamuel.elastic4s.http.ElasticDsl._
import com.sksamuel.elastic4s.circe._
import com.sksamuel.elastic4s.Indexable
import io.circe.generic.auto._
  2. We will create an index and populate it with some data. We will use bulk calls for speedup:

object QueryExample extends App with ElasticSearchClientTrait {
  val indexName = "myindex"
  val typeName = "_doc"

  case class Place(id: Int, name: String)
  case class Cafe(name: String, place: Place)

  implicitly[Indexable[Cafe]]

  ensureIndexMapping(indexName, typeName)

  client.execute {
    bulk(
      indexInto(indexName / typeName)
        .id("0")
        .source(Cafe("nespresso", Place(20, "Milan"))),
      indexInto(indexName / typeName)
        .id("1")
        .source(Cafe("java", Place(60, "Rome"))),
      ... truncated...
      indexInto(indexName / typeName)
        .id("9")
        .source(Cafe("java", Place(89, "London")))
    )
  }.await

  Thread.sleep(2000)
  3. We can use a bool filter to search for documents with a name equal to java and place.id greater than or equal to 80:
val resp = client.execute {
  search(indexName).bool(
    must(termQuery("name", "java"), rangeQuery("place.id").gte(80)))
}.await
  4. Once we have the response, we can check the hit count and convert the results back into a list of case classes:
println(resp.result.size)

println(resp.result.to[Cafe].toList)
  5. The result should be similar to the following:
 List(Cafe(java,Place(80,Chicago)), Cafe(java,Place(89,London)))

How it works...

The elastic4s query DSL wraps the Elasticsearch query DSL in a more readable way.

The search method allows us to define a complex query via the DSL. The result is a wrapper around the raw Java result, and it provides some helpers to improve productivity.

The common methods of the Java result are available at the top level, but two additional, interesting methods are also provided: to and safeTo.

They are able to convert the results into case classes via the implicit converters available in scope. For the to[T] method, the result is an iterator of T (in the preceding example, we converted the hits back into a List of Cafe). In the case of safeTo[T], the result is an Either[Throwable, T]; in this way, conversion errors/exceptions can be collected, as sketched below.

Using typeclasses in Scala allows you to write cleaner, easier-to-understand code, and also reduces the errors caused by string handling in Elasticsearch.
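A short sketch of safeTo[T] applied to the preceding search (the exact collection type of the result may vary by elastic4s version):

// A hedged sketch: safeTo wraps each hit conversion in an Either, so failures
// can be inspected instead of throwing
val converted = resp.result.safeTo[Cafe]
val cafes = converted.collect { case Right(cafe) => cafe }
val errors = converted.collect { case Left(error) => error }
println(s"converted ${cafes.size} cafes with ${errors.size} conversion errors")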

See also

The Executing a search recipe in Chapter 4, Exploring Search Capabilities, contains more details about executing queries.

Executing a search with aggregations

The next step after searching in Elasticsearch is executing aggregations. The elastic4s DSL also provides support for aggregations, so that they can be built in a more type-safe way.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

An IDE that supports Scala programming, such as IntelliJ IDEA with the Scala plugin, should be installed globally.

The code for this recipe can be found in the ch14/elastic4s_sample directory, and the referenced class is AggregationExample.

How to do it...

To execute a search with aggregations, we will perform the following steps:

  1. We need to import the classes that are needed for the aggregations:
import com.sksamuel.elastic4s.http.ElasticDsl._
  2. We will create an index and populate it with some data that will be used for the aggregations:
val indexName = "myindex"
val typeName = "_doc"
ensureIndexMapping(indexName, typeName)
populateSampleData(indexName, typeName, 1000)
  3. We execute a search with aggregations, using a termsAggregation with several sub-aggregations (extended statistics, geo bounds):
val resp = client
  .execute {
    search(indexName) size 0 aggregations (termsAggregation("tag") field "tag" size 100 subAggregations (
      extendedStatsAggregation("price") field "price", extendedStatsAggregation(
        "size") field "size", geoBoundsAggregation("centroid") field "location"
    ))
  }
  .await
  .result
  4. The resp variable contains our query result. We can extract the aggregation results from it and show some values:
val tagsAgg = resp.aggregations.terms("tag")

println(s"Result Hits: ${resp.size}")
println(s"number of tags: ${tagsAgg.buckets.size}")
println(
  s"max price of first tag ${tagsAgg.buckets.head.key}: ${tagsAgg.buckets.head.extendedStats("price").max}")
println(
  s"min size of first tag ${tagsAgg.buckets.head.key}: ${tagsAgg.buckets.head.extendedStats("size").min}")
  5. Finally, we clean up the used resources:
client.execute(deleteIndex(indexName)).await

client.close()
  6. The result should look similar to the following:
number of tags: 5
 max price of first tag awesome: 10.799999999999999
 min size of first tag awesome: 0.0

How it works...

Elastic4s provides a powerful DSL for building aggregations in a more type-safe way.

In the preceding example, we first used termsAggregation to set up aggregation buckets by tag, collecting up to 100 buckets (termsAggregation("tag") size 100). Then, we have two kinds of sub-aggregations:

  • extendedStatsAggregation: This is used to collect extended statistics on the price and size fields
  • geoBoundsAggregation: This is used to compute the geographic bounds of the document results

The elastic4s DSL provides all the official Elasticsearch aggregations.

Moreover, the aggregation results contain helpers for managing the aggregation responses, such as automatic casting to some types. The most commonly used ones are as follows:

  • StringTermsResult: This wraps a string terms aggregation result
  • TermsResult: This wraps a generic terms aggregation result
  • MissingResult: This wraps a missing aggregation result
  • CardinalityResult: This wraps a cardinality aggregation result
  • ExtendedStatsAggResult: This wraps an extended stats result
  • AvgResult: This wraps an average metric aggregation result
  • MaxResult: This wraps a max metric aggregation result
  • SumResult: This wraps a sum metric aggregation result
  • MinResult: This wraps a min metric aggregation result
  • HistogramResult: This wraps a histogram aggregation result
  • ValueCountResult: This wraps a count aggregation result

If an aggregation result is not covered by one of these wrappers, the get[T]: T helper method allows you to retrieve the aggregation result converted to the given type.

See also

You can refer to the following recipes, which are related to this one, for further details:

  • The Executing term Aggregations recipe in Chapter 7, Aggregations, which describes term aggregations
  • The Executing statistical aggregations recipe in Chapter 7, Aggregations, for more details about statistical aggregations

Integrating with DeepLearning.scala

In the previous chapter, we learned how to use DeepLearning4j in Java. This library can be used natively in Scala, providing deep learning capabilities to our Scala applications.

In this recipe, we will learn how to use Elasticsearch as a source of training data for a machine learning algorithm.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Maven, or an IDE that natively supports Java programming, such as Eclipse or IntelliJ IDEA, must be installed.

The code for this recipe is in the ch14/deeplearningscala directory.

We will use the iris dataset (https://en.wikipedia.org/wiki/Iris_flower_data_set) that we used in Chapter 13, Java Integration. To prepare your iris index dataset, we need to populate it by executing the PopulatingIndex class provided in the source code of Chapter 13, Java Integration.

How to do it...

To use Elasticsearch data as the source input for a DeepLearning4J model, we will perform the following steps:

  1. We need to add the DeepLearning4J dependencies to build.sbt:
"org.nd4j" % "nd4j-native-platform" % nd4jVersion,
"org.deeplearning4j" % "deeplearning4j-core" % dl4jVersion,
// ParallelWrapper & ParallelInference live here
"org.deeplearning4j" % "deeplearning4j-parallel-wrapper"% dl4jVersion
  2. Now, we can write our DeepLearning4J class to train and test our model. Initialize the Elasticsearch client:
lazy val client: ElasticClient = {
  ElasticClient(ElasticProperties("http://127.0.0.1:9200"))
}
lazy val indexName = "iris"
  3. Once we have the client, we can read our dataset. We will execute a query and collect the hit results using a simple search:
case class Iris(label: Int, f1: Double, f2: Double, f3: Double, f4: Double)
implicitly[Indexable[Iris]]
val response = client.execute {
  search(indexName).size(1000)
}.await
val hits = response.result.to[Iris].toArray 
  4. We need to convert the hits into a DeepLearning4J dataset. To do this, we create intermediate arrays and populate them:
//Convert the iris data into 150x4 matrix
val irisMatrix: Array[Array[Double]] = hits.map(r => Array(r.f1, r.f2, r.f3, r.f4))
//Now do the same for the label data
val labelMatrix: Array[Array[Double]] = hits.map { r =>
  r.label match {
    case 0 => Array(1.0, 0.0, 0.0)
    case 1 => Array(0.0, 1.0, 0.0)
    case 2 => Array(0.0, 0.0, 1.0)
  }
}

val training = Nd4j.create(irisMatrix)
val labels = Nd4j.create(labelMatrix)
val allData = new DataSet(training, labels) 
  5. We need to split the dataset into two parts, one for training and one for testing. Then, we need to normalize the values. These actions can be done with the following code:
allData.shuffle()
val testAndTrain = allData.splitTestAndTrain(0.65) //Use 65% of data for training
val trainingData = testAndTrain.getTrain
val testData = testAndTrain.getTest
//We need to normalize our data. We'll use NormalizeStandardize (which gives us mean 0, unit variance):
val normalizer = new NormalizerStandardize
normalizer.fit(trainingData) //Collect the statistics (mean/stdev) from the training data. This does not modify the input data
normalizer.transform(trainingData) //Apply normalization to the training data
normalizer.transform(testData) //Apply normalization to the test data. This is using statistics calculated from the *training* set
  6. Now we can design the model to be used for the training:
val numInputs = 4
val outputNum = 3
val seed = 6

logger.info("Build model....")
val conf = new NeuralNetConfiguration.Builder()
  .seed(seed)
  .activation(Activation.TANH)
  .weightInit(WeightInit.XAVIER)
  .updater(new Sgd(0.1))
  .l2(1e-4)
  .list
  .layer(0, new DenseLayer.Builder().nIn(numInputs).nOut(3).build)
  .layer(1, new DenseLayer.Builder().nIn(3).nOut(3).build)
  .layer(2, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
    .activation(Activation.SOFTMAX).nIn(3).nOut(outputNum).build)
  .backprop(true)
  .pretrain(false)
  .build
  7. Having defined the model, we can finally train it with our dataset; we use 1,000 iterations for the training. The code is as follows:
//run the model
val model = new MultiLayerNetwork(conf)
model.init()
model.setListeners(new ScoreIterationListener(100))
0.to(1000).foreach{ _ => model.fit(trainingData)}
  8. Now that we have a trained model, we need to evaluate its accuracy, which we can do using the test dataset:
//evaluate the model on the test set
val eval = new Evaluation(3)
val output = model.output(testData.getFeatures)
eval.eval(testData.getLabels, output)
logger.info(eval.stats)

How it works...

Elasticsearch can be used as a data store; the compact Scala code drastically reduces the amount of code needed to fetch and use a dataset. The best way to manage the dataset is to create a data model; in this case, we created the Iris class for this purpose:

case class Iris(label: Int, f1: Double, f2: Double, f3: Double, f4: Double)

We use Circe (https://circe.github.io/circe/) to derive the encoders and decoders used to convert the Elasticsearch search hits into an array of Iris objects:

 implicitly[Indexable[Iris]]
val response = client.execute {
  search(indexName).size(1000)
}.await
val hits = response.result.to[Iris].toArray 

Using this approach, the code required to convert the data is reduced, and we can work with strongly typed objects to generate our deep learning models.

The final step in creating the dataset is to convert the Iris objects into the arrays of values to be provided to the algorithm. We do this with a simple functional map over our Elasticsearch hits:

val irisMatrix: Array[Array[Double]] = hits.map(r => Array(r.f1, r.f2, r.f3, r.f4))

The same is done for the labels, but in this case, we have to generate a three-element (one-hot) array based on the label value:

val labelMatrix: Array[Array[Double]] = hits.map { r =>
  r.label match {
    case 0 => Array(1.0, 0.0, 0.0)
    case 1 => Array(0.0, 1.0, 0.0)
    case 2 => Array(0.0, 0.0, 1.0)
  }
}

By populating our arrays from the Iris model, the code is simpler and more readable. Another advantage is that it allows hits to be replaced with a streamable structure in the future without massive code refactoring.

Having built the dataset, you can design the model, train it, and evaluate its quality; this approach is independent of the machine learning library that you use.

See also