vlambda博客
学习文章列表

读书笔记《elasticsearch-7-0-cookbook-fourth-edition》Java集成

Java Integration

Elasticsearch 功能可以通过以下几种方式轻松集成到任何 Java 应用程序中: 通过 REST API 和通过原生 API。在 Java 中,很容易使用众多可用库之一调用 REST HTTP 接口,例如 Apache HttpComponents 客户端 (请参阅 http://hc.apache.org/ 了解更多信息)。在这个领域,没有最常用的库。通常,开发人员会选择最适合他们偏好的库或他们非常熟悉的库。从 Elasticsearch 6.x 开始,Elastic 提供了一个低/高级 HTTP 供客户端使用。在本章中,我们将主要将这些用于提供的所有示例。

每种 JVM 语言也可以使用本机协议将 Elasticsearch 与其应用程序集成;但是,我们不会对此进行介绍 因为 它已不再使用 从 Elasticsearch 7.x 开始。新应用程序应该依赖 HTTP。在本章中,我们将学习如何初始化不同的客户端以及如何执行我们在前几章中看到的命令。我们不会深入介绍每个调用,因为我们已经 描述了 REST API。 Elasticsearch 社区建议在集成它们时使用 REST API,因为它们在不同版本之间更稳定并且有详细的文档说明(原生 API 将在 Elasticsearch 8.x 中被删除)。

这些秘籍中提供的所有代码都可以在本书的代码库中获得, 并且可以使用 Maven 构建。

在本章中,我们将介绍以下食谱:

  • Creating a standard Java HTTP client
  • Creating a low-level Elasticsearch client
  • Creating a high-level Elasticsearch client
  • Managing indices
  • Managing mappings
  • Managing documents
  • Managing bulk actions
  • Building a query
  • Executing a standard search
  • Executing a search with aggregations
  • Executing a scroll search

Creating a standard Java HTTP client

HTTP 客户端是最容易创建的客户端之一。它非常方便,因为它不仅允许调用内部方法,就像本机协议那样,还允许调用第三方调用,这些调用只能在插件中实现 be  ;通过HTTP调用.

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在 下载和安装 Elasticsearch 配方中所述  第 1 章开始.

要正确执行以下命令,您需要使用  ch04/populate_kibana.txt 命令填充的索引在线代码。

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 chapter_13/http_java_client 目录中。

How to do it...

要创建 HTTP 客户端,我们将执行以下步骤:

  1. For these examples, we have chosen the Apache HttpComponents library, which is one of the most widely used libraries for executing HTTP calls. This library is available in the main Maven repository called search.maven.org.

要在您的 Maven pom.xml 项目中启用编译,只需添加以下代码:

<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
  1. If we want to instantiate a client and fetch a document with a get method, the code will look like the following:
package com.packtpub;

import org.apache.http.HttpEntity;
import org.apache.http.HttpStatus;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.IOException;

public class App {

    private static String wsUrl = "http://127.0.0.1:9200";


    public static void main(String[] args) {
        CloseableHttpClient client = HttpClients.custom()
                .setRetryHandler(new MyRequestRetryHandler()).build();
        HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");
        // Execute the method.

        try {
            CloseableHttpResponse response = client.execute(method);

            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                System.err.println("Method failed: " + response.getStatusLine());
            } else {
                HttpEntity entity = response.getEntity();
                String responseBody = EntityUtils.toString(entity);
                System.out.println(responseBody);
            }

        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Release the connection.
            method.releaseConnection();
        }
    }
}

结果将如下:

{"_index":"mybooks","_type":"_doc","_id":"1","_version":1,"found":true,"_source":{"uuid":"11111","position":1,"title":"Joe Tester","description":"Joe Testere nice guy","date":"2015-10-22","price":4.3,"quantity":50}}

How it works...

我们执行了前面的步骤来创建和使用 HTTP 客户端。让我们更详细地看一下它们:

  1. The first step is to initialize the HTTP client object. In the previous code, this is done via the following code fragment:
CloseableHttpClient client = HttpClients.custom()
        .setRetryHandler(new MyRequestRetryHandler()).build();
  1. Before using the client, it is good practice to customize it. In general, the client can be modified to provide extra functionalities, such as retry support. Retry support is very important for designing robust applications; the IP network protocol is never 100% reliable, so it automatically retries an action if something goes bad (HTTP connection closed, server overhead, and so on).
  2. In the previous code, we defined an HttpRequestRetryHandler, which monitors the execution and repeats it three times before raising an error.
  3. Once we have set up the client, we can define the call method.
  1. In the previous example, we want to execute the GET REST call. The used method will be for HttpGet and the URL will be the item named index/type/id (similar to the CURL example in the Getting a document recipe in Chapter 3, Basic Operations). To initialize the method, use the following code:
HttpGet method = new HttpGet(wsUrl + "/mybooks/_doc/1");
  1. To improve the quality of our REST call, it's good practice to add extra controls to the method, such as authentication and custom headers.
  2. By default, the Elasticsearch server  doesn't require authentication, so we need to provide a security layer at the top of our architecture.
  3. A typical scenario is using your HTTP client with the search guard plugin (https://github.com/floragunncom/search-guard) or the shield plugin, which is part of X-Pack (https://www.elastic.co/products/x-pack), which allows the Elasticsearch REST to be extended with authentication and SSL. After one of these plugins is installed and configured on the server, the following code adds a host entry that allows the credentials to be provided only if context calls are targeting that host.
  4. The authentication is simply basicAuth, but works very well for noncomplex deployments, as you can see in the following code:
HttpHost targetHost = new HttpHost("localhost", 9200, "http");
CredentialsProvider credsProvider = new BasicCredentialsProvider();

credsProvider.setCredentials(
        new AuthScope(targetHost.getHostName(), targetHost.getPort()),
        new UsernamePasswordCredentials("username", "password"));
// Create AuthCache instance

AuthCache authCache = new BasicAuthCache();

// Generate BASIC scheme object and add it to local auth cache

BasicScheme basicAuth = new BasicScheme();
authCache.put(targetHost, basicAuth);

  1. The create context must be used in executing the call, as shown in the following code:
// Add AuthCache to the execution context
HttpClientContext context = HttpClientContext.create();
context.setCredentialsProvider(credsProvider);
  1. Custom headers allow us to pass extra information to the server to execute a call. Some examples could be API keys or hints about supported formats.
  2. A typical example is using gzip data compression over HTTP to reduce bandwidth usage. To do that, we can add a custom header to the call informing the server that our client accepts encoding. An example custom header can be made from the phrases Accept-Encoding and gzip, as shown in the following code:
request.addHeader("Accept-Encoding", "gzip");
  1. After configuring the call with all the parameters, we can fire up the request as follows:
response = client.execute(method, context);
  1. Every response object must be validated on its return status: if the call is OK, the return status should be 200. In the previous code, the check is done in the if statement, as follows:
if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK)
  1. If the call was OK and the status code of the response is 200, we can read the answer, as follows:
HttpEntity entity = response.getEntity();
String responseBody = EntityUtils.toString(entity);

响应包装在 HttpEntity 中,它是一个流。

HTTP 客户端库提供了一个名为 EntityUtils.toString 的辅助方法,它将HttpEntity 的所有内容作为字符串读取;否则,我们需要创建一些代码来读取字符串并构建字符串。

显然,调用的所有读取部分都包装在 try-catch 块中,以收集由网络错误创建的所有可能的错误。

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

Creating an HTTP Elasticsearch client

在 Elasticsearch 6.x 中,Elasticsearch 团队提供了一个自定义的低级 HTTP 客户端来与 Elasticsearch 通信。其主要特点如下:

  • Minimal dependencies
  • Load balancing across all available nodes
  • Failover in the case of node failures and upon specific response codes
  • Failed connection penalization (whether a failed node is retried depends on how many consecutive times it failed; the more failed attempts, the longer the client will wait before trying that same node again)
  • Persistent connections
  • Trace logging of requests and responses
  • Optional automatic discovery of cluster nodes

