vlambda博客
学习文章列表

读书笔记《elasticsearch-7-0-cookbook-fourth-edition》使用接收模块

Using the Ingest Module

Elasticsearch 5.x 引入了一组强大的功能,针对通过摄取节点摄取文档期间出现的问题。

第 1 章中, < /span>入门,我们讨论了 Elasticsearch 节点可以是 masterdataingest;由于预处理文档时可能出现的问题,将摄取组件与其他组件分开的想法是创建一个更稳定的集群。

为了创建一个更稳定的集群,摄取节点应该由主节点(也可能与数据节点)隔离,以防可能发生一些问题,例如由于附件插件而导致的崩溃和由于复杂的负载而导致的高负载类型操作。

摄取节点可以替换 Logstash 简单场景下的安装。

在本章中,我们将介绍以下食谱:

  • Pipeline definition
  • Inserting an ingest pipeline
  • Getting an ingest pipeline
  • Deleting an ingest pipeline
  • Simulating a pipeline
  • Built-in processors
  • The grok processor
  • Using the ingest attachment plugin
  • Using the ingest GeoIP plugin

Pipeline definition

摄取节点的工作是在将文档发送到数据节点之前对其进行预处理。此过程称为管道定义,此管道的每一步都是处理器定义。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行这些命令,可以使用任何 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。我们将使用 Kibana 控制台,因为它为 Elasticsearch 提供代码完成和更好的字符转义。

How to do it...

要定义摄取管道,您需要提供描述和一些处理器,如下所示:

  1. Define a pipeline that adds a field called user with the value, john:
{
  "description": "Add user john field",
  "processors": [
    {
      "set": {
        "field": "user",
        "value": "john"
      }
    }
  ],
  "version": 1
}

How it works...

通用模板表示如下:

{
  "description" : "...",
  "processors" : [ ... ],
  "version": 1,
  "on_failure" : [ ... ],
}

description 包含此管道完成的活动的定义。如果您在集群中存储大量管道,这将非常有用。

processors 字段包含处理器操作列表。它们将按顺序执行。

在前面的示例中,我们使用了一个名为 set 的简单处理器操作,它允许我们为字段设置一个值。 version 字段是可选的,但它对于跟踪管道版本非常有用。可选的 on_failure 字段允许我们定义在正常管道执行期间出现故障时要应用的处理器列表。

There's more...

为了防止在缺少字段或类似约束的情况下失败,一些处理器提供了 ignore_failure 属性。

例如,具有处理缺失字段的 rename 字段的管道应按以下方式定义:

{
  "description": "my pipeline with handled exceptions",
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "ignore_failure": true
      }
    }
  ]
}

如果发生故障,您可以配置一个 on_failure 条目来管理错误。

例如,可以通过以下方式将错误保存在字段中:

