在这个秘籍中,我们将看到一个 REST 调用被转换为一个内部集群操作。要执行内部集群操作,需要以下类:
- A Request and Response class to communicate with the cluster
- A RequestBuilder used to execute a request to the cluster
- An Action used to register the action and bound Request, Response, and RequestBuilder
- A Transport*Action to bind the request and response to ShardResponse: it manages the reduce part of the query
- A ShardResponse to manage the shard results
我们将执行以下步骤:
- We write a SimpleRequest class as follows:
- The SimpleResponse class is very similar to the SimpleRequest
- To bind the request and the response, an action (SimpleAction) is required as follows:
- The Transport class is the core of the action. It's quite long so we'll present only the main important parts as follows:
在此示例中,我们使用了在每个集群节点以及在该节点上选择的每个分片执行的操作。
如您所见,要执行集群操作,需要以下类:
- A couple of Request/Response to interact with the cluster
- A task action on the cluster level
- A Shard Response to interact with the shards
- A Transport class to manage the map/reduce shard part that must be invoked by the REST call
这些类必须扩展支持的操作之一,例如:
- TrasportBroadcastAction: For actions that must be spread across the all cluster.
- TransportClusterInfoAction: For actions that need to read information at the cluster level.
- TransportMasterNodeAction: For actions that must be executed only by the master node.(such as index and mapping configuration). For simple acknowledge on the master, there are also AcknowledgedRequest response.
- TransportNodeAction: For actions that must be executed on nodes (that is, all the node statistic actions).
- TransportBroadcastReplicationAction, TransportReplicationAction, TransportWriteAction: For actions that must be executed by a particular replica, first on primary and then on secondary ones.
- TransportInstanceSingleOperationAction: For actions that must be executed as a singleton in the cluster.
- TransportSingleShardAction: For actions that must be executed only in a shard (that is, GET actions). If it fails on a shard, it automatically tries on the shard replicas.
- TransportTasksAction: For actions that need to interact with cluster tasks.
在我们的示例中,我们定义了一个将广播到每个节点的操作,并且对于每个节点,它会收集其分片结果,然后聚合 如下:
所有的请求/响应类都扩展了一个 Streamable 类,因此必须提供以下两种序列化其内容的方法:
- readFrom, which reads from an StreamInput, a class that encapsulates common input stream operations. This method allows the deserialization of the data we transmit on the wire. In the preceding example, we read a string with the following code:
- writeTo, which writes the contents of the class to be sent via the network. The StreamOutput provides convenient methods to process the output. In the following example, we serialized the StreamOutput string:
在这两个动作中, super 必须被调用以允许父类的正确序列化。
Elasticsearch 中的每个内部操作都设计为请求/响应模式。
要完成请求/响应动作,我们必须定义一个将请求与正确响应绑定的动作和一个构建器来构造它。为此,我们需要定义一个 Action 类 如下:
这个Action对象是一个单例对象:我们通过创建一个默认的静态实例和私有构造函数来获得它 如下:
静态字符串 NAME 用于唯一标识集群级别的操作。
要完成Action定义,必须定义 newResponse 方法,用于创建新的空响应 如下:
当动作执行时,请求和响应被序列化并发送到集群。要在集群级别执行我们的自定义代码,需要传输操作。
传输操作通常定义为 map 和 reduce 作业。 map 部分包括在几个分片上执行操作,然后减少部分,包括收集来自分片的所有结果作为必须发送回请求者的响应。为了加快 Elasticsearch 5.x 或更高版本中的处理速度,所有属于同一节点的分片响应都会被减少到适当的位置,以优化 I/O 和网络使用。
传输操作是一个包含许多方法的长类,但最重要的是 ShardOperation(map 部分)和 newResponse(reduce 部分)。
原始请求转换成分布式ShardRequest,由shardOperation方法处理 如下:
要获取内部分片,我们需要在 IndexService 请求根据想要的索引返回一个分片。
分片请求包含执行动作必须使用的分片的索引和ID 如下:
IndexShard 对象允许执行每个可能的分片操作(search、get、index 等等) .通过这种方法,我们可以执行我们想要的每个数据分片操作。
自定义分片动作可以分布式、快速的方式执行应用的业务操作。
在以下 示例中,我们创建了一组简单的值:
我们分片操作的最后一步是创建一个响应以发送回 reduce 步骤。在创建 ShardResponse 时,我们需要返回结果以及执行该操作的索引和分片的信息 如下:
在 reduce 步骤(newResponse 方法)中收集分布式分片操作。这一步聚合所有分片结果,并将结果发送回原来的Action 如下:
除了分片的结果,方法接收分片级别操作的状态,它们被收集在三个值中:successfulShards、failedShards和shardFailures。
请求结果是一组收集到的字符串,所以我们创建一个空集来收集term的结果 如下:
然后你收集我们需要迭代分片响应的结果 如下:
最后一步是创建响应,收集之前的结果和响应状态 如下:
当我们想要快速执行低级别操作时,需要创建集群操作,例如特殊聚合、服务器端连接或需要执行多个 Elasticsearch 调用的复杂操作。编写自定义 Elasticsearch 操作是 Elasticsearch 的一项高级功能,但它可以创建新的业务使用场景,从而提升 Elasticsearch 的功能。