Getting ready

您需要一个正常运行的 Elasticsearch 安装,可以按照 the 下载和安装 Elasticsearch recipe 中的说明获取在第 1 章 开始.

要正确执行以下命令,您需要使用  ch04/populate_kibana.txt 命令填充的索引在线代码。

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 ch13/http_es_client 目录中。

How to do it...

为了创建 RestClient,我们将执行以下步骤:

  1. For these examples, we need to add the Elasticsearch HTTP client library that's used to execute HTTP calls. This library is available in the main Maven repository at search.maven.org. To enable compilation in your Maven pom.xml project, just add the following code:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>7.0.0-alpha2</version>
</dependency>
  1. If we want to instantiate a client and fetch a document with a get method, the code will look like the following:
package com.packtpub;

import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpStatus;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;

import java.io.IOException;

public class App {

    public static void main(String[] args) {
        RestClient client = RestClient.builder(
                new HttpHost("localhost", 9200, "http")).build();

        try {
            Request request=new Request("GET", "/mybooks/_doc/1");
            Response response = client.performRequest(request);

            if (response.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
                System.err.println("Method failed: " + response.getStatusLine());
            } else {
                HttpEntity entity = response.getEntity();
                String responseBody = EntityUtils.toString(entity);
                System.out.println(responseBody);
            }

        } catch (IOException e) {
            System.err.println("Fatal transport error: " + e.getMessage());
            e.printStackTrace();
        } finally {
            // Release the connection.
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

结果将如下:

{"_index":"mybooks","_type":"_doc","_id":"1","_version":1,"found":true,"_source":{"uuid":"11111","position":1,"title":"Joe Tester","description":"Joe Testere nice guy","date":"2015-10-22","price":4.3,"quantity":50}}

How it works...

在内部,Elasticsearch RestClient 使用 Apache HttpComponents 库并用更方便的方法包装它。

我们执行了前面的步骤来创建和使用 RestClient。让我们更详细地看一下它们:

  1. The first step is to initialize the RestClient object.
  2. In the previous code, this is done via the following code fragment:
RestClient client = RestClient.builder(
        new HttpHost("localhost", 9200, "http")).build();
  1. The builder method accepts a multivalue HttpHost (in this way, you can pass a list of HTTP addresses) and returns RestClientBuilder under the hood.
  2. The RestClientBuilder allows client communication to be customized by several methods, such as the following:
    • setDefaultHeaders(Header[] defaultHeaders): This allows the custom headers that must be sent for every request to be provided.
    • setMaxRetryTimeoutMillis(int maxRetryTimeoutMillis): This allows the max retry timeout to be defined if there are multiple attempts for the same request.
    • setPathPrefix(String pathPrefix): This allows a custom path prefix to be defined for every request.
    • setFailureListener(FailureListener failureListener): This allows a custom failure listener to be provided, which is called in an instance of node failure. This can be used to provide user-defined behavior in the case of node failure.
    • setHttpClientConfigCallback(RestClientBuilder.HttpClientConfigCallback httpClientConfigCallback): This allows the modification of the HTTP client communication, such as adding compression or an encryption layer.
    • setRequestConfigCallback(RestClientBuilder.RequestConfigCallback requestConfigCallback): This allows the configuration of request authentications, timeouts, and other properties that can be set at a request level.
  3. After creating the RestClient, we can execute some requests against it via the several kinds of performRequest for synchronous calls and performRequestAsync methods for asynchronous ones.
  1. These methods allow you to set parameters, such as the following:
  2. In the previous example, we executed the GET REST call with the following code:
Request request=new Request("GET", "/mybooks/_doc/1");
Response response = client.performRequest(request);
  1. The response object is an org.elasticsearch.client.Response that wraps the Apache HttpComponents response; for this reason, the code to manage the response is the same as it was in the previous recipe.
RestClient 是低级的;它对构建查询或操作没有帮助。目前,使用它包括构建请求的 JSON 字符串,然后解析 JSON 响应字符串。

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

Creating a high-level REST client

Java 高级 REST 客户端建立在低级客户端之上,并提供请求和响应的自动编组。

该客户端最初与 ElasticSearch 6.x 一起发布,它依赖于主要的 Elasticsearch 库来提供许多额外的功能,例如:

  • JSON support
  • Request/response marshaling/unmarshaling that provides stronger typed programming
  • Support for both synchronous and asynchronous calls

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在 下载和安装 Elasticsearch 第一章,  开始.

要正确执行以下命令,您需要使用在线提供的 ch04/populate_kibana.txt 命令填充的索引代码。

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA

此配方的代码位于 ch13/high-level-client 目录中。

How to do it...

要创建本机客户端,我们将执行以下步骤:

  1. Before starting, we must be sure that Maven loads the Elasticsearch JAR by adding the following lines to the pom.xml:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.0.0-alpha2</version>
</dependency>
我总是建议使用 Elasticsearch 的最新可用版本,或者在连接到特定集群的情况下,使用与集群正在使用的相同版本的 Elasticsearch。仅当客户端和服务器具有相同的 Elasticsearch 版本时,本机客户端才能正常工作。
  1. Now, there are two ways to create a client.

第一种是从传输协议中获取客户端,这是获取 Elasticsearch 客户端的最简单方法。我们可以使用以下代码来做到这一点:

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;

// on startup
RestHighLevelClient client= new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));

// on shutdown
//we need to close the client to free resources high-level-client.close();

How it works...

让我们看看创建 RestHighLevelClient 更详细一点的步骤:

  1. In your Maven pom.xmlthe transport plugin must be defined as follows:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.0.0-alpha1</version>
</dependency>
  1. We can create one or more instances of HttpHost that contain the addresses and ports of the nodes of our cluster, as follows:
HttpHost httpHost = new HttpHost("localhost", 9200, "http")
  1. A RestClientBuilder must be provided with all the required HttpHost elements, as follows:
RestClientBuilder restClient = RestClient.builder(httpHost);
  1. Now a RestHighLevelClient can be initialized with the RestClient, as shown in the following code:
RestHighLevelClient client = new RestHighLevelClient(restClient);
  1. At the end, before closing the application, we need to free the resource that is needed by the node; this can be done by calling the close() method on the client, as shown in the following code:
client.close();

See also

Managing indices

在上一个秘籍中,我们学习了如何初始化客户端以将调用发送到 Elasticsearch 集群。在这个秘籍中,我们将学习如何通过客户端调用来管理索引。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,我们描述了如何进入 下载和安装 Elasticsearch  ;第 1 章中的食谱 a>, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 ch13/high-level-client 目录中,引用的类是 IndicesOperations

How to do it...

一个Elasticsearch客户端映射客户端的 indices对象下的所有索引操作,如 createdeleteexists opencloseoptimize。以下步骤检索客户端并对索引执行主要操作:

  1. First, we import the required classes, as shown in the following code:
import org.elasticsearch.action.admin.indices.close.CloseIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
  1. Then, we define an IndicesOperations class that manages the index operations, as shown in the following code:
public class IndicesOperations {
    private final RestHighLevelClient client;

    public IndicesOperations(RestHighLevelClient client) {
        this.client = client;
    }
  1. Next, we define a function that is used to check whether the index is there, as shown in the following code:
public boolean checkIndexExists(String name) throws IOException {
    return client.indices().exists(new GetIndexRequest().indices(name), RequestOptions.DEFAULT);
}
  1. Then, we define a function that can be used to create an index, as shown in the following code:
public void createIndex(String name) throws IOException {
    client.indices().create(new CreateIndexRequest(name), RequestOptions.DEFAULT);
}
  1. We then define a function that can be used to delete an index, as follows:
public void deleteIndex(String name) throws IOException {
    client.indices().delete(new DeleteIndexRequest(name), RequestOptions.DEFAULT);
}
  1. Then, we define a function that can be used to close an index, as follows:
public void closeIndex(String name) throws IOException {
    client.indices().close(new CloseIndexRequest().indices(name), RequestOptions.DEFAULT);
}
  1. Next, we define a function that can be used to open an index, as follows:
public void openIndex(String name) throws IOException {
    client.indices().open(new OpenIndexRequest().indices(name), RequestOptions.DEFAULT);
}