{
  "description": "my pipeline with handled exceptions",
  "processors": [
    {
      "rename": {
        "field": "foo",
        "target_field": "bar",
        "on_failure": [
          {
            "set": {
              "field": "error",
              "value": "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
Many of the processors allow us to define an if statement using Painless script or Regular Expression. This property is very handy to build more complex pipelines.

See also

Inserting an ingest pipeline

管道定义的强大之处在于无需重启节点即可更新和创建它(与 Logstash 相比)。该定义通过 put 管道 API 存储在集群状态中。

现在我们已经定义了一个管道,我们需要将它提供给 Elasticsearch 集群。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在 第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要在 Elasticsearch 中存储或更新摄取管道,我们将执行以下步骤:

  1. Store the ingest pipeline using a PUT call:
PUT /_ingest/pipeline/add-user-john
{
  "description": "Add user john field",
  "processors": [
    {
      "set": {
        "field": "user",
        "value": "john"
      }
    }
  ],
  "version": 1
}
  1. The result that's returned by Elasticsearch, if everything is okay, should be as follows:
{
  "acknowledged" : true
}

How it works...

PUT 管道方法既适用于创建管道,也适用于更新现有管道。

管道以集群状态存储,并立即传播到所有摄取节点。当摄取节点收到新的管道时,它们将在内存中的管道表示中更新其节点,并且管道更改立即生效。

当您在集群中存储管道时,请注意并提供一个有意义的名称(在示例中为 add-user-john),以便您可以轻松了解管道的作用。 put 调用中使用的管道名称将是其他管道流中管道的 ID。

将管道存储在 Elasticsearch 中后,您可以索引提供管道名称作为查询参数的文档。例如:

PUT /my_index/my_type/my_id?pipeline=add-user-john
{}

该文档将在被索引之前由管道丰富:

{
  "_index" : "my_index",
  "_type" : "_doc",
  "_id" : "my_id",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "user" : "john"
  }
}

Getting an ingest pipeline

存储管道后,通常会检索其内容,以便您检查其定义。此操作可以通过 get 管道 API 完成。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要在 Elasticsearch 中检索摄取管道,我们将执行以下步骤:

  1. We can retrieve the ingest pipeline using a GET call:
GET /_ingest/pipeline/add-user-
  1. The result that's returned by Elasticsearch, if everything is okay, should be as follows:
{
  "add-user-john" : {
    "description" : "Add user john field",
    "processors" : [
      {
        "set" : {
          "field" : "user",
          "value" : "john"
        }
      }
    ],
    "version" : 1
  }
}

How it works...

要检索摄取管道,您需要其名称或 ID。对于每个返回的管道,都会返回所有数据:源和版本(如果已定义)。

GET  管道允许我们在名称中使用通配符,因此您可以执行以下操作:

  • Retrieve all pipelines using *:
GET /_ingest/pipeline/*
  • Retrieve a partial pipeline:
GET /_ingest/pipeline/add-*
In case you have a lot of pipelines, using a good name convention helps a lot in their management.

There's more...

如果您只需要管道的一部分,例如版本,您可以使用 filter_path 将管道过滤为只需要的部分。看看下面的代码示例:

GET /_ingest/pipeline/add-user-john?filter_path=*.version

它将仅返回管道的版本部分:

{
  "add-user-john" : {
    "version" : 1
  }
}

Deleting an ingest pipeline

要清理我们的 Elasticsearch 集群以查找过时或不需要的管道,我们需要使用管道的 ID 调用 delete 管道 API。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要在 Elasticsearch 中删除摄取管道,我们将执行以下步骤:

  1. We can delete the ingest pipeline using a DELETE call:
DELETE /_ingest/pipeline/add-user-john
  1. The result that's returned by Elasticsearch, if everything is okay, should be as follows:
{
  "acknowledged" : true
}

How it works...

delete 管道 API 从 Elasticsearch 中删除命名管道。

由于管道由于其集群级存储而保存在每个节点的内存中,并且管道始终在摄取节点中启动并运行,因此最好只在集群中保留必要的管道。

delete pipeline API 不允许您在管道名称或 ID 中使用通配符。

Simulating an ingest pipeline

每个架构的摄取部分都非常敏感,因此 Elasticsearch 团队创造了模拟管道的可能性,而无需将它们存储在 Elasticsearch 中。

simulate 管道 API 允许用户测试、改进和检查管道的功能,而无需部署在 Elasticsearch 集群中。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

为了在 Elasticsearch 中模拟摄取管道,我们将执行以下步骤:

  1. Execute a call for passing both the pipeline and a sample subset of a document that you can test the pipeline against:
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Add user john field",
    "processors": [
      {
        "set": {
          "field": "user",
          "value": "john"
        }
      },
      {
        "set": {
          "field": "job",
          "value": 10
        }
      }
    ],
    "version": 1
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "1",
      "_source": {
        "name": "docs1"
      }
    },
    {
      "_index": "index",
      "_type": "type",
      "_id": "2",
      "_source": {
        "name": "docs2"
      }
    }
  ]
}
  1. The result returned by Elasticsearch, if everything is okay, should be a list of documents with the pipeline processed:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_type" : "type",
        "_id" : "1",
        "_source" : {
          "name" : "docs1",
          "job" : 10,
          "user" : "john"
        },
        "_ingest" : {
          "timestamp" : "2019-01-06T14:14:49.805621Z"
        }
      }
    },
    {
      "doc" : {
        "_index" : "index",
        "_type" : "type",
        "_id" : "2",
        "_source" : {
          "name" : "docs2",
          "job" : 10,
          "user" : "john"
        },
        "_ingest" : {
          "timestamp" : "2019-01-06T14:14:49.805651Z"
        }
      }
    }
  ]
}

How it works...

在一次调用中,simulated 管道 API 能够在文档子集上测试管道。它在内部执行以下步骤:

  1. It parses the provided pipeline definition, creating an in-memory representation of the pipeline
  2. It reads the provided documents by applying the pipeline
  3. It returns the processed results

唯一需要的部分是 pipeline one 和包含文档列表的docs。文档(在 docs 中提供) 必须使用元数据字段和源字段进行格式化,类似于查询结果。

有能够修改元数据字段的处理器;例如,他们能够根据其内容更改_index_type。元数据字段是 _index_type_id_routing_parent

出于调试目的,可以详细添加 URL 查询参数以返回管道的所有中间步骤。例如,假设我们在代码中更改了之前模拟的调用:

POST /_ingest/pipeline/_simulate
{..truncated ...}

结果将针对每个管道步骤进行扩展:

{
  "docs" : [
    {
      "processor_results" : [
        {
          "doc" : {
            "_index" : "index",
            "_type" : "type",
            "_id" : "1",
            "_source" : {
              "name" : "docs1",
              "user" : "john"
            },
            "_ingest" : {
              "timestamp" : "2019-01-06T14:17:46.739584Z"
            }
          }
        },
        {
          "doc" : {
            "_index" : "index",
            "_type" : "type",
            "_id" : "1",
            "_source" : {
              "name" : "docs1",
              "job" : 10,
              "user" : "john"
            },
            "_ingest" : {
              "timestamp" : "2019-01-06T14:17:46.739584Z"
            }
          }
        }
      ]
    },
    ... truncated ...
}

There's more...

当用户需要检查使用特殊字段访问的复杂管道时,simulate 管道 API 非常方便,例如:

  • Ingest metadata fields: These are special metadata fields, such as _ingest.timestamp, and are available during ingestion. This kind of field allows values to be added in the document; for example:
{
  "set": {
    "field": "received",
    "value": "{{_ingest.timestamp}}"
  }
}
  • Field replace templating: Using the templating with {{}}, it's possible to inject other fields or join their values:
{
  "set": {
    "field": "full_name",
    "value": "{{name}} {{surname}}"
  }
}

摄取元数据字段(可使用 _ingest访问)如下:

  • timestamp: This contains the current pipeline timestamp.
  • on_failure_message: This is available only in the on_failure block in case of failure. It contains the failure message.
  • on_failure_processor_type: This is available only in the on_failure block in case of failure. It contains the failure processor type that has generated the failure.
  • on_failure_processor_tag: This is available only in the on_failure block in case of failure. It contains the failure tag that has generated the failure.

Built-in processors

Elasticsearch 默认提供了大量的摄取处理器。它们的数量和功能也可以从小版本更改为新场景的扩展版本。

在这个秘籍中,我们将看看最常用的那些。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要在 Elasticsearch 的摄取管道中使用多个处理器,我们将执行以下步骤:

  1. Execute a simulate pipeline API call using several processors with a sample subset of a document that you can test the pipeline against:
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Testing some build-processors",
    "processors": [
      {
        "dot_expander": {
          "field": "extfield.innerfield"
        }
      },
      {
        "remove": {
          "field": "unwanted"
        }
      },
      {
        "trim": {
          "field": "message"
        }
      },
      {
        "set": {
          "field": "tokens",
          "value": "{{message}}"
        }
      },
      {
        "split": {
          "field": "tokens",
          "separator": "\\s+"
        }
      },
      {
        "sort": {
          "field": "tokens",
          "order": "desc"
        }
      },
      {
        "convert": {
          "field": "mynumbertext",
          "target_field": "mynumber",
          "type": "integer"
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "1",
      "_source": {
        "extfield.innerfield": "booo",
        "unwanted": 32243,
        "message": "155.2.124.3 GET /index.html 15442 0.038",
        "mynumbertext": "3123"
      }
    }
  ]
}

结果如下:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
        "_type" : "type",
        "_id" : "1",
        "_source" : {
          "mynumbertext" : "3123",
          "extfield" : {
            "innerfield" : "booo"
          },
          "tokens" : [
            "GET",
            "155.2.124.3",
            "15442",
            "0.038",
            "/index.html"
          ],
          "message" : "155.2.124.3 GET /index.html 15442 0.038",
          "mynumber" : 3123
        },
        "_ingest" : {
          "timestamp" : "2019-01-06T15:08:45.489069Z"
        }
      }
    }
  ]
}

How it works...

前面的示例展示了如何构建复杂的管道以预处理文档。有很多内置处理器可以覆盖日志和文本处理中最常见的场景。更复杂的可以使用脚本来完成。

在编写本书时,Elasticsearch 提供了内置的管道。以下是最常用的处理器:

姓名

说明

附加

将值附加到字段。如果需要,它将它们转换为数组

兑换

将字段值转换为不同的类型

日期

解析日期并将其用作文档的时间戳

日期索引名称

允许我们根据日期字段设置 _index 名称

降低

删除以下文档而不引发错误

失败

引发失败

前锋

使用提供的处理器处理数组元素

格罗克

应用 grok 模式提取

Gsub

在字段上执行正则表达式 replace

加入

使用分隔符连接值数组

JSON

将 JSON 字符串转换为 JSON 对象

小写

小写字段

消除

删除一个字段

管道

允许我们执行其他管道

改名

重命名字段

脚本

允许我们执行脚本

设置字段的值

分裂

使用正则表达式拆分数组中的字段

种类

对数组字段的值进行排序

修剪

修剪字段中的空格

大写

大写字段

点扩展器

扩展对象中带有点的字段

See also

第 16 章,插件开发中,我们将介绍如何编写自定义处理器在 Java 中扩展 Elasticsearch 的功能。

Grok processor

Elasticsearch 提供了大量的内置处理器,这些处理器随着每个版本的发布而增加。在前面的例子中,我们看到了 setreplace 。在这个秘籍中,我们将介绍一个主要用于日志分析的:grok 处理器,它是 Logstash 用户所熟知的。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要针对某些日志行测试 grok 模式,我们将执行以下步骤:

  1. Execute a call by passing both the pipeline with our grok processor and a sample subset of a document to test the pipeline against:
POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "Testing grok pattern",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_index": "index",
      "_type": "type",
      "_id": "1",
      "_source": {
        "message": "155.2.124.3 GET /index.html 15442 0.038"
      }
    }
  ]
}

  1. The result returned by Elasticsearch, if everything is okay, should be a list of documents with the pipeline processed:
{
  "docs" : [
    {
      "doc" : {
        "_index" : "index",
 "_type" : "type",
 "_id" : "1",
 "_source" : {
 "duration" : "0.038",
          "request" : "/index.html",
          "method" : "GET",
          "bytes" : "15442",
          "client" : "155.2.124.3",
          "message" : "155.2.124.3 GET /index.html 15442 0.038"
        },
        "_ingest" : {
          "timestamp" : "2019-01-06T15:17:02.784882Z"
        }
      }
    }
  ]
}

How it works...

grok 处理器允许您从文档中的单个文本字段中提取结构字段。 grok 模式就像一个支持可重用的别名表达式的正则表达式。它主要用于另一个 Elastic 软件 Logstash,因为它具有强大的日志数据提取语法。

Elastisearch 有大约 120 个内置的 grok 表达式(您可以在 https://github.com/elastic/elasticsearch/tree/ master/modules/ingest-common/src/main/resources/patterns)。

定义 grok 表达式非常简单,因为语法是人类可读的。如果我们想从表达式 (pattern) 中提取颜色,并检查它们的值是否在 REDYELLOW 的子集中>BLUE 使用 pattern_definitions,我们可以定义一个类似的处理器:

POST /_ingest/pipeline/_simulate
{
  "pipeline": {
    "description": "custom grok pattern",
    "processors": [
      {
        "grok": {
          "field": "message",
          "patterns": [
            "my favorite color is %{COLOR:color}"
          ],
          "pattern_definitions": {
            "COLOR": "RED|GREEN|BLUE"
          }
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "my favorite color is RED"
      }
    },
    {
      "_source": {
        "message": "happy fail!!"
      }
    }
  ]
}

结果如下:

{
  "docs" : [
    {
      "doc" : {
        "_index" : "_index",
        "_type" : "_type",
        "_id" : "_id",
        "_source" : {
          "message" : "my favorite color is RED",
          "color" : "RED"
        },
        "_ingest" : {
          "timestamp" : "2019-01-06T15:18:53.8785Z"
        }
      }
    },
    {
      "error" : {
        "root_cause" : [
          {
            "type" : "exception",
            "reason" : "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [happy fail!!]",
            "header" : {
              "processor_type" : "grok"
            }
          }
        ],
        "type" : "exception",
        "reason" : "java.lang.IllegalArgumentException: java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [happy fail!!]",
        "caused_by" : {
          "type" : "illegal_argument_exception",
          "reason" : "java.lang.IllegalArgumentException: Provided Grok expressions do not match field value: [happy fail!!]",
          "caused_by" : {
            "type" : "illegal_argument_exception",
            "reason" : "Provided Grok expressions do not match field value: [happy fail!!]"
          }
        },
        "header" : {
          "processor_type" : "grok"
        }
      }
    }
  ]
}

在实际应用程序中,失败的 grok 处理器异常将阻止您的文档被索引。出于这个原因,当你设计你的 grok 模式时,一定要在一个大的子集上测试它。

See also

Using the ingest attachment plugin

在 5.x 之前的 Elasticsearch 中,使用附件映射器很容易使集群无响应。从文档中提取元数据需要非常高的 CPU 操作,如果您正在摄取大量文档,则您的集群负载不足。

为了防止这种情况,Elasticsearch 引入了摄取节点。摄取节点可以承受非常高的压力,而不会对 Elasticsearch 集群的其余部分造成问题。

附件处理器允许我们在摄取节点中使用 Tika 的文档提取功能。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 开始

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

要能够使用摄取附件处理器,请执行以下步骤:

  1. Install ingest-attachment as a plugin, by using the following command:
bi
n/elasticsearch-plugin install ingest-attachment

输出将类似于以下内容:

-> Downloading ingest-attachment from elastic
 [=================================================] 100%??
 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 @ WARNING: plugin requires additional permissions @
 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 * java.lang.RuntimePermission getClassLoader
 * java.lang.reflect.ReflectPermission suppressAccessChecks
 * java.security.SecurityPermission createAccessControlContext
 * java.security.SecurityPermission insertProvider
 * java.security.SecurityPermission putProviderProperty.BC
 Continue with the installation? [y/n] y
 -> Installed ingest-attachment

您必须接受安全权限才能成功完成 安装。

http://docs.oracle.com/javase/8/docs/technotes/ guides/security/permissions.html有关允许的权限和相关风险的更多详细信息。
  1. After installing a new plugin, your node must be restarted to be able to load it. Now, you can create a pipeline ingest with the attachment processor:
PUT /_ingest/pipeline/attachment
{
  "description": "Extract data from an attachment via Tika",
  "processors": [
    {
      "attachment": {
        "field": "data"
      }
    }
  ],
  "version": 1
}

如果一切正常,您应该会收到以下消息:

{
  "acknowledged" : true
}
  1. Now, we can index a document using a pipeline:
PUT /my_index/my_type/my_id?pipeline=attachment
{
  "data": "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0="
}
  1. Now, we can recall it:
GET /my_index/_doc/my_id

结果如下:

{
  "_index" : "my_index",
  "_type" : "_doc",
  "_id" : "my_id",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "data" : "e1xydGYxXGFuc2kNCkxvcmVtIGlwc3VtIGRvbG9yIHNpdCBhbWV0DQpccGFyIH0=",
    "attachment" : {
      "content_type" : "application/rtf",
      "language" : "ro",
      "content" : "Lorem ipsum dolor sit amet",
      "content_length" : 28
    }
  }
}

How it works...

附件摄取处理器由必须安装的单独插件提供。安装后,它就像所有其他处理器一样工作。控制它的属性如下:

  • field: This is the field that will contain the base 64 representation of the binary data.
  • target_field: This will hold the attachment information (default attachment).
  • indexed_char: This is the number of characters to be extracted to prevent very huge fields. If it is set to -1, all the characters are extracted (default 100000).
  • properties: Other metadata fields of the document that need to be extracted. They can be content, title, name, author, keywords, date, content_type, content_length, and language (default all).

Using the ingest GeoIP plugin

另一个有趣的处理器是 GeoIP 插件,它允许我们将 IP 地址映射到 GeoPoint 和其他位置数据。

Getting ready

您需要一个正常运行的 Elasticsearch 安装,正如我们在第 1 章, 入门

要执行命令,可以使用每个 HTTP 客户端,例如 curl (https://curl.haxx.se/)、postman (https://www.getpostman.com/),或类似的。使用 Kibana 控制台,因为它为 Elasticsearch 提供了代码完成和更好的字符转义。

How to do it...

为了能够使用摄取 GeoIP 处理器,请执行以下步骤:

  1. Install the ingest GeoIP processor as a plugin using the following command:
bin/elasticsearch-plugin install ingest-geoip

输出将类似于以下内容:

-> Downloading ingest-geoip from elastic
 [=================================================] 100%??
 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 @ WARNING: plugin requires additional permissions @
 @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
 * java.lang.RuntimePermission accessDeclaredMembers
 See http://docs.oracle.com/javase/8/docs/technotes/guides/
 security/permissions.html
 for descriptions of what these permissions allow and the
 associated risks.
 Continue with the installation? [y/n] y.
 -> Installed ingest-geoip

您必须接受安全权限才能成功 完成安装。

  1. After installing a new plugin, your node must be restarted to be able to load it.
  2. Now, you can create a pipeline ingest with the attachment processor:
PUT /_ingest/pipeline/geoip
{
  "description": "Extract geopoint from an IP",
  "processors": [
    {
      "geoip": {
        "field": "ip"
      }
    }
  ],
  "version": 1
}
  1. If everything is okay, you should receive the following message:
{
  "acknowledged" : true
}
  1. Now, we can index a document using a pipeline:
PUT /my_index/_doc/my_id?pipeline=geoip
{
  "ip": "8.8.8.8"
}
  1. Then, we can recall it:
GET /my_index/_doc/my_id

结果如下:

{
  "_index" : "my_index",
  "_type" : "_doc",
  "_id" : "my_id",
  "_version" : 3,
  "found" : true,
  "_source" : {
    "geoip" : {
      "continent_name" : "North America",
      "country_iso_code" : "US",
      "location" : {
        "lon" : -97.822,
        "lat" : 37.751
      }
    },
    "ip" : "8.8.8.8"
  }
}

How it works...

GeoIP 摄取处理器由必须安装的单独插件提供。

它使用来自 MaxMind 数据库的数据来提取有关 IP 地址地理位置的信息。默认情况下,此处理器将此信息添加到 geoip 字段下。 GeoIP 处理器可以解析 IPv4 和 IPv6 地址。

安装后,它就像所有其他处理器一样工作。控制它的属性如下:

  • field: This is the field that will contain the IP from which the geo data is extracted.
  • target_field: This will hold the geoip information (the default is geoip).
  • database_file: This is the database file that contains maps from ip to geolocations. The default one is installed during the plugin's installation (default GeoLite2-City.mmdb).
  • properties: The properties values depends on the database. You should refer to the database description for details on the extracted fields (the default is all).

See also