
Reading Notes on Elasticsearch 7.0 Cookbook (Fourth Edition): Plugin Development

Plugin Development

Elasticsearch is designed to be extended with plugins to improve its capabilities. In the previous chapters, we installed and used many of them (new queries, REST endpoints, and scripting plugins).

Plugins are application extensions that can add many capabilities to Elasticsearch. They can have several usages, such as the following:

  • Adding a new scripting language (for example, the Python and JavaScript plugins)
  • Adding new aggregation types
  • Extending Lucene-supported analyzers and tokenizers
  • Using native scripting to speed up computation of scores, filters, and field manipulation
  • Extending node capabilities, for example, creating a node plugin that can execute your logic
  • Monitoring and administering clusters

In this chapter, the Java language will be used to develop native plugins, but any JVM language that generates JAR files can be used.

The standard tool for building and testing Elasticsearch components is built on top of Gradle (https://gradle.org/). All of our custom plugins will be built using Gradle.

In this chapter, we will cover the following recipes:

  • Creating a plugin
  • Creating an analyzer plugin
  • Creating a REST plugin
  • Creating a cluster action
  • Creating an ingest plugin

Creating a plugin

Native plugins allow several aspects of the Elasticsearch server to be extended, but they require good Java knowledge.

In this recipe, we will see how to set up a working environment for developing native plugins.

Getting ready

You need an up-and-running Elasticsearch installation, as described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Gradle, or an integrated development environment (IDE) that supports Java programming with Gradle, such as Eclipse or IntelliJ IDEA, is required.

The code for this recipe is available in the ch16/simple_plugin directory.

How to do it...

Generally, Elasticsearch plugins are developed in Java using the Gradle build tool and deployed as a ZIP file.

To create a simple JAR plugin, we will perform the following steps:

  1. To correctly build and serve a plugin, some files must be defined:
  • build.gradle and settings.gradle are used to define the build configuration for Gradle
  • LICENSE.txt defines the plugin license
  • NOTICE.txt is a copyright notice
  2. The build.gradle file that we use to create a plugin contains the following code:
buildscript {
  repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
  }

  dependencies {
    classpath "org.elasticsearch.gradle:build-tools:7.0.0-alpha2"
  }
}

group = 'org.elasticsearch.plugin.analysis'
version = '0.0.1-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'elasticsearch.esplugin'

// license of this project
licenseFile = rootProject.file('LICENSE.txt')
// copyright notices
noticeFile = rootProject.file('NOTICE.txt')
esplugin {
  name 'simple-plugin'
  description 'A simple plugin for ElasticSearch'
  classname 'org.elasticsearch.plugin.simple.SimplePlugin'
  // license of the plugin, may be different than the above license
  licenseFile rootProject.file('LICENSE.txt')
  // copyright notices, may be different than the above notice
  noticeFile rootProject.file('NOTICE.txt')
}

dependencies {
 compile 'org.elasticsearch:elasticsearch:7.0.0-alpha2'

 testCompile 'org.elasticsearch.test:framework:7.0.0-alpha2'
}

// Set to false to not use elasticsearch checkstyle rules
checkstyleMain.enabled = true
checkstyleTest.enabled = true

dependencyLicenses.enabled = false

thirdPartyAudit.enabled = false
  3. The settings.gradle file is used to define the project name, as follows:
rootProject.name = 'simple-plugin'
  4. The src/main/java/org/elasticsearch/plugin/simple/SimplePlugin.java class is an example of the minimum required code that needs to be compiled for executing a plugin, as follows:
package org.elasticsearch.plugin.simple;


import org.elasticsearch.plugins.Plugin;

public class SimplePlugin extends Plugin {

}

How it works...

The development life cycle of a plugin is composed of several parts, such as design, coding, building, and deployment. To speed up the building and deployment steps, which are common to all plugins, we need to create a build.gradle file.

The preceding build.gradle file is the standard one for developing Elasticsearch plugins. It is composed of the following parts:

  • A buildscript section that depends on the Gradle building tools for Elasticsearch, as follows:
buildscript {
  repositories {
    mavenLocal()
    mavenCentral()
    jcenter()
  }

  dependencies {
    classpath "org.elasticsearch.gradle:build-tools:7.0.0-alpha2"
  }
}
  • The group and the version of the plugin, as follows:
group = 'org.elasticsearch.plugin'
version = '0.0.1-SNAPSHOT'
  • A list of Gradle plugins that must be activated, as follows:
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'elasticsearch.esplugin'
  • The license and notice file definition, as follows:
// license of this project
licenseFile = rootProject.file('LICENSE.txt')
// copyright notices
noticeFile = rootProject.file('NOTICE.txt')
  • The information that's needed to populate the plugin description. This is used to generate the plugin-descriptor.properties file that will be available in the final distribution ZIP (a sketch of the generated file follows this list). The most important parameter is classname, which is the main entry point of the plugin:
esplugin {
  name 'simple-plugin'
  description 'A simple plugin for ElasticSearch'
  classname 'org.elasticsearch.plugin.simple.SimplePlugin'
  // license of the plugin, may be different than the above license
  licenseFile rootProject.file('LICENSE.txt')
  // copyright notices, may be different than the above notice
  noticeFile rootProject.file('NOTICE.txt')
}
  • For compiling the code, the dependencies are required, as follows:
dependencies {
  compile 'org.elasticsearch:elasticsearch:7.0.0-alpha2'

  testCompile 'org.elasticsearch.test:framework:7.0.0-alpha2'
}

Having configured Gradle, we can start to write the main plugin class.

Every plugin class must be derived from Plugin, and it must be public; otherwise, it cannot be loaded dynamically from the JAR, as follows:

package org.elasticsearch.plugin.simple;

import org.elasticsearch.plugins.Plugin;

public class SimplePlugin extends Plugin {
}

After having defined all the files that are required to generate a ZIP release of our plugin, it is enough to invoke the gradle clean check command. This command will compile the code and create a ZIP package in the build/distributions/ directory of your project; the final ZIP file can then be deployed as a plugin on your Elasticsearch cluster.

In this recipe, we configured a working environment to build, deploy, and test plugins. In the following recipes, we will reuse this environment to develop several plugin types.

There's more...

Compiling and packaging a plugin is not enough to define a good life cycle for it: a test phase needs to be provided to test your plugin's functionality.

Testing the plugin functionality with test cases reduces the number of bugs that can affect the plugin when it is released.

The order is very important, so make sure that you add the following line to your dependencies:

testCompile 'org.elasticsearch.test:framework:7.0.0-alpha2'

The Elasticsearch extension for Gradle has everything that's needed to set up tests and integration tests.
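
As a minimal sketch of a unit test (assuming the test framework dependency shown above and the SimplePlugin class from this recipe; the test class name is hypothetical), you could start with the following:

package org.elasticsearch.plugin.simple;

import org.elasticsearch.test.ESTestCase;

public class SimplePluginTests extends ESTestCase {

    // The plugin class must be public with a no-arg constructor so that
    // Elasticsearch can load it dynamically from the JAR.
    public void testPluginInstantiation() {
        SimplePlugin plugin = new SimplePlugin();
        assertNotNull(plugin);
    }
}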

Creating an analyzer plugin

Elasticsearch provides a large set of analyzers and tokenizers that cover general needs out of the box. Sometimes, we need to extend the capabilities of Elasticsearch by adding new analyzers.

Typically, you create an analyzer plugin when you need to do the following:

  • Add standard Lucene analyzers/tokenizers that are not provided by Elasticsearch
  • Integrate third-party analyzers
  • Add custom analyzers

In this recipe, we will add a new custom English analyzer, similar to the one provided by Elasticsearch.

Getting ready

You need an up-and-running Elasticsearch installation, as described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Gradle, or an IDE that supports Java programming with Gradle, such as Eclipse or IntelliJ IDEA, is required. The code for this recipe is available in the ch16/analysis_plugin directory.

How to do it...

An analyzer plugin is generally composed of the following two classes:

  • A Plugin class that implements the org.elasticsearch.plugins.AnalysisPlugin interface
  • An AnalyzerProvider class that provides the analyzer

To create an analyzer plugin, we will perform the following steps:

  1. The plugin class is similar to the ones we've seen in previous recipes, but it includes a method that returns the analyzers, as follows:
package org.elasticsearch.plugin.analysis;

import org.apache.lucene.analysis.Analyzer;
import org.elasticsearch.index.analysis.AnalyzerProvider;
import org.elasticsearch.index.analysis.CustomEnglishAnalyzerProvider;
import org.elasticsearch.indices.analysis.AnalysisModule;
import org.elasticsearch.plugins.Plugin;

import java.util.HashMap;
import java.util.Map;

public class AnalysisPlugin extends Plugin implements org.elasticsearch.plugins.AnalysisPlugin {
    @Override
    public Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> getAnalyzers() {
        Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> analyzers = new HashMap<>();
        analyzers.put(CustomEnglishAnalyzerProvider.NAME, CustomEnglishAnalyzerProvider::getCustomEnglishAnalyzerProvider);
        return analyzers;
    }
}
  2. The AnalyzerProvider class provides the initialization of our analyzer, and passes the parameters that are provided by the settings, as follows:
package org.elasticsearch.index.analysis;

import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.CharArraySet;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.env.Environment;
import org.elasticsearch.index.IndexSettings;

public class CustomEnglishAnalyzerProvider extends AbstractIndexAnalyzerProvider<EnglishAnalyzer> {
    public static String NAME = "custom_english";

    private final EnglishAnalyzer analyzer;

    public CustomEnglishAnalyzerProvider(IndexSettings indexSettings, Environment env, String name, Settings settings,
                                         boolean ignoreCase) {
        super(indexSettings, name, settings);

        analyzer = new EnglishAnalyzer(
                Analysis.parseStopWords(env, settings, EnglishAnalyzer.getDefaultStopSet(), ignoreCase),
                Analysis.parseStemExclusion(settings, CharArraySet.EMPTY_SET));
    }

    public static CustomEnglishAnalyzerProvider getCustomEnglishAnalyzerProvider(IndexSettings indexSettings,
                                                                                 Environment env, String name,
                                                                                 Settings settings) {
        return new CustomEnglishAnalyzerProvider(indexSettings, env, name, settings, true);
    }

    @Override
    public EnglishAnalyzer get() {
        return this.analyzer;
    }
}

After building the plugin and installing it on the Elasticsearch server, our analyzer can be accessed just like any native Elasticsearch analyzer.
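
To get a feel for what the wrapped analyzer produces, the following standalone sketch (plain Lucene, outside Elasticsearch; the class name is hypothetical) prints the tokens emitted by an EnglishAnalyzer; stopwords are removed and the remaining terms are stemmed:

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.en.EnglishAnalyzer;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class EnglishAnalyzerDemo {
    public static void main(String[] args) throws Exception {
        Analyzer analyzer = new EnglishAnalyzer();
        try (TokenStream stream = analyzer.tokenStream("field", "The quick foxes are jumping")) {
            CharTermAttribute term = stream.addAttribute(CharTermAttribute.class);
            stream.reset();
            while (stream.incrementToken()) {
                System.out.println(term.toString()); // prints: quick, fox, jump
            }
            stream.end();
        }
    }
}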

How it works...

Creating an analyzer plugin is quite simple. The general workflow is as follows:

  • Wrap the analyzer initialization in a provider
  • Register the analyzer provider in the plugin

In the preceding example, we registered a CustomEnglishAnalyzerProvider class, a provider that wraps Lucene's EnglishAnalyzer class:

public class CustomEnglishAnalyzerProvider extends AbstractIndexAnalyzerProvider<EnglishAnalyzer> {

We need to provide a name for the analyzer, as follows:

public static String NAME = "custom_english";

We instantiate a private-scoped Lucene analyzer (we have already discussed the usage of custom Lucene analyzers in Chapter 2, Managing Mapping) that is provided on request via the get method, as follows:

private final EnglishAnalyzer analyzer;

The CustomEnglishAnalyzerProvider constructor can be injected via Google Guice with settings that can be used to provide cluster defaults via the index settings or elasticsearch.yml, as follows:

public CustomEnglishAnalyzerProvider(IndexSettings indexSettings, Environment env, String name, Settings settings, boolean ignoreCase) {

For it to work correctly, we need to set up the parent constructor via the super call, as follows:

super(indexSettings, name, settings);

Now, we can initialize the internal analyzer, which must be returned by the get method, as follows:

analyzer = new EnglishAnalyzer(
        Analysis.parseStopWords(env, settings, EnglishAnalyzer.getDefaultStopSet(), ignoreCase),
        Analysis.parseStemExclusion(settings, CharArraySet.EMPTY_SET));

This analyzer accepts the following:

  • A list of stopwords that can be loaded via the settings, or set to the default ones
  • A list of words that must be excluded from the stemming step

To easily wrap the analyzer, we need to create a static method that can be called to create it. We will use this method in the plugin definition, as follows:

public static CustomEnglishAnalyzerProvider getCustomEnglishAnalyzerProvider(IndexSettings indexSettings, Environment env, String name, Settings settings) {
  return new CustomEnglishAnalyzerProvider(indexSettings, env, name, settings, true);
}

Finally, we can register our analyzer in the plugin. To do so, our plugin must be derived from AnalysisPlugin so that we can override the getAnalyzers method, as follows:

@Override
public Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> getAnalyzers() {
    Map<String, AnalysisModule.AnalysisProvider<AnalyzerProvider<? extends Analyzer>>> analyzers = new HashMap<>();
    analyzers.put(CustomEnglishAnalyzerProvider.NAME, CustomEnglishAnalyzerProvider::getCustomEnglishAnalyzerProvider);
    return analyzers;
}

The Java 8 :: operator allows us to provide a function that will be used to construct our AnalyzerProvider.
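
For clarity, the method reference shown above is equivalent to the following lambda (a sketch; the parameter list matches the AnalysisProvider functional interface):

analyzers.put(CustomEnglishAnalyzerProvider.NAME,
        (indexSettings, environment, name, settings) ->
                CustomEnglishAnalyzerProvider.getCustomEnglishAnalyzerProvider(indexSettings, environment, name, settings));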

There's more...

A plugin can extend several Elasticsearch capabilities. To provide them, the correct plugin interfaces need to be implemented. In Elasticsearch 7.x, the main plugin interfaces are as follows:

  • ActionPlugin: This is used for REST and cluster actions
  • AnalysisPlugin: This is used for extending all the analysis stuff, such as analyzers, tokenizers, tokenFilters, and charFilters
  • ClusterPlugin: This is used to provide new deciders
  • DiscoveryPlugin: This is used to provide custom node name resolvers
  • EnginePlugin: This is used to provide a new custom engine for indices
  • IndexStorePlugin: This is used to provide a custom index store
  • IngestPlugin: This is used to provide new ingest processors
  • MapperPlugin: This is used to provide new mappers and metadata mappers
  • ReloadablePlugin: This allows you to create plugins that reload their state
  • RepositoryPlugin: This allows the provision of new repositories to be used in backup/restore functionalities
  • ScriptPlugin: This allows the provision of new scripting languages, scripting contexts, or native scripts (Java based ones)
  • SearchPlugin: This allows all the search functionalities to be extended: highlighters, aggregations, suggesters, and queries

If your plugin needs to extend several capabilities, it can implement several plugin interfaces at the same time, as the sketch below shows.
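
As a hypothetical sketch, a plugin class that provides both REST actions and ingest processors would simply implement both interfaces:

import org.elasticsearch.plugins.ActionPlugin;
import org.elasticsearch.plugins.IngestPlugin;
import org.elasticsearch.plugins.Plugin;

// Both interfaces provide default methods, so we only override what we need:
// getRestHandlers(...) from ActionPlugin and getProcessors(...) from IngestPlugin.
public class MultiPurposePlugin extends Plugin implements ActionPlugin, IngestPlugin {
}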

Creating a REST plugin

In the previous recipe, we saw how to build an analyzer plugin that extends the query capabilities of Elasticsearch. In this recipe, we will see how to create one of the most common Elasticsearch plugins: a plugin that extends the standard REST calls with custom ones, easily improving the capabilities of Elasticsearch.

In this recipe, we will see how to define a REST entry point and create its action; in the next one, we will see how to execute this action on the shards.

Getting ready

You need an up-and-running Elasticsearch installation, as described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Gradle, or an IDE that supports Java programming with Gradle, such as Eclipse or IntelliJ IDEA, is required. The code for this recipe is available in the ch16/rest_plugin directory.

How to do it...

To create a REST entry point, we need to create the action and then register it in the plugin. We will perform the following steps:

  1. We create a simple REST action (RestSimpleAction.java), as follows:
public class RestSimpleAction extends BaseRestHandler {
    public RestSimpleAction(Settings settings, RestController controller) {
        super(settings);
        controller.registerHandler(POST, "/_simple", this);
        controller.registerHandler(POST, "/{index}/_simple", this);
        controller.registerHandler(POST, "/_simple/{field}", this);
        controller.registerHandler(GET, "/_simple", this);
        controller.registerHandler(GET, "/{index}/_simple", this);
        controller.registerHandler(GET, "/_simple/{field}", this);
    }

    @Override
    public String getName() {
        return "simple_rest";
    }


    @Override
    protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
        final SimpleRequest simpleRequest = new SimpleRequest(Strings.splitStringByCommaToArray(request.param("index")));
        simpleRequest.setField(request.param("field"));
        return channel -> client.execute(SimpleAction.INSTANCE, simpleRequest, new RestBuilderListener<SimpleResponse>(channel) {
            @Override
            public RestResponse buildResponse(SimpleResponse simpleResponse, XContentBuilder builder) throws Exception {
                try {
                    builder.startObject();
                    builder.field("ok", true);
                    builder.array("terms", simpleResponse.getSimple().toArray());
                    builder.endObject();

                } catch (Exception e) {
                    onFailure(e);
                }
                return new BytesRestResponse(OK, builder);
            }
        });
    }
}
  2. We need to register it in the plugin with the following lines:
public class RestPlugin extends Plugin implements ActionPlugin {

    @Override
    public List<RestHandler> getRestHandlers(Settings settings, RestController restController, ClusterSettings clusterSettings, IndexScopedSettings indexScopedSettings, SettingsFilter settingsFilter, IndexNameExpressionResolver indexNameExpressionResolver, Supplier<DiscoveryNodes> nodesInCluster) {
        return Arrays.asList(new RestSimpleAction(settings, restController));
    }
    @Override
    public List<ActionHandler<? extends ActionRequest, ? extends ActionResponse>> getActions() {
        return Arrays.asList(new ActionHandler<>(SimpleAction.INSTANCE, TransportSimpleAction.class));
    }
}
  3. Now, we can build the plugin via the gradle clean check command and manually install the ZIP. If we restart the Elasticsearch server, we should see the plugin loaded as follows:
...truncated...[2019-02-05T21:15:35,250][WARN ][o.e.n.Node ] [iMacParo.local] version [7.0.0-alpha2] is a pre-release version of Elasticsearch and is not suitable for production
 [2019-02-05T21:15:36,306][INFO ][o.e.p.PluginsService ] [iMacParo.local] loaded module [aggs-matrix-stats]
 ... truncated...
 [2019-02-05T21:15:36,311][INFO ][o.e.p.PluginsService ] [iMacParo.local] loaded plugin [rest-plugin]
 [2019-02-05T21:15:38,736][INFO ][o.e.x.s.a.s.FileRolesStore] [iMacParo.local] parsed [0] roles from file [/Users/alberto/elasticsear
  4. We can test our custom REST entry point via curl, as follows:
curl -XPUT http://127.0.0.1:9200/mytest
curl -XPUT http://127.0.0.1:9200/mytest2
curl 'http://127.0.0.1:9200/_simple?field=mytest&pretty'
  5. The result will be something similar to the following:
{
 "ok" : true,
 "terms" : [
 "mytest_[mytest2][0]",
 "mytest_[mytest][0]"
 ]
 }

How it works...

Adding a REST action is quite easy: we need to create a RestXXXAction class that handles the calls.

A REST action is derived from the BaseRestHandler class and needs to implement the prepareRequest method.

The constructor is very important, so let's start by writing the following:

public RestSimpleAction(Settings settings, RestController controller) {

The public constructor takes the following parameters:

  • Settings: This can be used to load custom settings for your REST action
  • RestController: This is used to register the REST action to the controller

In the constructor of the REST action, the list of actions that must be handled is registered in the RestController, as follows:

super(settings);
controller.registerHandler(POST, "/_simple", this);

To register an action, the following parameters must be passed to the controller:

  • The REST method (GET/POST/PUT/DELETE/HEAD/OPTIONS)
  • The URL entrypoint
  • The RestHandler, usually the same class, which must answer the call

After having defined the constructor, the prepareRequest class method is called whenever an action is fired, as follows:

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) {

This method is the core of the REST action. It processes the request and sends back the result. The following objects are involved in processing the call:

  • RestRequest: This is the REST request that hits the Elasticsearch server
  • RestChannel: This is the channel used to send back the response
  • NodeClient: This is the client used to communicate in the cluster

The return value is a RestChannelConsumer, which is a FunctionalInterface that accepts a RestChannel; in practice, it is a simple lambda.

The prepareRequest method is usually composed of the following phases:

  • Process the REST request and build an inner Elasticsearch request object
  • Call the client with the Elasticsearch request
  • If it is okay, process the Elasticsearch response and build the resulting JSON
  • If there are errors, send back the JSON error response

In the following example, we create a SimpleRequest to process the request:

final SimpleRequest simpleRequest = new SimpleRequest(Strings.splitStringByCommaToArray(request.param("index")));
simpleRequest.setField(request.param("field"));

As you can see, it accepts a list of indices (we split the classic comma-separated list of indices via the Strings.splitStringByCommaToArray helper, as the quick sketch below shows), and we read the field parameter if it is available.
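
For example, the helper behaves as in the following quick sketch (the Strings class here is org.elasticsearch.common.Strings):

String[] indices = Strings.splitStringByCommaToArray("index1,index2,index3");
// indices now contains ["index1", "index2", "index3"]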

Now that we have a SimpleRequest, we can send it to the cluster and return the SimpleResponse via a lambda closure, as follows:

return channel -> client.execute(SimpleAction.INSTANCE, simpleRequest, new RestBuilderListener<SimpleResponse>(channel) {

client.execute accepts an action, a request, and a RestBuilderListener class that maps the future response. We can now process the response by defining the buildResponse method.

buildResponse receives the response object, which must be converted into a JSON result, as follows:

@Override
public RestResponse buildResponse(SimpleResponse simpleResponse, XContentBuilder builder) {

The builder is the standard JSON XContentBuilder that we already saw in Chapter 13, Java Integration.

After having processed the cluster response and built the JSON, we can send back the REST response, as follows:

return new BytesRestResponse(OK, builder);

Obviously, if something goes wrong during the JSON creation, an exception must be raised, as follows:

try {
    //JSON creation
} catch (Exception e) {
    onFailure(e);
}

We will discuss the SimpleRequest in the next recipe.

See also

The Creating a cluster action recipe, which follows, shows how the SimpleRequest is executed at the cluster level

Creating a cluster action

In the previous recipe, we saw how to create a REST entry point, but to execute the action at the cluster level, we need to create a cluster action.

Elasticsearch actions are generally executed and distributed across the cluster; in this recipe, we will see how to implement this kind of action. The cluster action will be very bare-bones: we send a string with a value to every shard, and the shards echo a result string that concatenates the string with the shard number.

Getting ready

You need an up-and-running Elasticsearch installation, as described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Gradle, or an IDE that supports Java programming with Gradle, such as Eclipse or IntelliJ IDEA, is required. The code for this recipe is available in the ch16/rest_plugin directory.

How to do it...

In this recipe, we will see how a REST call is converted into an internal cluster action. To execute an internal cluster action, the following classes are required:

  • A Request and Response class to communicate with the cluster
  • A RequestBuilder used to execute a request to the cluster
  • An Action class used to register the action and bind the Request, Response, and RequestBuilder
  • A Transport*Action to bind the request and the response to a ShardResponse: it manages the reduce part of the query
  • A ShardResponse to manage the shard results

We will perform the following steps:

  1. We write a SimpleRequest class as follows:
public class SimpleRequest extends BroadcastRequest<SimpleRequest> {

    private String field;

    SimpleRequest() {
    }

    public SimpleRequest(String... indices) {
        super(indices);
    }

    public void setField(String field) {
        this.field = field;
    }

    public String getField() {
        return field;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        field = in.readString();
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(field);
    }
}
  2. The SimpleResponse class is very similar to the SimpleRequest class.
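The recipe does not list this class, so here is a hedged sketch of what SimpleResponse could look like, assuming a Set<String> payload that matches the getSimple() call used by the REST action and the constructor call used in the transport action below:

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.broadcast.BroadcastResponse;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;

public class SimpleResponse extends BroadcastResponse {

    private Set<String> simple;

    SimpleResponse() {
    }

    public SimpleResponse(int totalShards, int successfulShards, int failedShards,
                          List<DefaultShardOperationFailedException> shardFailures,
                          Set<String> simple) {
        super(totalShards, successfulShards, failedShards, shardFailures);
        this.simple = simple;
    }

    public Set<String> getSimple() {
        return simple;
    }

    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        simple = new HashSet<>(Arrays.asList(in.readStringArray()));
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeStringArray(simple.toArray(new String[0]));
    }
}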
  3. To bind the request and the response, an action (SimpleAction) is required, as follows:
import org.elasticsearch.action.Action;

public class SimpleAction extends Action<SimpleResponse> {

    public static final SimpleAction INSTANCE = new SimpleAction();
    public static final String NAME = "custom:indices/simple";

    private SimpleAction() {
        super(NAME);
    }

    @Override
    public SimpleResponse newResponse() {
        return new SimpleResponse();
    }

}
  4. The Transport class is the core of the action. It's quite long, so we'll present only the most important parts, as follows:
public class TransportSimpleAction
        extends TransportBroadcastByNodeAction<SimpleRequest, SimpleResponse, ShardSimpleResponse> {

    private final IndicesService indicesService;

    @Inject
    public TransportSimpleAction(ClusterService clusterService,
                                 TransportService transportService, IndicesService indicesService,
                                 ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver) {
        super(SimpleAction.NAME, clusterService, transportService, actionFilters,
                indexNameExpressionResolver, SimpleRequest::new, ThreadPool.Names.SEARCH);
        this.indicesService = indicesService;
    }

    @Override
    protected SimpleResponse newResponse(SimpleRequest request, int totalShards, int successfulShards, int failedShards,
                                         List<ShardSimpleResponse> shardSimpleResponses,
                                         List<DefaultShardOperationFailedException> shardFailures,
                                         ClusterState clusterState) {
        Set<String> simple = new HashSet<String>();
        for (ShardSimpleResponse shardSimpleResponse : shardSimpleResponses) {
            simple.addAll(shardSimpleResponse.getTermList());
        }

        return new SimpleResponse(totalShards, successfulShards, failedShards, shardFailures, simple);
    }


    @Override
    protected ShardSimpleResponse shardOperation(SimpleRequest request, ShardRouting shardRouting) throws IOException {
        IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
        IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());
        indexShard.store().directory();
        Set<String> set = new HashSet<String>();
        set.add(request.getField() + "_" + shardRouting.shardId());
        return new ShardSimpleResponse(shardRouting, set);
    }


    @Override
    protected ShardSimpleResponse readShardResult(StreamInput in) throws IOException {
        return ShardSimpleResponse.readShardResult(in);
    }

    @Override
    protected SimpleRequest readRequestFrom(StreamInput in) throws IOException {
        SimpleRequest request = new SimpleRequest();
        request.readFrom(in);
        return request;
    }

    @Override
    protected ShardsIterator shards(ClusterState clusterState, SimpleRequest request, String[] concreteIndices) {
        return clusterState.routingTable().allShards(concreteIndices);
    }

    @Override
    protected ClusterBlockException checkGlobalBlock(ClusterState state, SimpleRequest request) {
        return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_READ);
    }

    @Override
    protected ClusterBlockException checkRequestBlock(ClusterState state, SimpleRequest request, String[] concreteIndices) {
        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_READ, concreteIndices);
    }
}

How it works...

In this example, we used an action that is executed on every cluster node and on every shard selected on that node.

As you can see, to execute a cluster action, the following classes are required:

  • A couple of Request/Response classes to interact with the cluster
  • A task action on the cluster level
  • A Shard Response to interact with the shards
  • A Transport class to manage the map/reduce shard part that must be invoked by the REST call

These classes must extend one of the supported actions, such as the following:

  • TransportBroadcastAction: For actions that must be spread across the whole cluster.
  • TransportClusterInfoAction: For actions that need to read information at the cluster level.
  • TransportMasterNodeAction: For actions that must be executed only by the master node (such as index and mapping configuration). For simple acknowledgements on the master, there are also the AcknowledgedRequest and AcknowledgedResponse classes.
  • TransportNodeAction: For actions that must be executed on nodes (that is, all the node statistic actions).
  • TransportBroadcastReplicationAction, TransportReplicationAction, TransportWriteAction: For actions that must be executed by a particular replica, first on primary and then on secondary ones.
  • TransportInstanceSingleOperationAction: For actions that must be executed as a singleton in the cluster.
  • TransportSingleShardAction: For actions that must be executed only in a shard (that is, GET actions). If it fails on a shard, it automatically tries on the shard replicas.
  • TransportTasksAction: For actions that need to interact with cluster tasks.

In our example, we defined an action that will be broadcast to every node; each node collects its shard results, which are then aggregated, as follows:

public class TransportSimpleAction
        extends TransportBroadcastByNodeAction<SimpleRequest, SimpleResponse, ShardSimpleResponse> {

All the request/response classes extend a Streamable class, so the following two methods for serializing their content must be provided:

  • readFrom, which reads from a StreamInput, a class that encapsulates common input stream operations. This method allows the deserialization of the data that we transmit on the wire. In the preceding example, we read a string with the following code:
@Override
public void readFrom(StreamInput in) throws IOException {
    super.readFrom(in);
    field = in.readString();
}
  • writeTo, which writes the contents of the class to be sent via the network. The StreamOutput provides convenient methods to process the output. In the following example, we serialize the string to the StreamOutput:
@Override
public void writeTo(StreamOutput out) throws IOException {
    super.writeTo(out);
    out.writeString(field);
}

In both methods, super must be called to allow the correct serialization of the parent classes.

Every internal action in Elasticsearch is designed around a request/response pattern.

To complete the request/response action, we must define an action that binds the request to the correct response, and a builder to construct it. To do so, we need to define an Action class, as follows:

public class SimpleAction extends Action<SimpleResponse> {

This Action object is a singleton: we obtain it by creating a default static instance and a private constructor, as follows:

public static final SimpleAction INSTANCE = new SimpleAction();
public static final String NAME = "custom:indices/simple";

private SimpleAction() {
    super(NAME);
}

The static string NAME is used to uniquely identify the action at the cluster level.

To complete the Action definition, the newResponse method, which is used to create a new empty response, must be defined, as follows:

@Override
public SimpleResponse newResponse() {
    return new SimpleResponse();
}

When the action is executed, the request and the response are serialized and sent to the cluster. To execute our custom code at the cluster level, a transport action is required.

Transport actions are usually defined as map and reduce jobs. The map part consists of executing the operation on several shards, and the reduce part consists of collecting all the shard results into a response that must be sent back to the requester. To speed up processing, in Elasticsearch 5.x and later, all the shard responses that belong to the same node are reduced in place, in order to optimize I/O and network usage.

The transport action is a long class with many methods, but the most important ones are shardOperation (the map part) and newResponse (the reduce part).

The original request is converted into a distributed ShardRequest that is processed by the shardOperation method, as follows:

@Override
protected ShardSimpleResponse shardOperation(SimpleRequest request, ShardRouting shardRouting) {

To obtain the internal shard, we need to ask the IndexService to return a shard based on the desired index.

The shard request contains the index and the ID of the shard that must be used to execute the action, as follows:

IndexService indexService = indicesService.indexServiceSafe(shardRouting.shardId().getIndex());
IndexShard indexShard = indexService.getShard(shardRouting.shardId().id());

The IndexShard object allows every possible shard operation (search, get, index, and so on) to be executed. With this object, we can execute every data shard operation that we want.

A custom shard action can execute your business operations in a distributed and fast way.

In the following example, we created a simple set of values:

indexShard.store().directory();
Set<String> set = new HashSet<String>();
set.add(request.getField() + "_" + shardRouting.shardId());

The final step of our shard operation is to create a response to send back to the reduce step. When creating the ShardResponse, we need to return the result, plus the information about the index and the shard on which the action was executed, as follows:

return new ShardSimpleResponse(shardRouting, set);

The distributed shard operations are collected in the reduce step (the newResponse method). This step aggregates all the shard results and sends the result back to the original Action, as follows:

@Override
protected SimpleResponse newResponse(SimpleRequest request, int totalShards, int successfulShards, int failedShards,
                                     List<ShardSimpleResponse> shardSimpleResponses,
                                     List<DefaultShardOperationFailedException> shardFailures,
                                     ClusterState clusterState) {

Apart from the shard results, the method receives the status of the shard-level operations, collected in three values: successfulShards, failedShards, and shardFailures.

The result of the request is a set of collected strings, so we create an empty set to collect the term results, as follows:

Set<String> simple = new HashSet<String>();

Then, to collect the results, we need to iterate over the shard responses, as follows:

for (ShardSimpleResponse shardSimpleResponse : shardSimpleResponses) {
    simple.addAll(shardSimpleResponse.getTermList());
}

The last step is to create the response, collecting the previous results and the response status, as follows:

return new SimpleResponse(totalShards, successfulShards, failedShards, shardFailures, simple);

Creating a cluster action is required when we want to execute low-level operations quickly, such as special aggregations, server-side joins, or complex operations that require the execution of several Elasticsearch calls. Writing a custom Elasticsearch action is an advanced capability, but it makes it possible to create new business usage scenarios that improve the capabilities of Elasticsearch.

See also

The Creating a REST plugin recipe in this chapter, to learn how to interface a cluster action with a REST call

Creating an ingest plugin

Elasticsearch 5.x introduced the ingest node, which allows records to be modified via a pipeline before they are ingested in Elasticsearch. We already saw in Chapter 12, Using the Ingest Module, that a pipeline is composed of one or more processor actions. In this recipe, we will see how to create a custom processor that stores, in a field, the initial character of another field.

Getting ready

You need an up-and-running Elasticsearch installation, as described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

Gradle, or an IDE that supports Java programming with Gradle, such as Eclipse or IntelliJ IDEA, is required. The code for this recipe is available in the ch16/ingest_plugin directory.

How to do it...

To create an ingest processor plugin, we need to create the processor and then register it in the plugin class. We will perform the following steps:

  1. We create the processor and its factory as follows:
public final class InitialProcessor extends AbstractProcessor {

    public static final String TYPE = "initial";

    private final String field;
    private final String targetField;
    private final String defaultValue;
    private final boolean ignoreMissing;

    public InitialProcessor(String tag, String field, String targetField, boolean ignoreMissing, String defaultValue) {
        super(tag);
        this.field = field;
        this.targetField = targetField;
        this.ignoreMissing = ignoreMissing;
        this.defaultValue = defaultValue;
    }

    String getField() { return field; }
    String getTargetField() { return targetField; }
    String getDefaultField() { return defaultValue; }
    boolean isIgnoreMissing() { return ignoreMissing; }

    @Override
    public IngestDocument execute(IngestDocument document) {
        if (document.hasField(field, true) == false) {
            if (ignoreMissing) { return document;
            } else {
                throw new IllegalArgumentException("field [" + field + "] not present as part of path [" + field + "]");
            }
        }
        // We fail here if the target field point to an array slot that is out of range.
        // If we didn't do this then we would fail if we set the value in the target_field
        // and then on failure processors would not see that value we tried to rename as we already
        // removed it.
        if (document.hasField(targetField, true)) {
            throw new IllegalArgumentException("field [" + targetField + "] already exists");
        }

        Object value = document.getFieldValue(field, Object.class);
        if( value!=null && value instanceof String ) {
            String myValue=value.toString().trim();
            if(myValue.length()>1){
                try {
                    document.setFieldValue(targetField, myValue.substring(0,1).toLowerCase(Locale.getDefault()));
                } catch (Exception e) {
                    // setting the value back to the original field shouldn't fail, as we just fetched it from that field:
                    document.setFieldValue(field, value);
                    throw e;
                }
            }
        }
        return document;
    }

    @Override
    public String getType() { return TYPE;}

    public static final class Factory implements Processor.Factory {
        @Override
        public InitialProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                                      Map<String, Object> config) throws Exception {
            String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
            String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag,
                    config, "target_field");
            String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag,
                    config, "defaultValue");
            boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag,
                    config, "ignore_missing", false);
            return new InitialProcessor(processorTag, field, targetField, ignoreMissing, defaultValue);
        }
    }
}
  2. We need to register it in the Plugin class with the following lines:
public class InitialIngestPlugin extends Plugin implements IngestPlugin {
    @Override
    public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
        return Collections.singletonMap(InitialProcessor.TYPE,
                (factories, tag, config) -> new InitialProcessor.Factory().create(factories, tag, config));
    }
}
  3. Now, we can build the plugin via the gradle clean check command and manually install the ZIP. If we restart the Elasticsearch server, we should see the plugin loaded as follows:
[2019-02-09T11:47:53,126][WARN ][o.e.n.Node ] [iMacParo.local] version [7.0.0-alpha2] is a pre-release version of Elasticsearch and is not suitable for production
 [2019-02-09T11:47:54,797][INFO ][o.e.p.PluginsService ] [iMacParo.local] loaded module [aggs-matrix-stats]
 ... truncated ...
 [2019-02-09T11:47:54,802][INFO ][o.e.p.PluginsService ] [iMacParo.local] loaded plugin [initial-processor]
 [2019-02-09T11:47:54,802][INFO ][o.e.p.PluginsService ] [iMacParo.local] loaded plugin [rest-plugin]
  4. We can test our custom ingest plugin via the simulate ingest API with curl, as follows:
curl -XPOST -H "Content-Type: application/json" 'http://127.0.0.1:9200/_ingest/pipeline/_simulate?verbose&pretty' -d '{
"pipeline": {
     "description": "Test my custom plugin",
     "processors": [
      {
         "initial": {
           "field": "user",
           "target_field": "user_initial"
        }
      }
    ],
    "version": 1
  },
"docs": [
    {
       "_source": {
        "user": "john"
      }
    },
    {
       "_source": {
         "user": "Nancy"
       }
    }
  ]
}'
  5. The result will be something similar to the following:
{
  "docs" : [
    {
      "processor_results" : [
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_type",
            "_id" : "_id",
            "_source" : {
              "user_initial" : "j",
              "user" : "john"
            },
            "_ingest" : {
              "timestamp" : "2019-02-09T10:50:16.011932Z"
            }
          }
        }
      ]
    },
    {
      "processor_results" : [
        {
          "doc" : {
            "_index" : "_index",
            "_type" : "_type",
            "_id" : "_id",
            "_source" : {
              "user_initial" : "n",
              "user" : "Nancy"
            },
            "_ingest" : {
              "timestamp" : "2019-02-09T10:50:16.011973Z"
            }
          }
        }
      ]
    }
  ]
}

How it works...

First, you need to define the class that will manage your custom processor; it extends AbstractProcessor, as follows:

public final class InitialProcessor extends AbstractProcessor {

The processor needs to know which fields it works on. They are kept in the internal state of the processor, as follows:

public static final String TYPE = "initial";

private final String field;
private final String targetField;
private final String defaultValue;
private final boolean ignoreMissing;

public InitialProcessor(String tag, String field, String targetField, boolean ignoreMissing, String defaultValue) {
    super(tag);
    this.field = field;
    this.targetField = targetField;
    this.ignoreMissing = ignoreMissing;
    this.defaultValue = defaultValue;
}

The core of the processor is the execute function, which contains our processor logic, as follows:

@Override
public IngestDocument execute(IngestDocument document) {

The execute function is composed of the following steps:

  1. Check whether the source field exists, as follows:
if (document.hasField(field, true) == false) {
    if (ignoreMissing) {
        return document;
    } else {
        throw new IllegalArgumentException("field [" + field + "] not present as part of path [" + field + "]");
    }
}
  2. Check that the target field does not already exist, as follows:
if (document.hasField(targetField, true)) {
    throw new IllegalArgumentException("field [" + targetField + "] already exists");
}
  3. Extract the value from the document and check whether it's valid, as follows:
Object value = document.getFieldValue(field, Object.class);
if( value!=null && value instanceof String ) {
  4. Now, we can process the value and set it in the target field, as follows:
String myValue=value.toString().trim();
if(myValue.length()>1){
    try {
        document.setFieldValue(targetField, myValue.substring(0,1).toLowerCase(Locale.getDefault()));
    } catch (Exception e) {
        // setting the value back to the original field shouldn't fail, as we just fetched it from that field:
        document.setFieldValue(field, value);
        throw e;
    }
}

To be able to initialize the processor, we need to define a Factory object for it, as follows:

public static final class Factory implements Processor.Factory {

The Factory object contains the create method, which receives the registry of registered processors, the processorTag, and the configuration that it must read, as follows:

@Override
public InitialProcessor create(Map<String, Processor.Factory> registry, String processorTag,
                              Map<String, Object> config) throws Exception {
    String field = ConfigurationUtils.readStringProperty(TYPE, processorTag, config, "field");
    String targetField = ConfigurationUtils.readStringProperty(TYPE, processorTag,
            config, "target_field");
    String defaultValue = ConfigurationUtils.readOptionalStringProperty(TYPE, processorTag,
            config, "defaultValue");
    boolean ignoreMissing = ConfigurationUtils.readBooleanProperty(TYPE, processorTag,
            config, "ignore_missing", false);

Having read them, we can initialize the processor with these parameters, as follows:

    return new InitialProcessor(processorTag, field, targetField, ignoreMissing, defaultValue);
}

To be used as a custom processor, it needs to be registered in the plugin. This is done by making the plugin implement IngestPlugin, as follows:

public class InitialIngestPlugin extends Plugin implements IngestPlugin {

Now, we can register the processor Factory in the getProcessors method, as follows:

@Override
public Map<String, Processor.Factory> getProcessors(Processor.Parameters parameters) {
    return Collections.singletonMap(InitialProcessor.TYPE,
            (factories, tag, config) -> new InitialProcessor.Factory().create(factories, tag, config));
}

Implementing an ingest processor via a plugin is very simple, and it is a very powerful feature. Using this approach, users can create custom enrichment pipelines. A minimal test sketch follows.
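
As with the other plugins, a test phase helps catch regressions early. The following is a minimal unit-test sketch (assuming the Elasticsearch test framework; the test class name is hypothetical) that runs the processor on an in-memory IngestDocument:

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.ingest.IngestDocument;
import org.elasticsearch.test.ESTestCase;

public class InitialProcessorTests extends ESTestCase {

    public void testExtractInitial() throws Exception {
        Map<String, Object> source = new HashMap<>();
        source.put("user", "john");
        // Build an in-memory ingest document from the source map.
        IngestDocument document = new IngestDocument(source, new HashMap<>());

        InitialProcessor processor = new InitialProcessor("tag", "user", "user_initial", false, null);
        processor.execute(document);

        assertEquals("j", document.getFieldValue("user_initial", String.class));
    }
}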