  1. Then, we test all the previously defined functions, as follows:
public static void main(String[] args) throws InterruptedException, IOException {
    RestHighLevelClientHelper nativeClient = new RestHighLevelClientHelper();
    RestHighLevelClient client = nativeClient.getClient();
    IndicesOperations io = new IndicesOperations(client);
    String myIndex = "test";
    if (io.checkIndexExists(myIndex))
        io.deleteIndex(myIndex);
    io.createIndex(myIndex);
    Thread.sleep(1000);
    io.closeIndex(myIndex);
    io.openIndex(myIndex);
    io.deleteIndex(myIndex);

    //we need to close the client to free resources
    nativeClient.close();

}

How it works...

在执行每个索引操作之前,必须有一个客户端可用(我们在前面的秘籍中看到了如何创建一个)。

客户端有很多按功能分组的方法,如下表所示:

  • In the root client.*, we have record-related operations, such as index, deletion of records, search, and update
  • Under indices.*, we have index-related methods, such as create index, delete index, and so on
  • Under cluster.*, we have cluster-related methods, such as state and health

客户端方法通常遵循以下约定:

  • Methods that end with an Async postfix  (such as createAsync) require a build request and optional action listener.
  • Methods that do not end with Async require a request and some instances of RequestOption to execute the call in a synchronous way.

在前面的示例中,我们有几个索引调用,如下表所示:

  • The method call to check the existence if the is exists. It takes a GetIndexRequest element and returns a boolean , which contains information about whether the index exists, as shown in the following code:
return client.indices().exists(new GetIndexRequest().indices(name), RequestOptions.DEFAULT);
  • You can create an index with the create call, as follows:
client.indices().create(new CreateIndexRequest(name), RequestOptions.DEFAULT);
  • You can close an index with the close call, as follows:
client.indices().close(new CloseIndexRequest().indices(name), RequestOptions.DEFAULT);
  • You can open an index with the open call, as follows:
client.indices().open(new OpenIndexRequest().indices(name), RequestOptions.DEFAULT);
  • You can delete an index with the delete call, as follows:
client.indices().delete(new DeleteIndexRequest(name), RequestOptions.DEFAULT);

 

We have put a delay of 1 second ( Thread.wait(1000)) in the code to prevent fast actions on indices because their shard allocations are asynchronous, and they requires a few milliseconds to be ready. The best practice is to not use a similar hack, but to poll an index's state before you perform further operations, and to only perform those operations when it goes green.

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

  • The Creating an index recipe in Chapter 3, Basic Operations, for details on index creation
  • The Deleting an index recipe in Chapter 3, Basic Operations, for details on index deletion
  • The Opening/closing an index recipe in Chapter 3, Basic Operations, for the description of opening/closing index APIs

Managing mappings

Getting ready

您需要一个正常运行的 Elasticsearch 安装,我们描述了如何进入 the 下载和安装 Elasticsearch  第 1 章中的食谱< /a>, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码在 ch13/high-level-client 目录中,引用的类是MappingOperations

How to do it...

在以下步骤中,我们通过本机客户端将 mytype 映射添加到 myindex 索引:

  1. Import the required classes using the following code:
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.net.UnknownHostException;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
  1. Define a class to contain our code and to initialize client and index, as follows:
public class MappingOperations {

