A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared. A pipeline consists of two main fields: a description and a list of processors:
{
"description" : "...",
"processors" : [ ... ]
}
The description is a special field to store a helpful description of what the pipeline does.
The processors parameter defines a list of processors to be executed in order.
The following ingest APIs are available for managing pipelines:
- Put Pipeline API to add or update a pipeline
- Get Pipeline API to return a specific pipeline
- Delete Pipeline API to delete a pipeline
- Simulate Pipeline API to simulate a call to a pipeline
The put pipeline API adds pipelines and updates existing pipelines in the cluster.
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
}
Note: The put pipeline API also instructs all ingest nodes to reload their in-memory representation of pipelines, so that pipeline changes take effect immediately.
The get pipeline API returns pipelines based on ID. This API always returns a local reference of the pipeline.
GET _ingest/pipeline/my-pipeline-id
Example response:
{
"my-pipeline-id" : {
"description" : "describe pipeline",
"processors" : [
{
"set" : {
"field" : "foo",
"value" : "bar"
}
}
]
}
}
For each returned pipeline, the source and the version are returned. The version is useful for knowing which version of the pipeline the node has. You can specify multiple IDs to return more than one pipeline. Wildcards are also supported.
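For example, the following sketch (assuming pipeline IDs in your cluster that start with my-) returns every matching pipeline in a single response:
GET _ingest/pipeline/my-*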
Pipelines can optionally add a version number, which can be any integer value, in order to simplify pipeline management by external systems. The version field is completely optional and is meant solely for external management of pipelines. To unset a version, simply replace the pipeline without specifying one.
PUT _ingest/pipeline/my-pipeline-id
{
"description" : "describe pipeline",
"version" : 123,
"processors" : [
{
"set" : {
"field": "foo",
"value": "bar"
}
}
]
}
To check for the version, you can filter responses using filter_path to limit the response to just the version:
GET /_ingest/pipeline/my-pipeline-id?filter_path=*.version
This should give a small response that makes it both easy and inexpensive to parse:
{
"my-pipeline-id" : {
"version" : 123
}
}
The delete pipeline API deletes pipelines by ID or wildcard match (my-*, *).
DELETE _ingest/pipeline/my-pipeline-id
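Wildcards work here as well. As a sketch, assuming you want to remove every pipeline whose ID starts with my-, you could issue:
DELETE _ingest/pipeline/my-*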
The simulate pipeline API executes a specific pipeline against the set of documents provided in the body of the request.
You can either specify an existing pipeline to execute against the provided documents, or supply a pipeline definition in the body of the request.
Here is the structure of a simulate request with a pipeline definition provided in the body of the request:
POST _ingest/pipeline/_simulate
{
"pipeline" : {
// pipeline definition here
},
"docs" : [
{ "_source": {/** first document **/} },
{ "_source": {/** second document **/} },
// ...
]
}
Here is the structure of a simulate request against an existing pipeline:
POST _ingest/pipeline/my-pipeline-id/_simulate
{
"docs" : [
{ "_source": {/** first document **/} },
{ "_source": {/** second document **/} },
// ...
]
}
Here is an example of a simulate request with a pipeline defined in the request and its response:
POST _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
Response:
{
"docs": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.187Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:30:03.188Z"
}
}
}
]
}
You can use the simulate pipeline API to see how each processor affects the ingest document as it passes through the pipeline. To see the intermediate results of each processor in the simulate request, you can add the verbose parameter to the request.
Here is an example of a verbose request and its response:
POST _ingest/pipeline/_simulate?verbose
{
"pipeline" :
{
"description": "_description",
"processors": [
{
"set" : {
"field" : "field2",
"value" : "_value2"
}
},
{
"set" : {
"field" : "field3",
"value" : "_value3"
}
}
]
},
"docs": [
{
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"foo": "bar"
}
},
{
"_index": "index",
"_type": "_doc",
"_id": "id",
"_source": {
"foo": "rab"
}
}
]
}
Response:
{
"docs": [
{
"processor_results": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.674Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "bar"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.675Z"
}
}
}
]
},
{
"processor_results": [
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.676Z"
}
}
},
{
"doc": {
"_id": "id",
"_index": "index",
"_type": "_doc",
"_source": {
"field3": "_value3",
"field2": "_value2",
"foo": "rab"
},
"_ingest": {
"timestamp": "2017-05-04T22:46:09.677Z"
}
}
}
]
}
]
}
The processors in a pipeline have read and write access to documents that pass through the pipeline. The processors can access fields in the source of a document and the document’s metadata fields.
Accessing a field in the source is straightforward. You simply refer to fields by their name. For example:
{
"set": {
"field": "my_field",
"value": 582.1
}
}
On top of this, fields from the source are always accessible via the _source prefix:
{
"set": {
"field": "_source.my_field",
"value": 582.1
}
}
You can access metadata fields in the same way that you access fields in the source. This is possible because Elasticsearch doesn’t allow fields in the source that have the same name as metadata fields.
The following example sets the _id metadata field of a document to 1:
{
"set": {
"field": "_id",
"value": "1"
}
}
The following metadata fields are accessible by a processor: _index, _type, _id, _routing.
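For example, the same set processor shown above can route a document to a specific shard by writing to the _routing metadata field (the routing value here is illustrative):
{
  "set": {
    "field": "_routing",
    "value": "user123"
  }
}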
Beyond metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes.
These metadata properties are accessible under the _ingest key. Currently ingest adds the ingest timestamp under the _ingest.timestamp key of the ingest metadata. The ingest timestamp is the time when Elasticsearch received the index or bulk request to pre-process the document.
Any processor can add ingest-related metadata during document processing. Ingest metadata is transient and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won’t be indexed.
The following example adds a field with the name received. The value is the ingest timestamp:
{
"set": {
"field": "received",
"value": "{{_ingest.timestamp}}"
}
}
Unlike Elasticsearch metadata fields, the ingest metadata field name _ingest can be used as a valid field name in the source of a document. Use _source._ingest to refer to the field in the source document. Otherwise, _ingest will be interpreted as an ingest metadata field.
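As a sketch, suppose an incoming document carries a source field literally named _ingest that you want to drop; the remove processor would address it through the _source prefix:
{
  "remove": {
    "field": "_source._ingest"
  }
}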
A number of processor settings also support templating. Settings that support templating can have zero or more template snippets. A template snippet begins with {{ and ends with }}.
Accessing fields and metafields in templates is exactly the same as via regular processor field settings.
The following example adds a field named field_c. Its value is a concatenation of the values of field_a and field_b.
{
"set": {
"field": "field_c",
"value": "{{field_a}} {{field_b}}"
}
}
The following example uses the value of the geoip.country_iso_code field in the source to set the index that the document will be indexed into:
{
"set": {
"field": "_index",
"value": "{{geoip.country_iso_code}}"
}
}
Dynamic field names are also supported. This example sets the field named after the value of service to the value of the field code:
{
"set": {
"field": "{{service}}",
"value": "{{code}}"
}
}
In its simplest use case, a pipeline defines a list of processors that are executed sequentially, and processing halts at the first exception. This behavior may not be desirable when failures are expected. For example, you may have logs that don’t match the specified grok expression. Instead of halting execution, you may want to index such documents into a separate index.
To enable this behavior, you can use the on_failure parameter. The on_failure parameter defines a list of processors to be executed immediately following the failed processor. You can specify this parameter at the pipeline level, as well as at the processor level. If a processor specifies an on_failure configuration, whether it is empty or not, any exceptions that are thrown by the processor are caught, and the pipeline continues executing the remaining processors. Because you can define further processors within the scope of an on_failure statement, you can nest failure handling, as in the sketch below.
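This sketch (the field names and error text are illustrative only) nests one on_failure block inside another: if the rename fails, a set processor records an error message, and if that set itself were to fail, a second-level on_failure reroutes the document to a different index:
{
  "description" : "pipeline sketch with nested failure handling",
  "processors" : [
    {
      "rename" : {
        "field" : "foo",
        "target_field" : "bar",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "rename of \"foo\" failed",
              "on_failure" : [
                {
                  "set" : {
                    "field" : "_index",
                    "value" : "failed-docs"
                  }
                }
              ]
            }
          }
        ]
      }
    }
  ]
}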
The following example defines a pipeline that renames the foo field in the processed document to bar. If the document does not contain the foo field, the processor attaches an error message to the document for later analysis within Elasticsearch.
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "field \"foo\" does not exist, cannot rename to \"bar\""
}
}
]
}
}
]
}
The following example defines an on_failure block on a whole pipeline to change the index to which failed documents get sent.
{
"description" : "my first pipeline with handled exceptions",
"processors" : [ ... ],
"on_failure" : [
{
"set" : {
"field" : "_index",
"value" : "failed-{{ _index }}"
}
}
]
}
Alternatively, instead of defining behaviour in case of processor failure, you can ignore a failure and continue with the next processor by specifying the ignore_failure setting.
If, in the example below, the field foo doesn't exist, the failure will be caught and the pipeline continues to execute, which in this case means that the pipeline does nothing.
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "bar",
"ignore_failure" : true
}
}
]
}
The ignore_failure setting can be set on any processor and defaults to false.
You may want to retrieve the actual error message that was thrown by a failed processor. To do so you can access metadata fields called on_failure_message, on_failure_processor_type, and on_failure_processor_tag. These fields are only accessible from within the context of an on_failure block.
Here is an updated version of the example that you saw earlier. But instead of setting the error message manually, the example leverages the on_failure_message metadata field to provide the error message.
{
"description" : "my first pipeline with handled exceptions",
"processors" : [
{
"rename" : {
"field" : "foo",
"to" : "bar",
"on_failure" : [
{
"set" : {
"field" : "error",
"value" : "{{ _ingest.on_failure_message }}"
}
}
]
}
}
]
}
All processors are defined in the following way within a pipeline definition:
{
"PROCESSOR_NAME" : {
... processor configuration options ...
}
}
Each processor defines its own configuration parameters, but all processors have the ability to declare tag and on_failure fields. These fields are optional.
A tag is simply a string identifier of the specific instantiation of a certain processor in a pipeline. The tag field does not affect the processor's behavior, but is very useful for bookkeeping and tracing errors to specific processors.
See Handling Failures in Pipelines to learn more about the on_failure field and error handling in pipelines.
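For example, the following sketch tags a set processor instance (the tag string and field names are illustrative); if the processor fails, the tag is reported in the on_failure_processor_tag metadata field:
{
  "set": {
    "tag": "set-status",
    "field": "status",
    "value": "active"
  }
}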
The node info API can be used to figure out what processors are available in a cluster. The node info API will provide a per node list of what processors are available.
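For example, the following sketch uses filter_path to trim the node info response down to just the processor list per node:
GET _nodes/ingest?filter_path=nodes.*.ingest.processors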
Custom processors must be installed on all nodes. The put pipeline API will fail if a processor specified in a pipeline doesn't exist on all nodes. If you rely on custom processor plugins, make sure to mark these plugins as mandatory by adding the plugin.mandatory setting to the config/elasticsearch.yml file, for example:
plugin.mandatory: ingest-attachment,ingest-geoip
A node will not start if either of these plugins is not available.
The node stats API can be used to fetch ingest usage statistics, globally and on a per-pipeline basis. This is useful for finding out which pipelines are used the most or spend the most time on preprocessing.
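For example, a request like the following sketch limits the node stats response to the ingest section:
GET _nodes/stats/ingest?filter_path=nodes.*.ingest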
Appends one or more values to an existing array if the field already exists and it is an array. Converts a scalar to an array and appends one or more values to it if the field exists and it is a scalar. Creates an array containing the provided values if the field doesn’t exist. Accepts a single value or an array of values.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be appended to |
value | yes | - | The value to be appended |
{
"append": {
"field": "field1",
"value": ["item2", "item3", "item4"]
}
}
Converts an existing field’s value to a different type, such as converting a string to an integer. If the field value is an array, all members will be converted.
The supported types include: integer, long, float, double, string, boolean, and auto.
Specifying boolean will set the field to true if its string value is equal to true (ignoring case), to false if its string value is equal to false (ignoring case), or it will throw an exception otherwise.
Specifying auto will attempt to convert the string-valued field into the closest non-string type. For example, a field whose value is "true" will be converted to its respective boolean type: true. Note that float takes precedence over double in auto. A value of "242.15" will "automatically" be converted to 242.15 of type float. If a provided field cannot be appropriately converted, the Convert Processor will still process successfully and leave the field value as-is. In such a case, target_field will still be updated with the unconverted field value.
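For instance, the following sketch (the field names are illustrative) keeps the original string value and writes the converted number to a separate field, quietly skipping documents where the field is missing:
{
  "convert": {
    "field": "response_code",
    "target_field": "response_code_int",
    "type": "integer",
    "ignore_missing": true
  }
}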
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field whose value is to be converted |
target_field | no | field | The field to assign the converted value to, by default field is updated in-place |
type | yes | - | The type to convert the existing value to |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"convert": {
"field" : "foo",
"type": "integer"
}
}
Parses dates from fields, and then uses the date or timestamp as the timestamp for the document.
By default, the date processor adds the parsed date as a new field called @timestamp. You can specify a different field by setting the target_field configuration parameter. Multiple date formats are supported as part of the same date processor definition. They will be used sequentially to attempt parsing the date field, in the same order they were defined as part of the processor definition.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to get the date from. |
target_field | no | @timestamp | The field that will hold the parsed date. |
formats | yes | - | An array of the expected date formats. Can be a Joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, or TAI64N. |
timezone | no | UTC | The timezone to use when parsing the date. |
locale | no | ENGLISH | The locale to use when parsing the date, relevant when parsing month names or week days. |
Here is an example that adds the parsed date to the timestamp field based on the initial_date field:
{
"description" : "...",
"processors" : [
{
"date" : {
"field" : "initial_date",
"target_field" : "timestamp",
"formats" : ["dd/MM/yyyy hh:mm:ss"],
"timezone" : "Europe/Amsterdam"
}
}
]
}
The timezone and locale processor parameters are templated. This means that their values can be extracted from fields within documents. The example below shows how to extract the locale/timezone details from existing fields, my_timezone and my_locale, in the ingested document that contain the timezone and locale values.
{
"description" : "...",
"processors" : [
{
"date" : {
"field" : "initial_date",
"target_field" : "timestamp",
"formats" : ["ISO8601"],
"timezone" : "{{ my_timezone }}",
"locale" : "{{ my_locale }}"
}
}
]
}
The purpose of this processor is to point documents to the right time-based index, based on a date or timestamp field in a document, by using the date math index name support.
The processor sets the _index metadata field with a date math index name expression based on the provided index name prefix, a date or timestamp field in the documents being processed, and the provided date rounding.
First, this processor fetches the date or timestamp from a field in the document being processed. Optionally, date formatting can be configured to control how the field's value is parsed into a date. Then this date, the provided index name prefix, and the provided date rounding are formatted into a date math index name expression. Here, too, date formatting can optionally be specified to control how the date is formatted into the expression.
An example pipeline that points documents to a monthly index that starts with a myindex- prefix based on a date in the date1 field:
PUT _ingest/pipeline/monthlyindex
{
"description": "monthly date-time index naming",
"processors" : [
{
"date_index_name" : {
"field" : "date1",
"index_name_prefix" : "myindex-",
"date_rounding" : "M"
}
}
]
}
Using that pipeline for an index request:
PUT /myindex/_doc/1?pipeline=monthlyindex
{
"date1" : "2016-04-25T12:02:01.789Z"
}
{
"_index" : "myindex-2016-04-01",
"_type" : "_doc",
"_id" : "1",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"_seq_no" : 0,
"_primary_term" : 1
}
The above request will not index this document into the myindex index, but into the myindex-2016-04-01 index, because it was rounded by month. This is because the date_index_name processor overrides the _index property of the document.
To see the date-math value of the index supplied in the actual index request, which resulted in the above document being indexed into myindex-2016-04-01, we can inspect the effects of the processor using a simulate request.
POST _ingest/pipeline/_simulate
{
"pipeline" :
{
"description": "monthly date-time index naming",
"processors" : [
{
"date_index_name" : {
"field" : "date1",
"index_name_prefix" : "myindex-",
"date_rounding" : "M"
}
}
]
},
"docs": [
{
"_source": {
"date1": "2016-04-25T12:02:01.789Z"
}
}
]
}
and the result:
{
"docs" : [
{
"doc" : {
"_id" : "_id",
"_index" : "<myindex-{2016-04-25||/M{yyyy-MM-dd|UTC}}>",
"_type" : "_type",
"_source" : {
"date1" : "2016-04-25T12:02:01.789Z"
},
"_ingest" : {
"timestamp" : "2016-11-08T19:43:03.850+0000"
}
}
}
]
}
The above example shows that _index was set to <myindex-{2016-04-25||/M{yyyy-MM-dd|UTC}}>. Elasticsearch understands this to mean 2016-04-01, as is explained in the date math index name documentation.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to get the date or timestamp from. |
index_name_prefix | no | - | A prefix of the index name to be prepended before the printed date. |
date_rounding | yes | - | How to round the date when formatting the date into the index name. Valid values are: y (year), M (month), w (week), d (day), h (hour), m (minute) and s (second). |
date_formats | no | yyyy-MM-dd'T'HH:mm:ss.SSSZ | An array of the expected date formats for parsing dates / timestamps in the document being preprocessed. Can be a Joda pattern or one of the following formats: ISO8601, UNIX, UNIX_MS, or TAI64N. |
timezone | no | UTC | The timezone to use when parsing the date and when date math index name expressions are resolved into concrete index names. |
locale | no | ENGLISH | The locale to use when parsing the date from the document being preprocessed, relevant when parsing month names or week days. |
index_name_format | no | yyyy-MM-dd | The format to be used when printing the parsed date into the index name. A valid Joda pattern is expected here. |
Raises an exception. This is useful when you expect a pipeline to fail and want to relay a specific message to the requester.
Name | Required | Default | Description |
---|---|---|---|
message | yes | - | The error message of the exception thrown by the processor |
{
"fail": {
"message": "an error message"
}
}
Processes elements in an array of unknown length.
All processors can operate on elements inside an array, but if all elements of an array need to be processed in the same way, defining a processor for each element becomes cumbersome and tricky because the number of elements in an array is usually unknown. For this reason the foreach processor exists. By specifying the field holding array elements and a processor that defines what should happen to each element, array fields can easily be preprocessed.
A processor inside the foreach processor works in the array element context and puts that element in the ingest metadata under the _ingest._value key. If the array element is a JSON object, _ingest._value holds all immediate fields of that object; if the array element is a scalar value, _ingest._value just holds that value. Note that if a processor prior to the foreach processor used the _ingest._value key, then the specified value will not be available to the processor inside the foreach processor. The foreach processor does restore the original value, so that value is available to processors after the foreach processor.
Note that any other fields of the document are accessible and modifiable like with all other processors. This processor just puts the current array element being read into the _ingest._value ingest metadata attribute, so that it may be pre-processed.
If the foreach processor fails to process an element inside the array, and no on_failure processor has been specified, then it aborts the execution and leaves the array unmodified.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The array field |
processor | yes | - | The processor to execute against each field |
Assume the following document:
{
"values" : ["foo", "bar", "baz"]
}
When this foreach processor operates on this sample document:
{
"foreach" : {
"field" : "values",
"processor" : {
"uppercase" : {
"field" : "_ingest._value"
}
}
}
}
Then the document will look like this after preprocessing:
{
"values" : ["FOO", "BAR", "BAZ"]
}
Let’s take a look at another example:
{
"persons" : [
{
"id" : "1",
"name" : "John Doe"
},
{
"id" : "2",
"name" : "Jane Doe"
}
]
}
In this case, the id field needs to be removed, so the following foreach processor is used:
{
"foreach" : {
"field" : "persons",
"processor" : {
"remove" : {
"field" : "_ingest._value.id"
}
}
}
}
After preprocessing the result is:
{
"persons" : [
{
"name" : "John Doe"
},
{
"name" : "Jane Doe"
}
]
}
The wrapped processor can have an on_failure definition. For example, the id field may not exist on all person objects. Instead of failing the index request, you can use an on_failure block to send the document to the failure_index index for later inspection:
{
"foreach" : {
"field" : "persons",
"processor" : {
"remove" : {
"field" : "_value.id",
"on_failure" : [
{
"set" : {
"field", "_index",
"value", "failure_index"
}
}
]
}
}
}
}
In this example, if the remove processor does fail, then the array elements that have been processed thus far will be updated.
Another advanced example can be found in the attachment processor documentation.
Extracts structured fields out of a single text field within a document. You choose which field to extract matched fields from, as well as the grok pattern you expect will match. A grok pattern is like a regular expression that supports aliased expressions that can be reused.
This tool is perfect for syslog logs, apache and other webserver logs, mysql logs, and in general, any log format that is generally written for humans and not computer consumption. This processor comes packaged with many reusable patterns.
If you need help building patterns to match your logs, you will find the Grok Debugger tool in Kibana quite useful! The Grok Debugger is an X-Pack feature under the Basic License and is therefore free to use. The Grok Constructor at http://grokconstructor.appspot.com/ is also a useful tool.
Grok sits on top of regular expressions, so any regular expressions are valid in grok as well. The regular expression library is Oniguruma, and you can see the full supported regexp syntax on the Oniguruma site.
Grok works by leveraging this regular expression language to allow naming existing patterns and combining them into more complex patterns that match your fields.
The syntax for reusing a grok pattern comes in three forms: %{SYNTAX:SEMANTIC}, %{SYNTAX}, and %{SYNTAX:SEMANTIC:TYPE}.
The SYNTAX is the name of the pattern that will match your text. For example, 3.44 will be matched by the NUMBER pattern and 55.3.244.1 will be matched by the IP pattern. The syntax is how you match. NUMBER and IP are both patterns that are provided within the default patterns set.
The SEMANTIC is the identifier you give to the piece of text being matched. For example, 3.44 could be the duration of an event, so you could call it simply duration. Further, a string 55.3.244.1 might identify the client making a request.
The TYPE is the type to which you wish to cast your named field. int, long, double, float and boolean are the supported types for coercion.
For example, you might want to match the following text:
3.44 55.3.244.1
You may know that the message in the example is a number followed by an IP address. You can match this text by using the following Grok expression.
%{NUMBER:duration} %{IP:client}
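If you also wanted the duration to be coerced into a number rather than extracted as a string, you could attach a TYPE to that capture, as in this sketch:
%{NUMBER:duration:float} %{IP:client}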
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to use for grok expression parsing |
patterns | yes | - | An ordered list of grok expressions to match and extract named captures with. Returns on the first expression in the list that matches. |
pattern_definitions | no | - | A map of pattern-name and pattern tuples defining custom patterns to be used by the current processor. Patterns matching existing names will override the pre-existing definition. |
trace_match | no | false | When true, _ingest._grok_match_index will be inserted into the matched document's metadata with the index of the pattern in patterns that matched. |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
Here is an example of using the provided patterns to extract out and name structured fields from a string field in a document.
{
"message": "55.3.244.1 GET /index.html 15824 0.043"
}
The pattern for this could be:
%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}
Here is an example pipeline for processing the above document by using Grok:
{
"description" : "...",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"]
}
}
]
}
This pipeline will insert these named captures as new fields within the document, like so:
{
"message": "55.3.244.1 GET /index.html 15824 0.043",
"client": "55.3.244.1",
"method": "GET",
"request": "/index.html",
"bytes": 15824,
"duration": "0.043"
}
The Grok processor comes pre-packaged with a base set of patterns. These patterns may not always have what you are looking for. Patterns have a very basic format: each entry has a name and the pattern itself.
You can add your own patterns to a processor definition under the pattern_definitions option.
Here is an example of a pipeline specifying custom pattern definitions:
{
"description" : "...",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["my %{FAVORITE_DOG:dog} is colored %{RGB:color}"],
"pattern_definitions" : {
"FAVORITE_DOG" : "beagle",
"RGB" : "RED|GREEN|BLUE"
}
}
}
]
}
Sometimes one pattern is not enough to capture the potential structure of a field. Let's assume we want to match all messages that contain your favorite pet breeds of either cats or dogs. One way to accomplish this is to provide two distinct patterns that can be matched, instead of one really complicated expression capturing the same or behavior.
Here is an example of such a configuration executed against the simulate API:
POST _ingest/pipeline/_simulate
{
"pipeline": {
"description" : "parse multiple patterns",
"processors": [
{
"grok": {
"field": "message",
"patterns": ["%{FAVORITE_DOG:pet}", "%{FAVORITE_CAT:pet}"],
"pattern_definitions" : {
"FAVORITE_DOG" : "beagle",
"FAVORITE_CAT" : "burmese"
}
}
}
]
},
"docs":[
{
"_source": {
"message": "I love burmese cats!"
}
}
]
}
Response:
{
"docs": [
{
"doc": {
"_type": "_type",
"_index": "_index",
"_id": "_id",
"_source": {
"message": "I love burmese cats!",
"pet": "burmese"
},
"_ingest": {
"timestamp": "2016-11-08T19:43:03.850+0000"
}
}
}
]
}
Both patterns will set the field pet with the appropriate match, but what if we want to trace which of our patterns matched and populated our fields? We can do this with the trace_match parameter. Here is the output of that same pipeline, but with "trace_match": true configured:
{
"docs": [
{
"doc": {
"_type": "_type",
"_index": "_index",
"_id": "_id",
"_source": {
"message": "I love burmese cats!",
"pet": "burmese"
},
"_ingest": {
"_grok_match_index": "1",
"timestamp": "2016-11-08T19:43:03.850+0000"
}
}
}
]
}
In the above response, you can see that the index of the pattern that matched was "1". This is to say that it was the second (index starts at zero) pattern in patterns to match.
This trace metadata enables debugging which of the patterns matched. This information is stored in the ingest metadata and will not be indexed.
The Grok Processor comes packaged with its own REST endpoint for retrieving which patterns the processor is packaged with.
GET _ingest/processor/grok
The above request will return a response body containing a key-value representation of the built-in patterns dictionary.
{
"patterns" : {
"BACULA_CAPACITY" : "%{INT}{1,3}(,%{INT}{3})*",
"PATH" : "(?:%{UNIXPATH}|%{WINPATH})",
...
  }
}
This can be useful to reference as the built-in patterns change across versions.
Grok expressions that take too long to execute are interrupted and the grok processor then fails with an exception. The grok processor has a watchdog thread that determines when evaluation of a grok expression takes too long and is controlled by the following settings:
Name | Default | Description |
---|---|---|
ingest.grok.watchdog.interval | 1s | How often to check whether there are grok evaluations that take longer than the maximum allowed execution time. |
ingest.grok.watchdog.max_execution_time | 1s | The maximum allowed execution time of a grok expression evaluation. |
Converts a string field by applying a regular expression and a replacement. If the field is not a string, the processor will throw an exception.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to apply the replacement to |
pattern | yes | - | The pattern to be replaced |
replacement | yes | - | The string to replace the matching patterns with |
target_field | no | field | The field to assign the converted value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"gsub": {
"field": "field1",
"pattern": "\.",
"replacement": "-"
}
}
Joins each element of an array into a single string using a separator character between each element. Throws an error when the field is not an array.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be separated |
separator | yes | - | The separator character |
target_field | no | field | The field to assign the joined value to, by default field is updated in-place |
{
"join": {
"field": "joined_array_field",
"separator": "-"
}
}
Converts a JSON string into a structured JSON object.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be parsed |
target_field | no | field | The field to insert the converted structured object into |
add_to_root | no | false | Flag that forces the serialized json to be injected into the top level of the document. |
All JSON-supported types will be parsed (null, boolean, number, array, object, string).
Suppose you provide this configuration of the json processor:
{
"json" : {
"field" : "string_source",
"target_field" : "json_target"
}
}
If the following document is processed:
{
"string_source": "{\"foo\": 2000}"
}
after the json processor operates on it, it will look like:
{
"string_source": "{\"foo\": 2000}",
"json_target": {
"foo": 2000
}
}
If the following configuration is provided, omitting the optional target_field setting:
{
"json" : {
"field" : "source_and_target"
}
}
then after the json processor operates on this document:
{
"source_and_target": "{\"foo\": 2000}"
}
it will look like:
{
"source_and_target": {
"foo": 2000
}
}
This illustrates that, unless it is explicitly named in the processor configuration, the target_field is the same field provided in the required field configuration.
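If you instead want the parsed object merged into the top level of the document, you can use the add_to_root flag, as in this sketch (the field name is illustrative):
{
  "json" : {
    "field" : "string_source",
    "add_to_root" : true
  }
}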
This processor helps automatically parse messages (or specific event fields) which are of the foo=bar variety.
For example, if you have a log message which contains ip=1.2.3.4 error=REFUSED, you can parse those fields automatically by configuring:
{
"kv": {
"field": "message",
"field_split": " ",
"value_split": "="
}
}
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be parsed |
field_split | yes | - | Regex pattern to use for splitting key-value pairs |
value_split | yes | - | Regex pattern to use for splitting the key from the value within a key-value pair |
target_field | no | - | The field to insert the extracted keys into. Defaults to the root of the document |
include_keys | no | - | List of keys to filter and insert into document. Defaults to including all keys |
exclude_keys | no | - | List of keys to exclude from document |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
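As a sketch (the field and key names are illustrative), the extracted pairs can also be nested under a dedicated object and filtered down to the keys you care about:
{
  "kv": {
    "field": "message",
    "field_split": " ",
    "value_split": "=",
    "target_field": "parsed",
    "include_keys": ["ip", "error"]
  }
}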
Converts a string to its lowercase equivalent.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to make lowercase |
target_field | no | field | The field to assign the converted value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"lowercase": {
"field": "foo"
}
}
Removes existing fields. If a field doesn't exist, an exception will be thrown.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | Fields to be removed |
Here is an example to remove a single field:
{
"remove": {
"field": "foo"
}
}
To remove multiple fields, you can use the following configuration:
{
"remove": {
"field": ["foo", "bar"]
}
}
Renames an existing field. If the field doesn’t exist or the new name is already used, an exception will be thrown.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be renamed |
target_field | yes | - | The new name of the field |
ignore_missing | no | false | If true and field does not exist, the processor quietly exits without modifying the document |
{
"rename": {
"field": "foo",
"target_field": "foobar"
}
}
Allows inline and stored scripts to be executed within ingest pipelines.
See How to use scripts to learn more about writing scripts. The Script Processor leverages caching of compiled scripts for improved performance. Since the script specified within the processor is potentially re-compiled per document, it is important to understand how script caching works. To learn more about caching see Script Caching.
Name | Required | Default | Description |
---|---|---|---|
lang | no | "painless" | The scripting language |
id | no | - | The stored script id to refer to |
source | no | - | An inline script to be executed |
params | no | - | Script Parameters |
One of the id or source options must be provided in order to properly reference a script to execute.
You can access the current ingest document from within the script context by using the ctx variable.
The following example sets a new field called field_a_plus_b_times_c to be the sum of two existing numeric fields field_a and field_b multiplied by the parameter param_c:
{
"script": {
"lang": "painless",
"source": "ctx.field_a_plus_b_times_c = (ctx.field_a + ctx.field_b) * params.param_c",
"params": {
"param_c": 10
}
}
}
It is possible to use the Script Processor to manipulate document metadata like _index and _type during ingestion. Here is an example of an Ingest Pipeline that renames the index and type to my_index no matter what was provided in the original index request:
PUT _ingest/pipeline/my_index
{
"description": "use index:my_index and type:_doc",
"processors": [
{
"script": {
"source": """
ctx._index = 'my_index';
ctx._type = '_doc';
"""
}
}
]
}
Using the above pipeline, we can attempt to index a document into the any_index index.
PUT any_index/_doc/1?pipeline=my_index
{
"message": "text"
}
The response from the above index request:
{
"_index": "my_index",
"_type": "_doc",
"_id": "1",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"_seq_no": 0,
"_primary_term": 1,
}
In the above response, you can see that our document was actually indexed into my_index instead of any_index. This type of manipulation is often convenient in pipelines that have various branches of transformation and, depending on the progress made, index documents into different indices.
Sets one field and associates it with the specified value. If the field already exists, its value will be replaced with the provided one.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to insert, upsert, or update |
value | yes | - | The value to be set for the field |
override | no | true | If the processor will update fields with pre-existing non-null-valued fields. When set to false, such fields will not be touched. |
{
"set": {
"field": "field1",
"value": 582.1
}
}
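If you only want to set the field when it does not already hold a non-null value, combine set with override, as in this sketch (the field name is illustrative):
{
  "set": {
    "field": "host",
    "value": "unknown",
    "override": false
  }
}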
Splits a field into an array using a separator character. Only works on string fields.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to split |
separator | yes | - | A regex which matches the separator, eg , or \s+ |
target_field | no | field | The field to assign the split value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"split": {
"field": "my_field",
"separator": "\\s+" (1)
}
}
(1) Treat all consecutive whitespace characters as a single separator
Sorts the elements of an array ascending or descending. Homogeneous arrays of numbers will be sorted numerically, while arrays of strings or heterogeneous arrays of strings + numbers will be sorted lexicographically. Throws an error when the field is not an array.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to be sorted |
order | no | asc | The sort order to use. Accepts asc or desc. |
target_field | no | field | The field to assign the sorted value to, by default field is updated in-place |
{
"sort": {
"field": "field_to_sort",
"order": "desc"
}
}
Trims whitespace from a field.
Note: This only works on leading and trailing whitespace.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The string-valued field to trim whitespace from |
target_field | no | field | The field to assign the trimmed value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"trim": {
"field": "foo"
}
}
Converts a string to its uppercase equivalent.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to make uppercase |
target_field | no | field | The field to assign the converted value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"uppercase": {
"field": "foo"
}
}
Expands a field with dots into an object field. This processor allows fields with dots in the name to be accessible by other processors in the pipeline. Otherwise these fields can’t be accessed by any processor.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to expand into an object field |
path | no | - | The field that contains the field to expand. Only required if the field to expand is part of another object field, because the field option can only understand leaf fields. |
{
"dot_expander": {
"field": "foo.bar"
}
}
For example, the dot_expander processor would turn this document:
{
"foo.bar" : "value"
}
into:
{
"foo" : {
"bar" : "value"
}
}
If there is already a bar field nested under foo then this processor merges the foo.bar field into it. If the field is a scalar value then it will turn that field into an array field.
For example, the following document:
{
"foo.bar" : "value2",
"foo" : {
"bar" : "value1"
}
}
is transformed by the dot_expander processor into:
{
"foo" : {
"bar" : ["value1", "value2"]
}
}
If any field outside of the leaf field conflicts with a pre-existing field of the same name, then that field needs to be renamed first.
Consider the following document:
{
"foo": "value1",
"foo.bar": "value2"
}
Then the foo field needs to be renamed first, before the dot_expander processor is applied. So in order for the foo.bar field to properly be expanded into the bar field under the foo field, the following pipeline should be used:
{
"processors" : [
{
"rename" : {
"field" : "foo",
"target_field" : "foo.bar""
}
},
{
"dot_expander": {
"field": "foo.bar"
}
}
]
}
The reason for this is that Ingest doesn’t know how to automatically cast a scalar field to an object field.
URL-decodes a string.
Name | Required | Default | Description |
---|---|---|---|
field | yes | - | The field to decode |
target_field | no | field | The field to assign the converted value to, by default field is updated in-place |
ignore_missing | no | false | If true and field does not exist or is null, the processor quietly exits without modifying the document |
{
"urldecode": {
"field": "my_url_to_decode"
}
}