Add Elasticsearch 5.x output #2332

Merged: 19 commits, Mar 21, 2017

Changes from 1 commit
3 changes: 2 additions & 1 deletion Godeps
@@ -29,6 +29,7 @@ github.com/influxdata/config b79f6829346b8d6e78ba73544b1e1038f1f1c9da
github.com/influxdata/influxdb fc57c0f7c635df3873f3d64f0ed2100ddc94d5ae
github.com/influxdata/toml af4df43894b16e3fd2b788d01bd27ad0776ef2d0
github.com/influxdata/wlog 7c63b0a71ef8300adc255344d275e10e5c3a71ec
github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f
github.com/kardianos/osext 29ae4ffbc9a6fe9fb2bc5029050ce6996ea1d3bc
github.com/kardianos/service 5e335590050d6d00f3aa270217d288dda1c94d0a
github.com/kballard/go-shellquote d8ec1a69a250a17bb0e419c386eac1f3711dc142
@@ -61,5 +62,5 @@ golang.org/x/text a71fd10341b064c10f4a81ceac72bcf70f26ea34
gopkg.in/dancannon/gorethink.v1 7d1af5be49cb5ecc7b177bf387d232050299d6ef
gopkg.in/fatih/pool.v2 cba550ebf9bce999a02e963296d4bc7a486cb715
gopkg.in/mgo.v2 d90005c5262a3463800497ea5a89aed5fe22c886
gopkg.in/olivere/elastic.v5 ee3ebceab960cf68ab9a89ee6d78c031ef5b4a4e
gopkg.in/yaml.v2 a83829b6f1293c91addabc89d0571c246397bbf4
github.com/jackc/pgx bb73d8427902891bbad7b949b9c60b32949d935f
6 changes: 4 additions & 2 deletions Makefile
@@ -51,6 +51,7 @@ docker-run:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name mysql -p "3306:3306" -e MYSQL_ALLOW_EMPTY_PASSWORD=yes -d mysql
docker run --name memcached -p "11211:11211" -d memcached
docker run --name postgres -p "5432:5432" -d postgres
@@ -69,15 +70,16 @@ docker-run-circle:
-e ADVERTISED_PORT=9092 \
-p "2181:2181" -p "9092:9092" \
-d spotify/kafka
docker run --name elasticsearch -p "9200:9200" -p "9300:9300" -d elasticsearch:5
docker run --name nsq -p "4150:4150" -d nsqio/nsq /nsqd
docker run --name mqtt -p "1883:1883" -d ncarlier/mqtt
docker run --name riemann -p "5555:5555" -d blalor/riemann
docker run --name nats -p "4222:4222" -d nats

# Kill all docker containers, ignore errors
docker-kill:
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats
-docker kill nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch
-docker rm nsq aerospike redis rabbitmq postgres memcached mysql kafka mqtt riemann nats elasticsearch

# Run full unit tests using docker containers (includes setup and teardown)
test: vet docker-kill docker-run
1 change: 1 addition & 0 deletions README.md
@@ -207,6 +207,7 @@ Telegraf can also collect metrics via the following service plugins:
* [aws cloudwatch](./plugins/outputs/cloudwatch)
* [datadog](./plugins/outputs/datadog)
* [discard](./plugins/outputs/discard)
* [elasticsearch](./plugins/outputs/elasticsearch)
* [file](./plugins/outputs/file)
* [graphite](./plugins/outputs/graphite)
* [graylog](./plugins/outputs/graylog)
1 change: 1 addition & 0 deletions plugins/outputs/all/all.go
@@ -6,6 +6,7 @@ import (
_ "github.com/influxdata/telegraf/plugins/outputs/cloudwatch"
_ "github.com/influxdata/telegraf/plugins/outputs/datadog"
_ "github.com/influxdata/telegraf/plugins/outputs/discard"
_ "github.com/influxdata/telegraf/plugins/outputs/elasticsearch"
_ "github.com/influxdata/telegraf/plugins/outputs/file"
_ "github.com/influxdata/telegraf/plugins/outputs/graphite"
_ "github.com/influxdata/telegraf/plugins/outputs/graylog"
89 changes: 89 additions & 0 deletions plugins/outputs/elasticsearch/README.md
@@ -0,0 +1,89 @@
# Elasticsearch Output Plugin

This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using the Elastic Go client (http://olivere.github.io/elastic/).

It currently supports only the Elasticsearch 5.x series.

### Example:

The plugin will format the metrics in the following way:

```
{
"@timestamp": "2017-01-01T00:00:00+00:00",
"input_plugin": "cpu",
Contributor: I'm not super familiar with ES best practices, but this seems a bit redundant to me. Why put "input_plugin": "cpu" at the top level of the metric if you already have the name of the plugin in "cpu": ...?

Contributor Author: That's to make it possible/easier to query/filter for metrics from a particular input. It is not so easy/convenient to query for field names in ES. It can be done by issuing a terms query on _field_names, but I don't know how to do this in Grafana/Kibana, for example (or even whether it is possible).

Contributor: In that case, shouldn't we put a "measurement_name" field in the ES metric? It seems like this is more useful than the plugin name. There are many plugins that write more than one measurement name.

Contributor Author: Yes, that makes sense. Actually, I think the current "input_plugin" is already the measurement name; it comes from metric.Name(). I will change the name of the field.

"cpu": {
"usage_guest": 0,
"usage_guest_nice": 0,
"usage_idle": 71.85413456197966,
"usage_iowait": 0.256805341656516,
"usage_irq": 0,
"usage_nice": 0,
"usage_softirq": 0.2054442732579466,
"usage_steal": 0,
"usage_system": 15.04879301548127,
"usage_user": 12.634822807288275
},
"tag": {
"cpu": "cpu-total",
"host": "elastichost",
"dc": "datacenter1"
}
}

{
"@timestamp": "2017-01-01T00:00:00+00:00",
"input_plugin": "system",
"system": {
"load1": 0.78,
"load15": 0.8,
"load5": 0.8,
"n_cpus": 2,
"n_users": 2
},
"tag": {
"host": "elastichost",
"dc": "datacenter1"
}
}


```
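For reference, the terms query on `_field_names` mentioned in the review discussion above might look roughly like this with the same `gopkg.in/olivere/elastic.v5` client the plugin uses. This is an illustrative sketch only; the endpoint, index pattern, and field name are assumptions, not part of this PR.

```go
package main

import (
	"context"
	"fmt"
	"log"

	"gopkg.in/olivere/elastic.v5"
)

func main() {
	ctx := context.Background()
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	// ES 5.x records the names of non-empty fields in the built-in
	// _field_names field, so a terms query on it finds documents that
	// contain a given field, e.g. any metric carrying cpu.usage_idle.
	query := elastic.NewTermsQuery("_field_names", "cpu.usage_idle")

	result, err := client.Search().
		Index("telegraf-*").
		Query(query).
		Do(ctx)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("matching docs:", result.TotalHits())
}
```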

### Configuration:

```toml
# Configuration for Elasticsearch to send metrics to.
[[outputs.elasticsearch]]
## The full HTTP endpoint URL for your Elasticsearch instance
## Multiple urls can be specified as part of the same cluster;
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Set to true to ask Elasticsearch for the list of all cluster nodes,
## so it is not necessary to list all nodes in the urls config option
enable_sniffer = true
## Set the interval to check if the nodes are available, in seconds
## Setting to 0 will disable the health check (not recommended in production)
health_check_interval = 10
## HTTP basic authentication details (e.g. when using Shield)
# username = "telegraf"
# password = "mypassword"

# Index Config
## The target index for metrics (telegraf will create it if it does not exist)
## You can use a different index per time frame using the patterns below
# %Y - year (2016)
# %y - last two digits of year (00..99)
# %m - month (01..12)
# %d - day of month (e.g., 01)
# %H - hour (00..23)
index_name = "telegraf-%Y.%m.%d" # required.

## Template Config
## set to true if you want telegraf to manage its index template
manage_template = true
## The template name used for telegraf indexes
template_name = "telegraf"
## set to true if you want to overwrite an existing template
overwrite_template = false
```
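As a rough illustration of how the date patterns in `index_name` are expanded: the sketch below mirrors the `GetIndexName` helper added in `elasticsearch.go` later in this PR. Note that `%y` is listed in the comments above, but the replacer in this commit only substitutes `%Y`, `%m`, `%d`, and `%H`; the example timestamp is arbitrary.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"time"
)

// expandIndexName mirrors the plugin's GetIndexName date substitution:
// %Y, %m, %d and %H are replaced with values from the metric's timestamp.
func expandIndexName(indexName string, eventTime time.Time) string {
	if !strings.Contains(indexName, "%") {
		return indexName
	}
	return strings.NewReplacer(
		"%Y", strconv.Itoa(eventTime.Year()),
		"%m", fmt.Sprintf("%02d", int(eventTime.Month())),
		"%d", fmt.Sprintf("%02d", eventTime.Day()),
		"%H", fmt.Sprintf("%02d", eventTime.Hour()),
	).Replace(indexName)
}

func main() {
	t := time.Date(2017, time.January, 1, 0, 0, 0, 0, time.UTC)
	fmt.Println(expandIndexName("telegraf-%Y.%m.%d", t)) // telegraf-2017.01.01
}
```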
246 changes: 246 additions & 0 deletions plugins/outputs/elasticsearch/elasticsearch.go
@@ -0,0 +1,246 @@
package elasticsearch

import (
"context"
"fmt"
"log"
"strconv"
"strings"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/plugins/outputs"
"gopkg.in/olivere/elastic.v5"
)

type Elasticsearch struct {
URLs []string `toml:"urls"`
IndexName string
Username string
Password string
EnableSniffer bool
HealthCheckInterval int
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
Client *elastic.Client
}

var sampleConfig = `
## The full HTTP endpoint URL for your Elasticsearch instance
## Multiple urls can be specified as part of the same cluster;
## this means that only ONE of the urls will be written to each interval.
urls = [ "http://node1.es.example.com:9200" ] # required.
## Set to true to ask Elasticsearch for the list of all cluster nodes,
## so it is not necessary to list all nodes in the urls config option
enable_sniffer = true
## Set the interval to check if the nodes are available, in seconds
## Setting to 0 will disable the health check (not recommended in production)
health_check_interval = 10
## HTTP basic authentication details (e.g. when using Shield)
# username = "telegraf"
# password = "mypassword"

# Index Config
Contributor: When is this evaluated? Only when telegraf starts? Or does each metric "batch" under the same timestamp get added to its own index? (Add that to the doc.)

Contributor Author: Well, the evaluation is currently done per metric inside the batch; there is a comment in the code about that. But is it always the case that a batch of metrics all have the same timestamp? If yes, I will update this to do the evaluation once per batch only, and add a note in the doc.

Contributor: Nope, that is definitely not guaranteed, please leave it as-is!

Contributor: But also please document that the index is set per-metric in the README and sample config.

Contributor Author: Ok, I will send it later today.

Contributor Author: I've added additional information; let me know if it is enough.

## The target index for metrics (telegraf will create it if it does not exist)
## You can use a different index per time frame using the patterns below
# %Y - year (2016)
# %y - last two digits of year (00..99)
# %m - month (01..12)
# %d - day of month (e.g., 01)
# %H - hour (00..23)
index_name = "telegraf-%Y.%m.%d" # required.

## Template Config
Contributor: Can you provide more information on the template management? It doesn't have to be here; maybe the README would be sufficient. Links to the Elasticsearch documentation would be extra good.

Contributor Author: Yes, template management is about creating a proper template for the index name provided. I will add more info in the doc. These config option names are inspired by similar logstash config options that should be familiar to anyone who already uses ES.

## set to true if you want telegraf to manage its index template
manage_template = true
## The template name used for telegraf indexes
template_name = "telegraf"
## set to true if you want to overwrite an existing template
overwrite_template = false
`

func (a *Elasticsearch) Connect() error {
if a.URLs == nil || a.IndexName == "" {
return fmt.Errorf("Elasticsearch urls or index_name is not defined")
}

ctx := context.Background()

var clientOptions []elastic.ClientOptionFunc

clientOptions = append(clientOptions,
elastic.SetSniff(a.EnableSniffer),
elastic.SetURL(a.URLs...),
)

if a.Username != "" && a.Password != "" {
clientOptions = append(clientOptions,
elastic.SetBasicAuth(a.Username, a.Password),
)
}

if a.HealthCheckInterval > 0 {
clientOptions = append(clientOptions,
elastic.SetHealthcheckInterval(time.Duration(a.HealthCheckInterval)*time.Second),
)
}

client, err := elastic.NewClient(clientOptions...)

if err != nil {
return fmt.Errorf("Elasticsearch connection failed: %s", err)
}

// check for version on first node
esVersion, err := client.ElasticsearchVersion(a.URLs[0])

if err != nil {
return fmt.Errorf("Elasticsearch version check failed: %s", err)
}

// warn about ES version
if i, err := strconv.Atoi(strings.Split(esVersion, ".")[0]); err == nil {
if i < 5 {
log.Println("W! Elasticsearch version not supported: " + esVersion)
Contributor: Should this be an error? Should we continue with the plugin, or just return here?

Contributor Author: Not sure here; I didn't test enough to know for sure that it won't work on older versions. Maybe someone wants to take the risk, so there's the warning. But I agree to return if you think it is safer.

Contributor: I'd prefer to return and not bother trying to support older versions of elastic.

} else {
log.Println("I! Elasticsearch version: " + esVersion)
}
}
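For illustration, the stricter behaviour the reviewers lean towards above might look roughly like the fragment below inside Connect(). This is an editorial sketch, not part of this commit; the error wording is an assumption.

```go
// Editorial sketch: reject pre-5.x clusters instead of only logging a warning.
majorVersion, err := strconv.Atoi(strings.Split(esVersion, ".")[0])
if err != nil {
	return fmt.Errorf("Elasticsearch version check failed: %s", err)
}
if majorVersion < 5 {
	return fmt.Errorf("Elasticsearch version not supported: %s (this plugin requires 5.x)", esVersion)
}
log.Println("I! Elasticsearch version: " + esVersion)
```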

a.Client = client

if a.ManageTemplate {
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx)

if errExists != nil {
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists)
}

if (a.OverwriteTemplate) || (!templateExists) {
// Create or update the template
tmpl := fmt.Sprintf(`
{ "template":"%s*",
Contributor: Should you include in the documentation what this template is? How do you know that this template will work for everyone? Should the template be configurable?

Contributor Author: The template is kind of optional: the plugin will work without it, and ES would detect most of the types correctly. Even if not optimal, it could still work for querying and graphing the data.

One of the drawbacks of not using this template is having the tags analyzed by ES. In that case, with ES's default mapping, instead of using tag.host one would have to use tag.host.keyword to access the non-analyzed field.

The idea is that the provided template has sane defaults/types. Everything can be overridden by another template if the ES admin wants, or this template can be updated manually.

Maybe I can add to the doc that it is possible to check the created template by issuing curl http://localhost:9200/_template/telegraf?pretty (assuming the template name is telegraf).

BTW, I need to check a few additional things about the mapping of the number fields. I saw today that some fields are mapped as float and others as long, and I need to check whether this is correct and/or causes any trouble.
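To make the analyzed-versus-keyword point concrete: reusing the client and ctx from the query sketch after the README's Example section, the difference comes down to which field name the term query targets. The tag value here is illustrative.

```go
// With the managed template, tags are mapped as keyword, so an exact match
// can target the tag field directly:
query := elastic.NewTermQuery("tag.host", "elastichost")

// With ES 5.x default dynamic mapping (no template), strings are indexed as
// analyzed text plus a ".keyword" sub-field, so the exact match would have to
// use elastic.NewTermQuery("tag.host.keyword", "elastichost") instead.

result, err := client.Search().Index("telegraf-*").Query(query).Do(ctx)
if err != nil {
	log.Fatal(err)
}
fmt.Println("matching docs:", result.TotalHits())
```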

Contributor Author: I've made a few updates to the template and added more information in the README.md file.

"mappings" : {
"_default_" : {
"_all": { "enabled": false },
"properties" : {
"@timestamp" : { "type" : "date" },
"input_plugin" : { "type" : "keyword" }
},
"dynamic_templates": [{
"tag": {
"path_match": "tag.*",
"mapping": {
"ignore_above": 512,
"type": "keyword"
},
"match_mapping_type": "string"
}
}]
}
}
}`, a.TemplateName)

_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx)

if errCreateTemplate != nil {
return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate)
}

log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName)

} else {

log.Println("D! Found existing Elasticsearch template. Skipping template management")

}
}

return nil
}

func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
}

ctx := context.Background()
bulkRequest := a.Client.Bulk()

for _, metric := range metrics {
var name = metric.Name()

// index name has to be re-evaluated each time for telegraf
// to send the metric to the correct time-based index
indexName := a.GetIndexName(a.IndexName, metric.Time())

m := make(map[string]interface{})
mName := make(map[string]interface{})
mTag := make(map[string]interface{})

m["@timestamp"] = metric.Time()
m["input_plugin"] = name

for key, value := range metric.Tags() {
mTag[key] = value
}

for key, value := range metric.Fields() {
mName[key] = value
}

m["tag"] = mTag
m[name] = mName

bulkRequest.Add(elastic.NewBulkIndexRequest().
Index(indexName).
Type("metrics").
Doc(m))

}

_, err := bulkRequest.Do(ctx)

if err != nil {
return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err)
}

return nil

}

func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string {
if strings.Contains(indexName, "%") {
var dateReplacer = strings.NewReplacer(
"%Y", strconv.Itoa(eventTime.Year()),
"%m", fmt.Sprintf("%0.2d", eventTime.Month()),
"%d", fmt.Sprintf("%0.2d", eventTime.Day()),
"%H", fmt.Sprintf("%0.2d", eventTime.Hour()),
)

indexName = dateReplacer.Replace(indexName)
}

return indexName

}

func (a *Elasticsearch) SampleConfig() string {
return sampleConfig
}

func (a *Elasticsearch) Description() string {
return "Configuration for Elasticsearch to send metrics to."
}

func (a *Elasticsearch) Close() error {
a.Client = nil
return nil
}

func init() {
outputs.Add("elasticsearch", func() telegraf.Output {
return &Elasticsearch{}
})
}