    public static void main(String[] args) {
        String index = "mytest";
        String type = "mytype";
        RestHighLevelClient client = RestHighLevelClientHelper.createHighLevelClient();
        IndicesOperations io = new IndicesOperations(client);
        try {
            if (io.checkIndexExists(index))
                io.deleteIndex(index);
            io.createIndex(index);
  1. Prepare the JSON mapping to put in the index, as follows:
XContentBuilder builder = null;
try {
    builder = jsonBuilder().
            startObject().
            field("type1").
            startObject().
            field("properties").
            startObject().
            field("nested1").
            startObject().
            field("type").
            value("nested").
            endObject().
            endObject().
            endObject().
            endObject();
  1. Put the mapping in index, as follows:
    AcknowledgedResponse response = client.indices()
            .putMapping(new PutMappingRequest(index).type(type).source(builder), RequestOptions.DEFAULT);
    if (!response.isAcknowledged()) {
        System.out.println("Something strange happens");
    }
} catch (IOException e) {
    System.out.println("Unable to create mapping");
}
  1. We remove index, as follows:
io.deleteIndex(index);
  1. Now, we can close the client to free up resources, as follows:
} finally {

    //we need to close the client to free resources
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

How it works...

在执行映射操作之前,客户端必须可用并且必须创建索引。

在前面的示例中,如果索引存在,则将其删除并重新创建一个新索引,因此我们确信我们是从头开始的。这可以在以下代码中看到:

RestHighLevelClient client = RestHighLevelClientHelper.createHighLevelClient();
IndicesOperations io = new IndicesOperations(client);
try {
    if (io.checkIndexExists(index))
        io.deleteIndex(index);
    io.createIndex(index);

现在,我们有一个新的 index 来放置我们创建它所需的映射。与 Elasticsearch 中的每个标准对象一样,映射是一个 JSON 对象。 Elasticsearch 提供了一种通过 XContentBuilder.jsonBuilder 以编程方式创建 JSON 的便捷方式。

要使用它,您需要将以下导入添加到您的 Java 文件中:

import org.elasticsearch.common.xcontent.XContentBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

XContentBuilder.jsonBuilder 方法允许以编程方式构建 JSON,因为它是 Elasticsearch 中 JSON 生成的瑞士军刀,因为它具有被链接的能力,并且它有很多方法。这些方法总是返回一个构建器,因此它们可以很容易地链接起来。最重要的如下:

  • startObject() and startObject(name): Here, name is the name of the JSON object. It starts the definition of a JSON object. The object must be closed with an endObject().
  • field(name) or field(name, value): The name must always be a string, and the value must be a valid value that can be converted to JSON. It's used to define a field in a JSON object.
  • value(value): The value in parentheses must be a valid value that can be converted into JSON. It defines a single value in a field.
  • startArray () and startArray(name): Here, name is the name of the JSON array. It starts the definition of a JSON array and must be ended with an endArray().

通常,在 Elasticsearch 中,每个接受 JSON 对象作为参数的方法也接受 JSON 构建器。

现在我们在构建器中有了映射,我们需要调用 Put 映射 API。此 API 在 client.indices() 命名空间中,您需要定义索引、类型和映射来执行此调用,如下所示:

AcknowledgedResponse response = client.indices()
        .putMapping(new PutMappingRequest(index).type(type).source(builder), RequestOptions.DEFAULT);

如果一切正常,您可以检查 response.isAcknowledged() 中的状态,该状态必须为 true(布尔值);否则,会引发错误。

如果您需要更新映射,则必须执行相同的调用,但是 您应该只将需要添加的字段 放入映射中。

There's more...

还有另一个用于管理映射的重要调用——Get mapping API。该调用类似于 delete,并返回一个 GetMappingResponse

GetMappingsResponse resp = client.indices().getMapping(new GetMappingsRequest().indices(index), RequestOptions.DEFAULT);

response 包含映射信息。返回的数据的结构与索引映射中的一样; 它 包含映射作为名称和MappingMetaData

 MappingMetaData 是一个对象,其中包含我们在 Chapter 3, 基本操作.

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

  • The Putting a mapping in an index recipe in Chapter 3, Basic Operations, for more details about the Put mapping API
  • The Getting a mapping recipe in Chapter 3, Basic Operations, for more details about the Get mapping API

Managing documents

用于管理文档的原生 API(indexdeleteupdate)是仅次于搜索 API 的最重要的 API。在这个食谱中,我们将学习如何使用它们。在下一个秘籍中,我们将继续进行批量操作以提高性能。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,我们描述了如何进入 下载和安装 Elasticsearch  ;第 1 章中的食谱 a>, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 ch13/high-level-client 目录中,引用的类是 DocumentOperations

How to do it...

为了管理文档,我们将执行以下步骤:

  1. We'll need to import the required classes to execute all the document CRUD operations via the high-level client, as follows:
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;

import java.io.IOException;
  1. The following code will create the client and remove the index that contains our data, if it exists:
public class DocumentOperations {

    public static void main(String[] args) {
        String index = "mytest";
        String type = "mytype";
        RestHighLevelClient client = RestHighLevelClientHelper.createHighLevelClient();
        IndicesOperations io = new IndicesOperations(client);
        try {
            if (io.checkIndexExists(index))
                io.deleteIndex(index);
  1. We will call the create index by providing the required mapping, as follows:
try {
    client.indices().create(
            new CreateIndexRequest()
                    .index(index)
                    .mapping(type, XContentFactory.jsonBuilder()
                            .startObject()
                            .startObject(type)
                            .startObject("properties")
                            .startObject("text").field("type", "text").field("store", "yes").endObject()
                            .endObject()
                            .endObject()
                            .endObject()),
            RequestOptions.DEFAULT
    );
} catch (IOException e) {
    System.out.println("Unable to create mapping");
}
  1. Now, we can store a document in Elasticsearch via the index call, as follows:
IndexResponse ir = client.index(new IndexRequest(index, type, "2").source("text", "unicorn"), RequestOptions.DEFAULT);
System.out.println("Version: " + ir.getVersion());
  1. Let's retrieve the stored document via the get call, as follows:
GetResponse gr = client.get(new GetRequest(index, type, "2"), RequestOptions.DEFAULT);
System.out.println("Version: " + gr.getVersion());
  1. We can update the stored document via the update call using a script in painless, as follows:
UpdateResponse ur = client.update(new UpdateRequest(index, type, "2").script(new Script("ctx._source.text = 'v2'")), RequestOptions.DEFAULT);
System.out.println("Version: " + ur.getVersion());
  1. We can delete the stored document via the delete call, as follows:
DeleteResponse dr = client.delete(new DeleteRequest(index, type, "2"), RequestOptions.DEFAULT);
  1. We can now free up the resources that were used, as follows:
    io.deleteIndex(index);
} catch (IOException e) {
    e.printStackTrace();
} finally {

    //we need to close the client to free resources
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  1. The console output result will be as follows:
Version: 1
Version: 1
Version: 2
  1. The document version, after an update action and if the document is reindexed with new changes, is always incremented by 1.

How it works...

在执行文档操作之前,客户端和索引必须可用,并且应该创建文档映射(映射是可选的,因为它可以从索引文档中推断出来)。

为了通过本机客户端索引文档,创建了 index 方法。它需要索引和类型作为参数。如果提供了 ID,则将使用它;否则,将创建一个新的。

在前面的示例中,我们以键和值的形式放置源,但许多形式都可以作为源传递。它们如下:

  • A JSON string, such as {"field": "value"}
  • A string and a value (from one up to four couples), such as field1, value1, field2, or value2, field3, value3, field4, value4
  • A builder, such as jsonBuilder().startObject().field(field,value).endObject()
  • A byte array

显然,可以添加我们在第三章 基本操作等作为 parentrouting 等等。在前面的示例中,调用如下:

IndexResponse ir = client.index(new IndexRequest(index, type, "2").source("text", "unicorn"), RequestOptions.DEFAULT);

IndexResponse 返回值 可以通过以下方式使用:

  • To check whether the index was successful
  • To get the ID of the indexed document, if it was not provided during the index action
  • To retrieve the document version

要检索文档,您需要知道索引/类型/ID。客户端方法是 get。它需要通常的三元组(indextypeid),但是还有很多其他方法可以控制路由(例如< kbd>souring and  parent)或我们在 获取文档 recipe中看到的字段第三章, 基本操作。在前面的示例中,调用如下:

GetResponse gr = client.get(new GetRequest(index, type, "2"), RequestOptions.DEFAULT);

GetResponse 返回类型 包含所有请求(如果文档存在)和文档信息(sourceversionindex type 和 id)。

要更新文档,您需要知道索引/类型/ID 并提供用于更新的脚本或文档。客户端方法是 update

在前面的示例中,有以下内容:

UpdateResponse ur = client.update(new UpdateRequest(index, type, "2").script(new Script("ctx._source.text = 'v2'")), RequestOptions.DEFAULT);

脚本代码必须是字符串。如果未定义脚本语言,则使用默认的 painless 方法 。

返回的响应包含有关执行的信息和用于管理并发的新版本值。

要删除文档(无需执行查询),我们需要知道索引/类型/ID 三元组,我们可以使用 delete 客户端方法创建删除请求。在前面的代码中,我们使用了以下代码:

DeleteResponse dr = client.delete(new DeleteRequest(index, type, "2"), RequestOptions.DEFAULT);

删除请求允许将所有参数传递给它 我们在 删除文档 recipe 中的 第 3 章 基本操作 , 来控制路由和版本。

See also

在我们的秘籍中,我们对文档使用了所有 CRUD 操作。有关这些操作的更多详细信息,请参阅以下内容:

  • The Indexing a document recipe in Chapter 3, Basic Operations, for information on how to index a document
  • The Getting a document recipe in Chapter 3, Basic Operations, for information on how to retrieve a stored document
  • The Deleting a document recipe in Chapter 3, Basic Operations, for information on how to delete a document
  • The Updating a document recipe in Chapter 3, Basic Operations, for information on how to update a document

Managing bulk actions

如果您需要索引或删除数千/数百万条记录,则通过单个调用对项目执行自动操作通常会导致瓶颈。在这种情况下,最佳做法是执行批量操作。

我们已经在 第 3 章 基本操作

Getting ready

您需要一个正常运行的 Elasticsearch 安装,您可以使用 下载和安装 Elasticsearch recipe in < a href="https://cdp.packtpub.com/elasticsearch_7_0_cookbook/wp-admin/post.php?post=31&action=edit#post_24">第 1 章, 开始< /em>.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

这个秘籍的代码在 ch13/high-level-client 目录中,引用的类是BulkOperations

How to do it...

要管理批量操作,我们将执行以下步骤:

  1. We'll need to import the required classes to execute bulk actions via the high-level client, as follows:
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;

import java.io.IOException;
  1. Next, we'll create the client, remove the old index if it exists, and create a new one, as follows:
public class BulkOperations {

    public static void main(String[] args) {
        String index = "mytest";
        String type = "mytype";
        RestHighLevelClient client = RestHighLevelClientHelper.createHighLevelClient();
        IndicesOperations io = new IndicesOperations(client);
        try {
            if (io.checkIndexExists(index))
                io.deleteIndex(index);
            try {
                client.indices().create(
                        new CreateIndexRequest()
                                .index(index)
                                .mapping(type, XContentFactory.jsonBuilder()
                                        .startObject()
                                        .startObject(type)
                                        .startObject("properties")
                                        .startObject("position").field("type", "integer").field("store", "yes").endObject()
                                        .endObject()
                                        .endObject()
                                        .endObject()),
                        RequestOptions.DEFAULT);
                ;
            } catch (IOException e) {
                System.out.println("Unable to create mapping");
            }
  1. Now, we can bulk index 1000 documents, adding the bulk index actions to the bulker, as follows:
BulkRequest bulker = new BulkRequest();
for (int i = 1; i < 1000; i++) {
    bulker.add(new IndexRequest(index, type, Integer.toString(i)).source("position", Integer.toString(i)));
}
System.out.println("Number of actions for index: " + bulker.numberOfActions());

client.bulk(bulker, RequestOptions.DEFAULT);
  1. We can bulk update the previously created 1000 documents via a script, adding the bulk update action to the bulker, as follows:
bulker = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
    bulker.add(new UpdateRequest(index, type, Integer.toString(i)).script(new Script("ctx._source.position += 2")));
}
System.out.println("Number of actions for update: " + bulker.numberOfActions());
client.bulk(bulker, RequestOptions.DEFAULT);
  1. We can bulk delete 1000 documents, adding the bulk delete actions to the bulker, as follows:
bulker = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
    bulker.add(new DeleteRequest(index, type, Integer.toString(i)));
}
System.out.println("Number of actions for delete: " + bulker.numberOfActions());
client.bulk(bulker, RequestOptions.DEFAULT);
  1. We can now free up the resources that were used, as follows:
    io.deleteIndex(index);
} catch (IOException e) {
    e.printStackTrace();
} finally {
    //we need to close the client to free resources
    try {
        client.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}
  1. The result will be as follows:
Number of actions for index: 1000
Number of actions for update: 1000
Number of actions for delete: 1000

How it works...

在执行这些批量操作之前,客户端必须可用并且必须创建索引。如你所愿,你也可以创建文档映射。

我们可以将  BulkRequest 视为以下不同操作的收集器:

  • IndexRequest
  • UpdateRequest
  • DeleteRequest
  • A bulk-formatted array of bytes

通常,在代码中使用时,我们可以将其视为一个 List 在其中添加支持类型的操作。让我们完成以下步骤:

  1. To initialize bulkBuilder, we use the following code:
BulkRequest bulker = new BulkRequest();
  1. In the previous example, we added 1,000 index actions, as follows:
for (int i = 1; i < 1000; i++) {
    bulker.add(new IndexRequest(index, type, Integer.toString(i)).source("position", Integer.toString(i)));
}
  1. After adding all the actions, we can print (for example) the number of actions and then execute them, as follows:
System.out.println("Number of actions for index: " + bulker.numberOfActions());
client.bulk(bulker, RequestOptions.DEFAULT);
  1. We have populated the bulk with 1,000 update actions, as follows:
bulker = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
    bulker.add(new UpdateRequest(index, type, Integer.toString(i)).script(new Script("ctx._source.position += 2")));
}
  1. After adding all the update actions, we can execute them in bulk using bulker.execute().actionGet();, as follows:
System.out.println("Number of actions for update: " + bulker.numberOfActions());
client.bulk(bulker, RequestOptions.DEFAULT);
  1. Next, the same step is performed with the delete action, as follows:
bulker = new BulkRequest();
for (int i = 1; i <= 1000; i++) {
    bulker.add(new DeleteRequest(index, type, Integer.toString(i)));
}
  1. To commit the delete, we need to execute the bulk, as follows:
client.bulk(bulker, RequestOptions.DEFAULT);
在此示例中,为了简化它,我创建了具有相同类型操作的批量操作,但是,如前所述,您可以将任何受支持的操作类型放入同一个批量操作中。

Building a query

在搜索之前,必须建立一个查询。 Elasticsearch 提供了多种方式来构建这些查询。在这个秘籍中,我们将学习如何通过 QueryBuilder 和简单的字符串来创建查询对象。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,您可以按照 the 下载和安装 Elasticsearch 中的说明获取它 第 1 章中的食谱< /a>, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于  ch13/high-level-client 目录中,引用的类是  QueryCreation

How to do it...

要创建查询,我们将执行以下步骤:

  1. We need to import the QueryBuilders using the following code:
import static org.elasticsearch.index.query.QueryBuilders.*;
  1. Next, we'll create a query using QueryBuilder, as follows:
TermQueryBuilder filter = termQuery("number2", 1);
RangeQueryBuilder range = rangeQuery("number1").gt(500);
BoolQueryBuilder query = boolQuery().must(range).filter(filter);
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query);
SearchRequest searchRequest = new SearchRequest().indices(index).source(searchSourceBuilder);
  1. Now, we can execute a search, as follows (searching via a native API will be discussed in the following recipes):
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
System.out.println("Matched records of elements: " + response.getHits().getTotalHits());
  1. I've removed the redundant parts that are similar to the example of the previous recipe. The result will be as follows:
Matched records of elements: 250

How it works...

在 Elasticsearch 中有几种方法可以定义查询。

通常,查询可以定义如下:

  • QueryBuilder: A helper to build a query.
  • XContentBuilder: A helper to create JSON code. We discussed this in the Managing mapping recipe in this chapter. The JSON code to be generated is similar to the previous REST, but is converted into programmatic code.
  • Array of Bytes or String: In this case, it's usually the JSON to be executed, as we have seen in REST calls.
  • Map: This contains the query and the value of the query.

在前面的示例中,我们通过 QueryBuilders 创建了一个查询。第一步是从命名空间导入QueryBuilder,如下:

import static org.elasticsearch.index.query.QueryBuilders.*;

该示例的查询是一个以 termQuery 作为过滤器的布尔查询。该示例的目的是展示如何混合多种查询类型来创建一个复杂的查询。

我们需要定义一个过滤器,如下代码所示。在这种情况下,我们使用了术语查询,它是最常用的查询类型之一:

TermQueryBuilder filter = termQuery("number2", 1);

termQuery 接受一个字段和一个值,它必须是有效的 Elasticsearch 类型。

前面的代码类似于 JSON REST {"term": {"number2":1}

布尔查询包含一个带有 range 查询的 must 子句。我们可以开始创建 range 查询,如下所示:

RangeQueryBuilder range = rangeQuery("number1").gt(500);

这个range查询匹配 所有大于或等于(gte)500的值 在  number1 字段

创建 range 查询后,我们可以将其添加到 must 块中的布尔查询和 filter< 中的 filter 查询中/kbd> 块,如下:

BoolQueryBuilder query = boolQuery().must(range).filter(filter);

在现实世界的复杂查询中,您可以在布尔查询或过滤器中包含大量嵌套查询。

Before executing a query, the index must be refreshed  so that you don't miss any results.

在我们的示例中,这是使用以下代码完成的:

client.indices().refresh(new RefreshRequest(index), RequestOptions.DEFAULT);

There's more...

可能的本机查询/过滤器与 REST 查询/过滤器相同,并且具有相同的参数:唯一的区别是它们可以通过构建器方法访问。

最常见的查询构建器如下:

  • matchAllQuery: This allows all the documents to be matched
  • matchQuery and matchPhraseQuery: These are used to match against text strings
  • termQuery and termsQuery: These are used to match a term value(s) against a specific field
  • boolQuery: This is used to aggregate other queries with Boolean logic
  • idsQuery: This is used to match a list of IDs
  • fieldQuery: This is used to match a field with text
  • wildcardQuery: This is used to match terms with wildcards (*?.)
  • regexpQuery: This is used to match terms via a regular expression
  • Span query family (spanTermsQuery, spanTermQuery, spanORQuery, spanNotQuery, spanFirstQuery, and so on): These are a few examples of the span query family, which are used in building span queries
  • hasChildQuery, hasParentQuery, and nestedQuery: These are used to manage related documents

前面的列表并不详尽,因为它会在 Elasticsearch 的整个生命周期中不断发展。将添加新的查询类型以涵盖新的搜索案例,或者它们偶尔会被重命名,例如将文本查询更改为匹配查询。

Executing a standard search

在上一个秘籍中,我们学习了如何构建查询。在这个秘籍中,我们将执行一个查询来检索一些文档。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在 the 下载和安装 Elasticsearch < /span>第 1 章中的食谱, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 ch13/high-level-client 目录中,引用的类是 QueryExample

How to do it...

要执行标准查询,我们将执行以下步骤:

  1. We need to import QueryBuilders to create the query, as follows:
import static org.elasticsearch.index.query.QueryBuilders.*;
  1. We can create an index and populate it with some data, as follows:
String index = "mytest";
String type = "mytype";
QueryHelper qh = new QueryHelper();
qh.populateData(index, type);
RestHighLevelClient client = qh.getClient();
  1. Now, we will build a query with the number1 field greater than or equal to 500 and filter it for number2 equal to 1, as follows:
QueryBuilder query = boolQuery().must(rangeQuery("number1").gte(500)).filter(termQuery("number2", 1));
  1. After creating a query, it is enough to execute it using the following code:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query).highlighter(new HighlightBuilder().field("name"));
SearchRequest searchRequest = new SearchRequest().indices(index).source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
  1. When we have SearchResponse, we need to check its status and iterate it on SearchHit, as follows:
if (response.status().getStatus() == 200) {
    System.out.println("Matched number of documents: " + response.getHits().getTotalHits());
    System.out.println("Maximum score: " + response.getHits().getMaxScore());

    for (SearchHit hit : response.getHits().getHits()) {
        System.out.println("hit: " + hit.getIndex() + ":" + hit.getType() + ":" + hit.getId());
    }
}
  1. The result should be similar to the following:
Number of actions for index: 999
Matched number of documents: 999
Maximum score: 1.0
hit: mytest:mytype:499
hit: mytest:mytype:501
hit: mytest:mytype:503
hit: mytest:mytype:505
hit: mytest:mytype:507
hit: mytest:mytype:509
hit: mytest:mytype:511
hit: mytest:mytype:513
hit: mytest:mytype:515
hit: mytest:mytype:517

How it works...

执行search的调用是prepareSearch,它返回一个SearchResponse,如下:

import org.elasticsearch.action.search.SearchResponse;

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query).highlighter(new HighlightBuilder().field("name"));

SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

 SearchSourceBuilder 有很多方法可以设置我们在执行搜索中已经看到的所有参数 recipe in 第 4 章探索搜索功能。最常用的如下:

  • indices: This allows the indices to be defined.
  • query: This allows the query that is to be executed to be set.
  • storedField/storedFields: These allow setting fields to be returned (used to reduce the bandwidth by returning only the needed fields).
  • aggregation: This allows us to compute any added aggregations.
  • highlighter: This allows us to return any added highlighting.
  • scriptField: This allows a scripted field to be returned. A scripted field is a field that is computed by server-side scripting using one of the available scripting languages. For example, it can be as follows:
Map<String, Object> params = MapBuilder.<String, Object>newMapBuilder().put("factor", 2.0).map();
.scriptField("sNum1", new Script("_doc.num1.value * factor", params))

执行搜索后,返回响应对象。

通过检查返回的状态和可选的命中数来检查搜索是否成功是一种很好的做法。如果搜索执行正确,返回状态将为 200,如下代码所示:

if (response.status().getStatus() == 200) {

响应对象包含我们在 第 4 章探索搜索功能 最重要的是包含我们结果的点击部分。本节主要的访问器方法如下:

  • totalHits: This allows the total number of results to be obtained, as shown in the following code:
System.out.println("Matched number of documents: " + response.getHits().getTotalHits());
  • maxScore: This gives the maximum score for the documents. It is the same score value of the first SearchHit, as shown in the following code:
System.out.println("Maximum score: " + response.getHits().getMaxScore());
  • hits: This is an array of SearchHit, which contains the results, if available.

SearchHit 是结果对象。它有很多方法,其中最重要的有以下几种:

  • getIndex(): This is the index that contains the document.
  • getId(): This is the ID of the document.
  • getScore(): This is the query score of the document, if available.
  • getVersion(): This is the version of the document, if available.
  • getSource(), getSourceAsString(), getSourceAsMap(), and so on: These return the source of the document in different forms, if available.
  • getExplanation(): If available (required in the search), this contains the query explanation.
  • getFields, getField(String name): These return the fields that were requested if they were passed fields to search for an object.
  • getSortValues(): This is the value/values that are used to sort this record. It's only available if sort is specified during the search phase.
  • getShard(): This is the shard of the search hit. This value is very important for custom routing.

在前面的示例中,我们只打印了每个命中的索引、类型和 ID,如以下代码所示:

for (SearchHit hit : response.getHits().getHits()) {
    System.out.println("hit: " + hit.getIndex() + ":" + hit.getType() + ":" + hit.getId());
}
The number of returned hits, if not defined, is limited to 10. To retrieve more hits, you need to define a larger value in the  size method or paginate using the  from method.

See also

Executing a search with aggregations

可以扩展之前的配方以支持聚合,以便检索索引数据的分析。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,您可以按照 the 下载和安装 Elasticsearch 中的说明获取它 第 1 章中的食谱< /a>, 开始.

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码在 ch13/high-level-client 目录中,引用的类是AggregationExample

How to do it...

要使用聚合执行搜索,我们将执行以下步骤:

  1. We need to import the necessary classes for the aggregations using the following code:
import org.elasticsearch.search.aggregations.AggregationBuilder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.metrics.ExtendedStats;
import org.elasticsearch.search.aggregations.metrics.ExtendedStatsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.search.aggregations.AggregationBuilders.*;
  1. We can create an index and populate it with some data that we will use for the aggregations, as follows:
String index = "mytest";
String type = "mytype";
QueryHelper qh = new QueryHelper();
qh.populateData(index, type);
RestHighLevelClient client = qh.getClient();
  1. We then calculate two different aggregations (terms and extended statistics), as shown in the following code::
AggregationBuilder aggsBuilder = terms("tag").field("tag");
ExtendedStatsAggregationBuilder aggsBuilder2 = extendedStats("number1").field("number1");
  1. Now, we can execute a search, and pass the aggregations using the following code. We use size(0) because we don't need the hits:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchAllQuery()).aggregation(aggsBuilder).
        aggregation(aggsBuilder2).size(0);
SearchRequest searchRequest = new SearchRequest().indices(index).source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);
  1. We need to check the response validity and wrap the aggregation results, as shown in the following code:
if (response.status().getStatus() == 200) {
    System.out.println("Matched number of documents: " + response.getHits().getTotalHits());
    Terms termsAggs = response.getAggregations().get("tag");
    System.out.println("Aggregation name: " + termsAggs.getName());
    System.out.println("Aggregation total: " + termsAggs.getBuckets().size());
    for (Terms.Bucket entry : termsAggs.getBuckets()) {
        System.out.println(" - " + entry.getKey() + " " + entry.getDocCount());
    }
    ExtendedStats extStats = response.getAggregations().get("number1");
    System.out.println("Aggregation name: " + extStats.getName());
    System.out.println("Count: " + extStats.getCount());
    System.out.println("Min: " + extStats.getMin());
    System.out.println("Max: " + extStats.getMax());
    System.out.println("Standard Deviation: " + extStats.getStdDeviation());
    System.out.println("Sum of Squares: " + extStats.getSumOfSquares());
    System.out.println("Variance: " + extStats.getVariance());
}

  1. The result should be as follows:
Matched number of documents: 1000
Aggregation name: tag
Aggregation total: 4
- bad 264
- amazing 246
- cool 245
- nice 245
Aggregation name: number1
Count: 1000
Min: 2.0
Max: 1001.0
Standard Deviation: 288.6749902572095
Sum of Squares: 3.348355E8
Variance: 83333.25

How it works...

搜索部分与前面的示例类似。在本例中,我们使用了一个匹配所有文档的 matchAllQuery

要执行聚合,首先需要创建它。有三种方法可以做到这一点:

  • Using a string that maps a JSON object
  • Using a XContentBuilder, which will be used to produce a JSON object
  • Using a AggregationBuilder

前两种方法是微不足道的;第三个需要导入builder,如下:

import static org.elasticsearch.search.aggregations.AggregationBuilders.*;

正如我们在 中已经看到的,有几种类型的聚合第 5 章文本和数字查询

第一个是我们用 AggregationBuilder 创建的,是一个Terms 聚合,它收集并统计桶中所有出现的terms,如图所示以下代码:

AggregationBuilder aggsBuilder = terms("tag").field("tag");

每个聚合所需的值是名称,它在构建器构造函数中传递。在 terms 聚合的情况下,需要该字段才能处理请求。还有很多其他参数;参见执行条款聚合 recipe "ch07">第 7 章聚合,了解详细信息。

我们创建的第二个 aggregationBuilder 是基于 number1 numeric 字段的扩展统计聚合,如下:

ExtendedStatsAggregationBuilder aggsBuilder2 = extendedStats("number1").field("number1");

现在我们已经创建了 aggregationBuilders,我们可以通过aggregation方法将它们添加到SearchSourceBuilder对象上,如下:

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(matchAllQuery()).aggregation(aggsBuilder).
        aggregation(aggsBuilder2).size(0);
SearchRequest searchRequest = new SearchRequest().indices(index).source(searchSourceBuilder);
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

现在,响应包含有关我们聚合的信息。要访问它们,我们需要使用响应的 getAggregations 方法。

聚合的结果包含在类似哈希的结构中,您可以使用之前在请求中定义的名称来检索它们。

要检索第一个聚合结果,我们需要获取它们,如下所示:

Terms termsAggs = response.getAggregations().get("tag");

现在我们有了 Terms 类型的聚合结果(请参阅 Executing terms aggregations recipe < span>在 第 7 章聚合),我们可以获取聚合属性并在桶中迭代,如下:

System.out.println("Aggregation name: " + termsAggs.getName());
System.out.println("Aggregation total: " + termsAggs.getBuckets().size());
for (Terms.Bucket entry : termsAggs.getBuckets()) {
    System.out.println(" - " + entry.getKey() + " " + entry.getDocCount());
}

要获取第二个聚合结果,由于结果是ExtendedStats类型,需要强制转换为,如下代码所示:

ExtendedStats extStats = response.getAggregations().get("number1");

现在,您可以访问这种聚合的结果属性,如下所示:

System.out.println("Aggregation name: " + extStats.getName());
System.out.println("Count: " + extStats.getCount());
System.out.println("Min: " + extStats.getMin());
System.out.println("Max: " + extStats.getMax());
System.out.println("Standard Deviation: " + extStats.getStdDeviation());
System.out.println("Sum of Squares: " + extStats.getSumOfSquares());
System.out.println("Variance: " + extStats.getVariance());
在本机客户端中使用聚合非常简单,您只需注意返回的聚合类型即可执行正确的类型转换以访问您的结果。

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

  • The Executing terms aggregations recipe in Chapter 7Aggregations, which describes the terms aggregation in depth
  • The Executing statistical aggregations recipe in Chapter 7Aggregations for more details about statistical aggregations

Executing a scroll search

如果您将文档与不经常更改的文档进行匹配,则使用标准查询的分页效果非常好;否则,使用实时数据执行分页会返回不可预测的结果。为了绕过这个问题,Elasticsearch 在查询中提供了一个额外的参数:scroll

Getting ready

您需要一个正常运行的 Elasticsearch 安装,您可以按照 the 下载和安装 Elasticsearch 中的说明获取它 第 1 章中的食谱< /a>, 开始.

已安装 Maven 工具或本机支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码在 ch13/high-level-client 目录中,引用的类是 ScrollQueryExample

How to do it...

搜索已按照 执行标准搜索配方中的说明完成。主要区别在于使用 setScroll 超时,它允许将生成的 ID 存储在内存中,以便在定义的时间段内进行查询。这些步骤类似于用于标准搜索的步骤,您可以从以下步骤中看到:

  1. We import the TimeValue object to define time in a more human way, as follows:
import org.elasticsearch.common.unit.TimeValue;
  1. We execute the search by setting the scroll value. We can change the code of the Execute a standard search recipe to use scroll in the following way:
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(query).size(30);
SearchRequest searchRequest = new SearchRequest()
        .indices(index).source(searchSourceBuilder)
        .scroll(TimeValue.timeValueMinutes(2));
  1. To manage the scrolling, we need to create a loop until the results are returned, as follows:
SearchResponse response = client.search(searchRequest, RequestOptions.DEFAULT);

do {
    for (SearchHit hit : response.getHits().getHits()) {
        System.out.println("hit: " + hit.getIndex() + ":" + hit.getType() + ":" + hit.getId());
    }
    response = client.scroll(new SearchScrollRequest(response.getScrollId()).scroll(TimeValue.timeValueMinutes(2)), RequestOptions.DEFAULT);
} while (response.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.
  1. The loop will iterate on all the results until records are available. The output will be similar to the following:
hit: mytest:mytype:499
hit: mytest:mytype:531
hit: mytest:mytype:533
hit: mytest:mytype:535
hit: mytest:mytype:555
hit: mytest:mytype:559
hit: mytest:mytype:571
hit: mytest:mytype:575
...truncated...

How it works...

要使用滚动结果,只需向 SearchRequest 对象添加一个带有超时的 scroll 方法就足够了。

使用滚动时,必须牢记以下行为:

  • The timeout defines the period of time that an Elasticsearch server keeps the results for. If you ask for a scroll after the timeout, the server returns an error. The user must be careful with short timeouts.
  • The scroll consumes memory until it ends or a timeout is raised. Setting too large a timeout without consuming the data results in a big memory overhead. Using a large number of open scrollers consumes a lot of memory proportional to the number of IDs and their related data (score, order, and so on) in the results.
  • With scrolling, it's not possible to paginate the documents as there is no start. Scrolling is designed to fetch consecutive results.

标准的 SearchRequest 通过以下方式更改为滚动:

SearchRequest searchRequest = new SearchRequest()
        .indices(index).source(searchSourceBuilder)
        .scroll(TimeValue.timeValueMinutes(2));

响应包含与标准搜索相同的结果,加上一个滚动 ID,它是获取下一组结果所必需的。

要执行滚动,您需要使用滚动 ID 和新的超时调用 scroll 客户端方法。在这个例子中,我们正在处理所有的结果文档,如下代码所示:

do {
    // Process hits
    response = client.scroll(new SearchScrollRequest(response.getScrollId()).scroll(TimeValue.timeValueMinutes(2)), RequestOptions.DEFAULT);
} while (response.getHits().getHits().length != 0); // Zero hits mark the end of the scroll and the while loop.

要了解我们处于滚动的末尾,我们可以检查是否没有返回任何结果。

在很多场景中scroll 非常重要,但是在处理大数据解决方案时,当结果数量非常大时,很容易遇到超时。在这些场景中,重要的是要有良好的体系结构,在该体系结构中,您可以尽可能快地获取结果,并且不要在循环中迭代地处理结果,而是以分布式方式推迟操作结果。

在这种情况下,最好的解决方案是使用 Elasticsearch 的 search_after 功能,按 _uid 排序,如 使用 search_after 功能中所述;配方在第4章,< /span> 探索搜索功能

See also

您可以参考以下与本配方相关的 URL 以获得进一步的参考:

  • The Executing a scroll query recipe in Chapter 4, Exploring Search Capabilities
  • The Using search_after functionality recipe in Chapter 4, Exploring Search Capabilities

Integrating with DeepLearning4j

DeepLearning4J (DL4J) 是机器学习中最常用的开源库之一。它可以在 https://deeplearning4j.org/找到。

该库的最佳描述可在其网站上找到,该网站称——Deeplearning4j 是第一个为 Java 和 Scala 编写的商业级、开源、分布式深度学习库。 DL4J 与 Hadoop 和 Apache Spark 集成,将 AI 带入业务环境,以在分布式 GPU 和 CPU 上使用

在这个秘籍中,我们将看到如何使用 Elasticsearch 作为数据源在机器学习算法中进行训练。

Getting ready

您需要启动并运行 Elasticsearch 安装,正如我们在 第 1 章入门

必须安装 Maven 工具或原生支持 Java 编程的 IDE,例如 Eclipse 或 IntelliJ IDEA。

此配方的代码位于 ch13/deeplearning4j 目录中。

How to do it...

我们将使用著名的 iris 数据集(https://en.wikipedia.org/wiki/Iris_flower_data_set ) 每个数据科学家都知道,它使用用于训练深度学习模型的数据创建索引。

要准备您的索引数据集,我们需要通过执行源代码中提供的 PopulatingIndex 类来填充它。 PopulatingIndex 类读取 iris.txt 文件并使用以下对象格式存储数据集的行:

Field Name Type Description
f1 float Feature 1 of the flower
f2 float Feature 2 of the flower
f3 float Feature 3 of the flower
f4 float Feature 4 of the flower
label int Label of the Flower (valid values 0,1,2)

要将 DL4J 用作模型的源数据输入,您将执行以下步骤:

  1. Add the various DL4J dependencies to pom.xml of the Maven project:
<dependency>
    <groupId>org.nd4j</groupId>
    <artifactId>nd4j-native-platform</artifactId>
    <version>${nd4j.version}</version>
</dependency>

<!-- ND4J backend. You need one in every DL4J project. Normally define artifactId as either "nd4j-native-platform" or "nd4j-cuda-9.2-platform" -->
<dependency>
    <groupId>org.nd4j</groupId>
    <artifactId>${nd4j.backend}</artifactId>
    <version>${nd4j.version}</version>
</dependency>

<!-- Core DL4J functionality -->
<dependency>
    <groupId>org.deeplearning4j</groupId>
    <artifactId>deeplearning4j-core</artifactId>
    <version>${dl4j.version}</version>
</dependency>
  1. Now we can write our ElasticSearchD4J class to train and test our model. As the first step, we need to initialize the Elasticsearch client:
HttpHost httpHost = new HttpHost("localhost", 9200, "http");
RestClientBuilder restClient = RestClient.builder(httpHost);
RestHighLevelClient client = new RestHighLevelClient(restClient);
String indexName="iris";
  1. After having the client, we can read our dataset. We will execute a query and collect the Hit results using a simple search:
SearchResponse searchResult=client.search(new SearchRequest(indexName).source(SearchSourceBuilder.searchSource().size(1000)), RequestOptions.DEFAULT);

SearchHit[] hits=searchResult.getHits().getHits();
  1. We need to convert the hits in a DL4J dataset. We will do this by creating intermediate arrays and populating them:
//Convert the iris data into 150x4 matrix
int row=150;
int col=4;
double[][] irisMatrix=new double[row][col];
//Now do the same for the label data
int colLabel=3;
double[][] labelMatrix=new double[row][colLabel];

for(int r=0; r<row; r++){
    // we populate features
    Map<String, Object> source=hits[r].getSourceAsMap();
    irisMatrix[r][0]=(double)source.get("f1");
    irisMatrix[r][1]=(double)source.get("f2");
    irisMatrix[r][2]=(double)source.get("f3");
    irisMatrix[r][3]=(double)source.get("f4");
    // we populate labels
    int label=(Integer) source.get("label");
    labelMatrix[r][0]=0.0;
    labelMatrix[r][1]=0.0;
    labelMatrix[r][2]=0.0;
    if(label == 0) labelMatrix[r][0]=1.0;
    if(label == 1) labelMatrix[r][1]=1.0;
    if(label == 2) labelMatrix[r][2]=1.0;
}
//Check the array by printing it in the log
//Convert the data matrices into training INDArrays
INDArray training = Nd4j.create(irisMatrix);
INDArray labels = Nd4j.create(labelMatrix);

DataSet allData = new DataSet(training, labels);
  1. Then, split the datasets into two—one for training and one for tests. After having them, we need to normalize the values. These actions can be done with the following code:
allData.shuffle();
SplitTestAndTrain testAndTrain = allData.splitTestAndTrain(0.65); //Use 65% of data for training

DataSet trainingData = testAndTrain.getTrain();
DataSet testData = testAndTrain.getTest();

//We need to normalize our data. We'll use NormalizeStandardize (which gives us mean 0, unit variance):
DataNormalization normalizer = new NormalizerStandardize();
normalizer.fit(trainingData); //Collect the statistics (mean/stdev) from the training data. This does not modify the input data
normalizer.transform(trainingData); //Apply normalization to the training data
normalizer.transform(testData); //Apply normalization to the test data. This is using statistics calculated from the *training* set
  1. Now we can design the model to be used for the training:
final int numInputs = 4;
int outputNum = 3;
long seed = 6;
MultiLayerConfiguration conf = new NeuralNetConfiguration.Builder()
        .seed(seed)
        .activation(Activation.TANH)
        .weightInit(WeightInit.XAVIER)
        .updater(new Sgd(0.1))
        .l2(1e-4)
        .list()
        .layer(0, new DenseLayer.Builder().nIn(numInputs).nOut(3)
                .build())
        .layer(1, new DenseLayer.Builder().nIn(3).nOut(3)
                .build())
        .layer(2, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
                .activation(Activation.SOFTMAX)
                .nIn(3).nOut(outputNum).build())
        .backprop(true).pretrain(false)
        .build();
  1. After having defined the model, we can finally train it with our dataset—we use 1000 iterations for the training. The training code is as follows:
MultiLayerNetwork model = new MultiLayerNetwork(conf);
model.init();
model.setListeners(new ScoreIterationListener(100));

for(int i=0; i<1000; i++ ) {
    model.fit(trainingData);
}
  1. Now that we have a trained model, we need to evaluate its accuracy and we can do that using the test dataset:
Evaluation eval = new Evaluation(3);
INDArray output = model.output(testData.getFeatures());
eval.eval(testData.getLabels(), output);
log.info(eval.stats());

如果您执行模型训练,输出将类似于以下内容:

14:17:24.684 [main] INFO com.packtpub.ElasticSearchD4J - 

========================Evaluation Metrics========================
 # of classes: 3
 Accuracy: 0.9811
 Precision: 0.9778
 Recall: 0.9833
 F1 Score: 0.9800
Precision, recall & F1: macro-averaged (equally weighted avg. of 3 classes)


=========================Confusion Matrix=========================
  0 1 2
----------
 19 0 0 | 0 = 0
  0 19 1 | 1 = 1
  0 0 14 | 2 = 2

Confusion matrix format: Actual (rowClass) predicted as (columnClass) N times
==================================================================

How it works...

Eclipse Deeplearning4j 是为 Java 和 Java 虚拟机 (JVM)编写的深度学习编程库。它包括受限玻尔兹曼机的实现,深度信念网络、深度自动编码器、堆叠去噪自动编码器和递归神经张量网络,如 word2vec、doc2vec 和 GloVe。这些算法都包括与 Apache Hadoop 和 Spark 集成的分布式并行版本。

DL4J 能够同时使用 CPU 和 GPU 来快速处理深度学习工作负载 

在前面的示例中,我们在 Elasticsearch 中存储了数据集并获取它以构建 DL4J 数据集。使用 Elasticsearch 作为数据集存储非常方便,因为您可以使用 Elasticsearch 的强大功能来分析、清理和过滤数据,然后再将它们提供给机器学习算法。

数据集被打乱 (allData.shuffle();) 以减少对训练和测试数据集的偏差。在这种情况下,我们选择了一个三层深度学习模型,并使用 Elasticsearch 获取的数据对模型进行了训练,迭代训练 1000 次。结果是一个精度为 0.98 的神经网络模型。

这个例子非常简单,但它展示了使用 Elasticsearch 作为机器学习工作的数据源是多么容易。 DL4J 是一个很棒的库,可以在 Elasticsearch 之外使用,也可以嵌入到插件中以提供机器学习 Elasticsearch 的能力。

See also