
Reading Notes on Elasticsearch 7.0 Cookbook, Fourth Edition: Python Integration

Python Integration

In the previous chapter, we saw how to use the native client to access an Elasticsearch server via Java. This chapter is dedicated to the Python language and how to manage common tasks via its clients.

Besides Java, the Elasticsearch team supports official clients for Perl, PHP, Python, .NET, and Ruby (see the announcement post on the Elasticsearch blog at http://www.elasticsearch.org/blog/unleash-the-clients-ruby-python-php-perl/). These clients have many advantages over other implementations, some of which are given in the following list:

  • They are strongly tied to the Elasticsearch API. These clients are direct translations of the native Elasticsearch REST interface, and they are maintained by the Elasticsearch team.
  • They handle dynamic node detection and failovers. They are built with a strong networking base for communicating with the cluster.
  • They have full coverage of the REST API. They share the same application approach for every language in which they are available, so switching from one language to another is fast.
  • They are easily extensible.

The Python client works very nicely with other Python frameworks, such as Django, web2py, and Pyramid. It allows very fast access to documents, indices, and clusters.

In this chapter, I will try to describe the most important features of the official Elasticsearch Python client; for more examples, I suggest that you look at the online GitHub repository at https://github.com/elastic/elasticsearch-py and the related documentation at https://elasticsearch-py.readthedocs.io/en/master/.

In this chapter, we will cover the following recipes:

  • Creating a client
  • Managing indices
  • Managing mappings
  • Managing documents
  • Executing a standard search
  • Executing a search with aggregations

Creating a client

The official Elasticsearch clients are designed to manage many of the problems that typically arise when creating solid REST clients, such as retrying if there are network issues, autodiscovery of the other nodes of the cluster, and data conversions for communicating on the HTTP layer.

In this recipe, we will learn how to instantiate a client with different options.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

A Python 2.x or 3.x distribution should be installed. On Linux and macOS X systems, it is already provided in the standard installation. To manage Python, the pip package (https://pypi.python.org/pypi/pip/) must also be installed.

The full code for this recipe can be found in the ch15/code/client_creation.py file.

How to do it...

To create a client, we will perform the following steps:

  1. Before using the Python client, we need to install it (possibly in a Python virtual environment). The client is officially hosted on PyPi (http://pypi.python.org/) and it's easy to install with the pip command, as shown in the following code:
pip install elasticsearch

This standard installation provides HTTP support only.

  2. If you need to use the requests library for HTTP communication, you need to install it, as follows:
pip install requests
  3. After installing the package, we can instantiate the client. It resides in the Python elasticsearch package and it must be imported to instantiate the client.
  4. If you don't pass arguments to the Elasticsearch class, it instantiates a client that connects to localhost and port 9200 (the default Elasticsearch HTTP one), as shown in the following code:
import elasticsearch

es = elasticsearch.Elasticsearch()
  5. If your cluster is composed of more than one node, you can pass the list of nodes; the client will use them in a round-robin fashion and distribute the HTTP load between them, as follows:
# client using two nodes
es = elasticsearch.Elasticsearch(["search1:9200", "search2:9200"])
  6. Often, the complete topology of the cluster is unknown; if you know at least one node IP, you can use the sniff_on_start=True option, as shown in the following code. This option activates the client's ability to detect other nodes in the cluster:
# client using a node with sniffing
es = elasticsearch.Elasticsearch("localhost:9200", sniff_on_start=True)
  7. The default transport is Urllib3HttpConnection, but if you want to use the HTTP requests transport, you need to override the connection_class by passing RequestsHttpConnection, as follows:
# client using localhost:9200 and http requests transport
from elasticsearch.connection import RequestsHttpConnection
es = elasticsearch.Elasticsearch(sniff_on_start=True, connection_class=RequestsHttpConnection)
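Putting the options above together, the constructor arguments can be assembled as a plain dictionary first. This is only a sketch: the host names are illustrative, and the actual call needs a reachable cluster.

```python
# Hypothetical connection options combining a node list with sniffing.
client_kwargs = {
    "hosts": ["search1:9200", "search2:9200"],  # round-robin between these nodes
    "sniff_on_start": True,             # discover the rest of the cluster at startup
    "sniff_on_connection_fail": True,   # re-sniff when a node stops answering
}

# With the package installed and a cluster running, you would then call:
# import elasticsearch
# es = elasticsearch.Elasticsearch(**client_kwargs)
```

Keeping the options in one dictionary makes it easy to share the same connection configuration across scripts.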

How it works…

To communicate with an Elasticsearch cluster, a client is required.

The client manages all the communication layers from your application to an Elasticsearch server using HTTP REST calls.

The Elasticsearch Python client lets you use either the urllib3 library (the default) or the requests library for the HTTP layer.

The Elasticsearch Python client requires a server to connect to. If one is not defined, it tries to use one on the local machine (localhost). If you have more than one node, you can pass a list of servers to connect to.

The client automatically tries to balance operations across all the cluster nodes. This is a very powerful functionality provided by the Elasticsearch clients.

To improve the list of available nodes, it's possible to set the client to autodiscover new nodes. I suggest using this feature, because you will often find yourself with a cluster that has a lot of nodes, some of which need to be shut down for maintenance. The options that can be passed to the client to control discovery are as follows:

  • sniff_on_start: The default value is False; when set to True, the client obtains the list of nodes from the cluster at startup time
  • sniffer_timeout: The default value is None; it is the number of seconds between the automatic sniffing of the cluster nodes
  • sniff_on_connection_fail: The default value is False, which controls whether a connection failure triggers a sniff of cluster nodes

The default client configuration uses the HTTP protocol via the urllib3 library. If you want to use another transport protocol, you need to pass the type of transport class to the transport_class variable. The currently implemented classes are as follows:

  • Transport: This is the default value; it is a wrapper around Urllib3HttpConnection that uses HTTP (usually on port 9200)
  • RequestsHttpConnection: This is an alternative to Urllib3HttpConnection based on the requests library

See also

Managing indices

In the previous recipe, we saw how to initialize a client to send calls to an Elasticsearch cluster. In this recipe, we will look at how to manage indices via client calls.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need the Python packages that we installed in the Creating a client recipe in this chapter.

The full code for this recipe can be found in the ch15/code/indices_management.py file.

How to do it…

In Python, managing the life cycle of your indices is very easy. To do so, we will perform the following steps:

  1. We will initialize a client, as shown in the following code:
import elasticsearch

es = elasticsearch.Elasticsearch()

index_name = "my_index"
  2. We need to check whether the index exists, and if it does, we need to delete it. We can set this up using the following code:
if es.indices.exists(index_name):
    es.indices.delete(index_name)
  3. All the indices methods are available in the client.indices namespace. We can create and wait for the creation of an index using the following code:
es.indices.create(index_name)

es.cluster.health(wait_for_status="yellow")
  4. We can close/open an index using the following code:
es.indices.close(index_name)

es.indices.open(index_name)

es.cluster.health(wait_for_status="yellow")
  5. We can optimize an index by reducing the number of segments via the following code:
es.indices.forcemerge(index_name)
  6. We can delete an index using the following code:
es.indices.delete(index_name)

How it works…

The Elasticsearch Python client has two special managers: one for indices (.indices) and one for the cluster (.cluster).

For every operation that needs to work with indices, the first value is usually the name of the index. If you need to execute an operation on several indices at once, the index names must be joined with a comma , (that is, index1,index2,indexN). Glob patterns, such as index*, can also be used to define multiple indices.
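A tiny helper for building such multi-index strings might look like this (a sketch; the helper name is made up):

```python
def join_indices(names):
    """Join index names into the comma-separated form the API expects."""
    return ",".join(names)

# "index1,index2,index3" targets all three indices in one call:
targets = join_indices(["index1", "index2", "index3"])

# With a client in hand (requires a running cluster), you could then call:
# es.indices.refresh(index=targets)
# Glob patterns work too, for example: es.indices.refresh(index="index*")
```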

To create an index, the call requires index_name and other optional parameters, such as index settings and mappings, as shown in the following code. We will see this advanced functionality in the next recipe:

es.indices.create(index_name)
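As a sketch of those optional parameters (the setting values and field names here are illustrative, not from the recipe), the body of the create call can be built as a plain dictionary:

```python
# Hypothetical index body: settings plus a minimal ES 7 mapping.
create_body = {
    "settings": {"number_of_shards": 1, "number_of_replicas": 0},
    "mappings": {"properties": {"title": {"type": "text"}}},
}

# With a client in hand (requires a running cluster):
# es.indices.create(index="my_index", body=create_body)
```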

Creating an index can take some time (from a few milliseconds to a few seconds); it is an asynchronous operation, and it depends on the complexity of the cluster, the speed of the disks, network congestion, and so on. To be sure that this action is completed, we need to check that the health of the cluster has turned yellow or green, as follows:

es.cluster.health(wait_for_status="yellow")
It's good practice to wait until the cluster status is yellow (at least) after operations that involve index creation and opening, because these actions are asynchronous.

The method that we use to close an index is <client>.indices.close, along with the index to close, as follows:

es.indices.close(index_name)

The method that we use to open an index is <client>.indices.open, along with the name of the index to open, as shown in the following code:

es.indices.open(index_name)

es.cluster.health(wait_for_status="yellow")

Similar to index creation, after opening an index, it is best practice to wait until the index is fully open before executing operations on it; otherwise, commands executed against the index will raise errors. This is done by checking the health of the cluster.

To improve the performance of an index, Elasticsearch allows us to optimize it by removing deleted documents (documents are marked as deleted, but are not purged from a segment's index for performance reasons) and reducing the number of segments. To optimize an index, <client>.indices.forcemerge must be called on the index, as shown in the following code:

es.indices.forcemerge(index_name)

Finally, if we want to delete the index, we can call <client>.indices.delete, giving the name of the index to delete.

Remember that deleting an index removes everything related to it, including all of its data, and this action cannot be undone.

There's more…

The Python client wraps the Elasticsearch API in groups, as follows:

  • <client>.indices: This wraps all the REST APIs related to index management
  • <client>.ingest: This wraps all the REST APIs related to ingest calls
  • <client>.cluster: This wraps all the REST APIs related to cluster management
  • <client>.cat: This wraps the CAT API, a subset of the API that returns a textual representation of traditional JSON calls
  • <client>.nodes: This wraps all the REST APIs related to nodes management
  • <client>.snapshot: This allows us to execute a snapshot and restore data from Elasticsearch
  • <client>.tasks: This wraps all the REST APIs related to task management
  • <client>.remote: This wraps all the REST APIs related to remote information
  • <client>.xpack: This wraps all the REST APIs related to xpack information and usage

标准文档操作 (CRUD) 和搜索操作在客户端的顶层可用。

See also

  • The Creating an index recipe in Chapter 3, Basic Operations
  • The Deleting an index recipe in Chapter 3, Basic Operations
  • The Opening/closing an index recipe in Chapter 3, Basic Operations, for more details about the actions that are used to save cluster/node memory

Managing mappings

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need the Python packages that we installed in the Creating a client recipe in this chapter.

The code for this recipe can be found in the ch15/code/mapping_management.py file.

How to do it…

After initializing a client and creating an index, the steps for managing an index's mapping are as follows:

  1. Create a mapping
  2. Retrieve a mapping

These steps can easily be managed by performing the following steps:

  1. We initialize the client, as follows:
import elasticsearch

es = elasticsearch.Elasticsearch()
  2. We create an index, as follows:
index_name = "my_index"
type_name = "_doc"

if es.indices.exists(index_name):
    es.indices.delete(index_name)

es.indices.create(index_name)
es.cluster.health(wait_for_status="yellow")
  3. We include the mapping, as follows:
es.indices.put_mapping(index=index_name, doc_type=type_name, body={type_name:{"properties": {
    "uuid": {"type": "keyword"},
    "title": {"type": "text", "term_vector": "with_positions_offsets"},
    "parsedtext": { "type": "text", "term_vector": "with_positions_offsets"},
    "nested": {"type": "nested", "properties": {"num": {"type": "integer"},
                                                "name": {"type": "keyword"},
                                                "value": {"type": "keyword"}}},
    "date": {"type": "date"},
    "position": {"type": "integer"},
    "name": {"type": "text", "term_vector": "with_positions_offsets"}}}})
  4. We retrieve the mapping, as follows:
mappings = es.indices.get_mapping(index_name, type_name)
  5. We delete the index, as follows:
es.indices.delete(index_name)

How it works…

We have already seen how to initialize the client and create an index in the previous recipes.

To create a mapping, the method call is .indices.put_mapping, giving the index name, the type name, and the mapping, as shown in the following code. The creation of mappings is covered extensively in Chapter 3, Basic Operations. Converting standard Python types to JSON, and vice versa, is easy:

es.indices.put_mapping(index_name, type_name, {...})

If an error is generated during the mapping process, an exception is raised. The put_mapping API has two behaviors: create and update.

In Elasticsearch, you cannot remove a property from a mapping; schema changes are additive only, and are applied with the put_mapping call.
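An additive update might look like the following sketch (the new field name is illustrative):

```python
# Hypothetical body adding one new field to an existing ES 7 mapping;
# existing fields are left untouched, because mapping changes are additive.
new_field_body = {"properties": {"tags": {"type": "keyword"}}}

# With a client in hand (requires a running cluster and an existing index):
# es.indices.put_mapping(index="my_index", body=new_field_body)
```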

To retrieve a mapping with the get_mapping API, use the .indices.get_mapping method, providing the index name and the type name, as shown in the following code:

mappings = es.indices.get_mapping(index_name, type_name)

The returned object is obviously a dictionary describing the mapping.
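Given a response of the shape that Elasticsearch 7 returns (sketched below with made-up fields), the field types can be pulled out with plain dictionary traversal:

```python
# A sample get_mapping response shape (illustrative fields, not the recipe's).
mappings = {
    "my_index": {
        "mappings": {
            "properties": {
                "uuid": {"type": "keyword"},
                "title": {"type": "text"},
            }
        }
    }
}

# Collect {field_name: field_type} for one index.
properties = mappings["my_index"]["mappings"]["properties"]
field_types = {name: spec.get("type") for name, spec in properties.items()}
```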

See also

  • The Putting a mapping in an index recipe in Chapter 3, Basic Operations
  • The Getting a mapping recipe in Chapter 3, Basic Operations

Managing documents

The APIs for managing documents (index, update, and delete) are the most important APIs after the search ones. In this recipe, we will see how to use them in a standard way, and with bulk actions to improve performance.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need the Python packages that we installed in the Creating a client recipe in this chapter.

The full code for this recipe can be found in the ch15/code/document_management.py file.

How to do it…

The three main operations for managing documents are as follows:

  • index: This operation stores a document in Elasticsearch. It is mapped on the index API call.
  • update: This allows us to update values in a document. This operation is composed internally (via Lucene) by deleting the previous document and reindexing the document with the new values. It is mapped to the update API call.
  • delete: This deletes a document from the index. It is mapped to the delete API call.

With the Elasticsearch Python client, these operations can be executed by performing the following steps:

  1. We initialize a client and create an index with the mapping, as follows:
import elasticsearch
from datetime import datetime

es = elasticsearch.Elasticsearch()

index_name = "my_index"
type_name = "_doc"

from code.utils import create_and_add_mapping


if es.indices.exists(index_name):
    es.indices.delete(index_name)


create_and_add_mapping(es, index_name)
  2. Then, we index some documents (we manage the parent/child), as follows:
es.index(index=index_name, doc_type="_doc", id=1,
                 body={"name": "Joe Tester", "parsedtext": "Joe Testere nice guy", "uuid": "11111",
                       "position": 1,
                       "date": datetime(2018, 12, 8), "join_field": {"name": "book"}})
es.index(index=index_name, doc_type="_doc", id="1.1",
                 body={"name": "data1", "value": "value1", "join_field": {"name": "metadata", "parent": "1"}},
                 routing=1)
... truncated ...
  3. Next, we update a document, as follows:
es.update(index=index_name, doc_type=type_name, id=2, body={"script": 'ctx._source.position += 1'})

document=es.get(index=index_name, doc_type=type_name, id=2)
print(document)
  4. We then delete a document, as follows:
es.delete(index=index_name, doc_type=type_name, id=3)
  5. Next, we bulk insert some documents, as follows:
from elasticsearch.helpers import bulk
bulk(es, [
    {"_index": index_name, "_type": type_name, "_id": "1",
     "_source": {"name": "Joe Tester", "parsedtext": "Joe Testere nice guy", "uuid": "11111", "position": 1,
                "date": datetime(2018, 12, 8)}},

    {"_index": index_name, "_type": type_name, "_id": "1",
     "_source": {"name": "Bill Baloney", "parsedtext": "Bill Testere nice guy", "uuid": "22222", "position": 2,
                "date": datetime(2018, 12, 8)}}
])
  6. Finally, we remove the index, as follows:
es.indices.delete(index_name)

How it works…

To simplify this example, after instantiating the client, we call a function from the utils package that sets up the index and puts the mapping, as follows:

from code.utils import create_and_add_mapping
create_and_add_mapping(es, index_name)

This function contains the code used to create the mapping from the previous recipe.

The method used to index a document is <client>.index, which requires the name of the index, the type of the document, and the body of the document, as shown in the following code (if an ID is not given, one will be automatically generated):

es.index(index=index_name, doc_type="_doc", id=1,
                 body={"name": "Joe Tester", "parsedtext": "Joe Testere nice guy", "uuid": "11111",
                       "position": 1,
                       "date": datetime(2018, 12, 8), "join_field": {"name": "book"}})

It also accepts all the parameters that we saw in the Indexing a document recipe in Chapter 3, Basic Operations. The most common parameters passed to this function are as follows:

  • id: This provides an ID that is used to index the document
  • routing: This provides a shard routing to index the document in the specified shard
  • parent: This provides a parent ID that is used to put the child document in the correct shard
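As a sketch of the routing parameter in use (the index name and values are illustrative), the keyword arguments can be prepared as a dictionary first:

```python
# Hypothetical kwargs for a child document routed to its parent's shard.
index_kwargs = {
    "index": "my_index",
    "id": "1.1",
    "routing": "1",  # store it on the same shard as the parent document with id 1
    "body": {"name": "data1", "join_field": {"name": "metadata", "parent": "1"}},
}

# With a client in hand (requires a running cluster):
# es.index(**index_kwargs)
```

Routing the child to the parent's shard is what makes parent/child joins resolvable locally on a single shard.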

The method used to update a document is <client>.update, which requires the following parameters:

  • index_name
  • type_name
  • id of the document
  • script or document to update the document
  • lang, which is optional, and indicates the scripting language to be used, usually painless

If we want to increment the position by 1, we will write code similar to the following:

es.update(index=index_name, doc_type=type_name, id=2, body={"script": 'ctx._source.position += 1'})

Obviously, the call accepts all the parameters that we discussed in the Updating a document recipe in Chapter 3, Basic Operations.

The method used to delete a document is <client>.delete, which requires the following parameters:

  • index_name
  • type_name
  • id of the document

If we want to delete a document with id=3, we will write code similar to the following:

es.delete(index=index_name, doc_type=type_name, id=3)
Remember that all the Elasticsearch actions that work on documents are never seen instantly in the search. If you want to search without having to wait for the automatic refresh (every 1 second), you need to manually call the refresh API on the index.

To execute bulk indexing, the Elasticsearch client provides a helper function that accepts a connection, an iterable list of documents, and the bulk size. The bulk size (the default is 500) defines the number of actions to send via a single bulk call. The parameters that must be passed to correctly control the indexing of a document are put in the document with the _ prefix. The documents to be provided to the bulk helper must be formatted as in standard search results, with the body in the _source field, as follows:

from elasticsearch.helpers import bulk
bulk(es, [
    {"_index": index_name, "_type": type_name, "_id": "1",
     "_source": {"name": "Joe Tester", "parsedtext": "Joe Testere nice guy", "uuid": "11111", "position": 1,
                "date": datetime(2018, 12, 8)}},

    {"_index": index_name, "_type": type_name, "_id": "1",
     "_source": {"name": "Bill Baloney", "parsedtext": "Bill Testere nice guy", "uuid": "22222", "position": 2,
                "date": datetime(2018, 12, 8)}}
])
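Because the helper accepts any iterable, the action dictionaries can also be produced lazily by a generator, which keeps memory usage flat for large loads. A sketch (the make_actions helper and field names are made up):

```python
def make_actions(index_name, docs):
    """Lazily yield bulk actions in the _index/_id/_source shape."""
    for i, doc in enumerate(docs, start=1):
        yield {"_index": index_name, "_id": str(i), "_source": doc}

actions = list(make_actions("my_index", [{"name": "Joe"}, {"name": "Bill"}]))

# With a client in hand (requires a running cluster), chunk_size controls
# how many actions go out per bulk call (the default is 500):
# from elasticsearch.helpers import bulk
# bulk(es, make_actions("my_index", docs), chunk_size=500)
```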

See also

  • The Indexing a document recipe in Chapter 3, Basic Operations
  • The Getting a document recipe in Chapter 3, Basic Operations
  • The Deleting a document recipe in Chapter 3, Basic Operations
  • The Updating a document recipe in Chapter 3, Basic Operations
  • The Speeding up atomic operations (Bulk operations) recipe in Chapter 3, Basic Operations

Executing a standard search

After document insertion, the most commonly executed action in Elasticsearch is searching. The official Elasticsearch client APIs for searching are similar to the REST API.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need the Python packages that we installed in the Creating a client recipe in this chapter.

The code for this recipe can be found in the ch15/code/searching.py file.

How to do it…

To execute a standard query, the client's search method must be called, passing the query parameters, as we saw in Chapter 4, Exploring Search Capabilities. The required parameters are index_name, type_name, and the query DSL. In this recipe, we will learn how to call a match_all query, a term query, and a filter query. We will perform the following steps:

  1. We initialize the client and populate the index, as follows:
import elasticsearch
from pprint import pprint

es = elasticsearch.Elasticsearch()
index_name = "my_index"
type_name = "_doc"

if es.indices.exists(index_name):
    es.indices.delete(index_name)

from code.utils import create_and_add_mapping, populate

create_and_add_mapping(es, index_name)
populate(es, index_name)
  2. Then, we execute a search with a match_all query and print the results, as follows:
results = es.search(index_name, type_name, {"query": {"match_all": {}}})
pprint(results)
  3. We then execute a search with a term query and print the results, as follows:
results = es.search(index_name, type_name, {
    "query": {
        "term": {"name": {"boost": 3.0, "value": "joe"}}}
})
pprint(results)
  4. Next, we execute a search with a bool filter query and print the results, as follows:
results = es.search(index_name, type_name, {"query": {
    "bool": {
        "filter": {
            "bool": {
                "should": [
                    {"term": {"position": 1}},
                    {"term": {"position": 2}}]}
        }}}})
pprint(results)
  5. Finally, we remove the index, as follows:
es.indices.delete(index_name)

How it works…

The idea behind the official Elasticsearch clients is that they should offer a common API that is more similar to REST calls. In Python, it's very easy to use the query DSL, as it provides an easy mapping from Python dictionaries to JSON objects, and vice versa.

In the previous example, before calling the search, we needed to initialize the index and put some data in it; this was done using the two helpers available in the utils package, which is located in the ch_15 directory.

The two methods are as follows:

  • create_and_add_mapping(es, index_name, type_name): This initializes the index and inserts the correct mapping to perform the search. The code of this function was taken from the Managing mappings recipe in this chapter.
  • populate(es, index_name, type_name): This populates the index with data. The code for this function was taken from the previous recipe.

After initializing some data, we can execute queries against it. To execute a search, the method to call is search on the client. This method accepts all the parameters described in Chapter 4, Exploring Search Capabilities.

The actual method signature of the search method is as follows:

@query_params('_source', '_source_exclude', '_source_include',
    'allow_no_indices', 'allow_partial_search_results', 'analyze_wildcard',
    'analyzer', 'batched_reduce_size', 'default_operator', 'df',
    'docvalue_fields', 'expand_wildcards', 'explain', 'from_',
    'ignore_unavailable', 'lenient', 'max_concurrent_shard_requests',
    'pre_filter_shard_size', 'preference', 'q', 'request_cache', 'routing',
    'scroll', 'search_type', 'size', 'sort', 'stats', 'stored_fields',
    'suggest_field', 'suggest_mode', 'suggest_size', 'suggest_text',
    'terminate_after', 'timeout', 'track_scores', 'track_total_hits',
    'typed_keys', 'version')
def search(self, index=None, doc_type=None, body=None, params=None):

The index value can be one of the following:

  • An index name or an alias name
  • A list of index (or alias) names as a string separated by commas (that is, index1,index2,indexN)
  • _all, the special keyword that indicates all the indices

The type value can be one of the following:

  • A type_name
  • A list of type names as a string separated by a comma (that is, type1,type2,typeN)
  • None to indicate all the types

The body is the search DSL, as we saw in Chapter 4, Exploring Search Capabilities. In the previous example, we had the following:

  • A match_all query (see the Matching all the documents recipe in Chapter 4, Exploring Search Capabilities) to match all the index-type documents, as follows:
results = es.search(index_name, type_name, {"query": {"match_all": {}}})
  • A term query that matches a name term, joe, with boost 3.0, as shown in the following code:
results = es.search(index_name, type_name, {
    "query": {
        "term": {"name": {"boost": 3.0, "value": "joe"}}}
})
  • A filtered query with a query (match_all) and an or filter with two term filters matching position 1 and 2, as shown in the following code:
results = es.search(index_name, type_name, {"query": {
    "bool": {
        "filter": {
            "bool": {
                "should": [
                    {"term": {"position": 1}},
                    {"term": {"position": 2}}]}
        }}}})

The results that are returned are a JSON dictionary, as we saw in Chapter 4, Exploring Search Capabilities.

If some hits are matched, they are returned in the hits field. The standard number of results returned is 10. To return more results, you need to paginate the results using the from and size parameters.
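A pagination loop over from offsets can be sketched as follows; the offset arithmetic is plain Python, while the search call itself is commented out because it needs a live cluster. Note that elasticsearch-py exposes the REST from parameter as from_, since from is a Python keyword (it appears as 'from_' in the signature above):

```python
def page_offsets(total_hits, page_size=10):
    """Return the 'from' offset of each page needed to cover total_hits."""
    return list(range(0, total_hits, page_size))

offsets = page_offsets(35, page_size=10)  # four pages: 0, 10, 20, 30

# With a client in hand, each page would be fetched like this:
# for offset in offsets:
#     page = es.search(index_name, type_name, {"query": {"match_all": {}}},
#                      from_=offset, size=10)
```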

In Chapter 4, Exploring Search Capabilities, you will find a list of definitions of all the parameters that are used in a search.

See also

  • The Executing a search recipe in Chapter 4, Exploring Search Capabilities, for a detailed description of some search parameters
  • The Matching all the documents recipe in Chapter 4, Exploring Search Capabilities, for a description of the match_all query

Executing a search with aggregations

Returning search results is obviously the main activity of a search engine, but aggregations are very important too, because they often help to augment the results.

Aggregations are executed along with the search, performing analytics on the search results.

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

You also need the Python packages that we installed in the Creating a client recipe in this chapter.

The code for this recipe can be found in the ch15/code/aggregation.py file.

How to do it…

To extend a query with aggregations, you need to define an aggregation section, as we saw in Chapter 7, Aggregations. In the case of the official Elasticsearch client, you can add the aggregation DSL to the search dictionary to provide aggregations. To set this up, we will perform the following steps:

  1. We need to initialize the client and populate the index, as follows:
import elasticsearch
from pprint import pprint

es = elasticsearch.Elasticsearch()
index_name = "my_index"
type_name = "_doc"

if es.indices.exists(index_name):
    es.indices.delete(index_name)

from code.utils import create_and_add_mapping, populate

create_and_add_mapping(es, index_name)
populate(es, index_name)
  2. Then, we can execute a search with a terms aggregation, as follows:
results = es.search(index_name, type_name,
                    { "size":0,
                        "aggs": {
                            "pterms": {"terms": {"field": "name", "size": 10}}
                        }
                    })
pprint(results)
  3. Next, we execute a search with a date histogram aggregation, as follows:
results = es.search(index_name, type_name,
                    { "size":0,
                        "aggs": {
                            "date_histo": {"date_histogram": {"field": "date", "interval": "month"}}
                        }
                    })
pprint(results)

es.indices.delete(index_name)

How it works…

As described in Chapter 7, Aggregations, aggregations are computed in a distributed manner during the search. When you send a query to Elasticsearch with aggregations defined, an extra step is added to the query processing, allowing the aggregations to be computed.

In the previous example, there were two kinds of aggregation: the terms aggregation and the date histogram aggregation.

The first is used to count terms, and is often used on sites that provide faceted filtering on the terms aggregated from the results, such as producers, geographic locations, and so on. This is shown in the following code:

results = es.search(index_name, type_name,
                    { "size":0,
                        "aggs": {
                            "pterms": {"terms": {"field": "name", "size": 10}}
                        }
                    })

The terms aggregation requires a field on which to operate. The default number of buckets returned for the field is 10. This value can be changed by defining the size parameter.

The second kind of aggregation that is computed is the date histogram, which provides hits based on a datetime field. This aggregation requires at least two parameters: the datetime field, to be used as the source, and the interval, to be used for computation, as follows:

results = es.search(index_name, type_name,
                    { "size":0,
                        "aggs": {
                            "date_histo": {"date_histogram": {"field": "date", "interval": "month"}}
                        }
                    })
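The aggregation buckets come back under results["aggregations"]. Sketching with a response of the shape that Elasticsearch 7 returns for the date_histo aggregation above (the values are made up):

```python
# Illustrative response fragment for the date_histo aggregation.
results = {
    "aggregations": {
        "date_histo": {
            "buckets": [
                {"key_as_string": "2018-11-01T00:00:00.000Z", "doc_count": 1},
                {"key_as_string": "2018-12-01T00:00:00.000Z", "doc_count": 2},
            ]
        }
    }
}

# Map each month bucket to its document count.
counts = {b["key_as_string"]: b["doc_count"]
          for b in results["aggregations"]["date_histo"]["buckets"]}
```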

The search results are the standard search responses that we saw in Chapter 7, Aggregations.

See also

  • The Executing the terms aggregation recipe in Chapter 7, Aggregations, on aggregating term values
  • The Executing the date histogram aggregation recipe in Chapter 7, Aggregations, on computing the histogram aggregation on date/time fields

Integrating with NumPy and scikit-learn

Elasticsearch can easily be integrated with many Python machine learning libraries. NumPy is one of the most used libraries for managing datasets; NumPy arrays are the building blocks of the datasets used by many Python machine learning libraries. In this recipe, we will see how to use Elasticsearch as a data source for the scikit-learn library (https://scikit-learn.org/).

Getting ready

You need an up-and-running Elasticsearch installation, as we described in the Downloading and installing Elasticsearch recipe in Chapter 1, Getting Started.

The code for this recipe is in the ch15/code directory, and the file used in the following section is kmeans_example.py.

We will use the iris dataset (https://en.wikipedia.org/wiki/Iris_flower_data_set) that we used in Chapter 13, Java Integration. To prepare the iris index dataset, we need to populate it by executing the PopulatingIndex class provided in the source code of Chapter 13, Java Integration.

How to do it...

We will use Elasticsearch as the data source for building our dataset, and we will execute a clustering using the KMeans algorithm provided by scikit-learn. To do this, we will perform the following steps:

  1. We need to add the required machine learning libraries to the requirements.txt file and install them:
pandas
matplotlib
sklearn
  2. Now we can initialize the Elasticsearch client and fetch our samples:
import elasticsearch
es = elasticsearch.Elasticsearch()
result = es.search(index="iris", size=100)
  3. Now we can read our dataset by iterating over the Elasticsearch hit results:
import numpy as np

x = []
for hit in result["hits"]["hits"]:
    source = hit["_source"]
    x.append(np.array([source['f1'], source['f2'], source['f3'], source['f4']]))
x = np.array(x)
  4. After loading our dataset, we can execute a clustering using the KMeans algorithm:
# Applying k-means to the dataset / creating the KMeans classifier
from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=3, init='k-means++', max_iter=300, n_init=10, random_state=0)
y_kmeans = kmeans.fit_predict(x)
  5. Now that the clusters are computed, we need to show them to verify the result. To do this, we use the matplotlib.pyplot module:
import matplotlib.pyplot as plt

plt.scatter(x[y_kmeans == 0, 0], x[y_kmeans == 0, 1], s=100, c='red', label='Iris-setosa')
plt.scatter(x[y_kmeans == 1, 0], x[y_kmeans == 1, 1], s=100, c='blue', label='Iris-versicolour')
plt.scatter(x[y_kmeans == 2, 0], x[y_kmeans == 2, 1], s=100, c='green', label='Iris-virginica')

# Plotting the centroids of the clusters
plt.scatter(kmeans.cluster_centers_[:, 0], kmeans.cluster_centers_[:, 1], s=100, c='yellow', label='Centroids')

plt.legend()
plt.show()

The final output is the following plot:

[Figure: scatter plot of the three iris clusters, with the cluster centroids highlighted]

How it works...

Elasticsearch is a very powerful data store for machine learning datasets. It allows you to use its query capabilities to filter the data and retrieve what is needed to build datasets for machine learning activities.

As you can see from the preceding code, fetching data from Elasticsearch with Python takes only a few lines of code: one for initializing the client and another for retrieving the results:

import elasticsearch

es = elasticsearch.Elasticsearch()
result = es.search(index="iris", size=100)

When you have the resulting hits, it is easy to iterate over them and extract the NumPy arrays required by the machine learning libraries.

In the preceding code, we generated a NumPy array for each sample, iterating over all the hits:

x = []
for hit in result["hits"]["hits"]:
    source = hit["_source"]
    x.append(np.array([source['f1'], source['f2'], source['f3'], source['f4']]))
x = np.array(x)

The resulting NumPy array, x, can be used as input for any statistical or machine learning library.

Using Elasticsearch to manage datasets has many advantages over the typical approach of using CSV or data files, such as the following:

  • Your datasets are automatically distributed to everyone that is able to connect to Elasticsearch
  • The samples are already in the correct type format (int, double, string); there is no need to convert values as when reading files
  • You can easily filter the samples without reading all the data into memory, using Elasticsearch queries
  • You can use Kibana for data exploration before creating your machine learning models
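The filtering advantage above can be sketched with the query DSL: the selection happens server-side, so only matching samples reach the client. The f1 field name follows the recipe's iris mapping, while the range values are illustrative; the hit-to-row conversion below is plain Python and needs no cluster:

```python
# Hypothetical filtered fetch: only samples with f1 >= 5.0.
filtered_query = {"query": {"range": {"f1": {"gte": 5.0}}}}
# result = es.search(index="iris", body=filtered_query, size=100)

# Converting hits to feature rows (sample hits shown inline for illustration):
sample_hits = [
    {"_source": {"f1": 5.1, "f2": 3.5, "f3": 1.4, "f4": 0.2}},
    {"_source": {"f1": 6.2, "f2": 2.9, "f3": 4.3, "f4": 1.3}},
]
rows = [[h["_source"][f] for f in ("f1", "f2", "f3", "f4")] for h in sample_hits]
```

The rows list can then be wrapped in np.array(rows) exactly as in the recipe's code.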

See also