-
Notifications
You must be signed in to change notification settings - Fork 5.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add Elasticsearch 5.x output #2332
Changes from 1 commit
648e50b
9ea4d6a
7e494b5
d1951a8
e0ebc34
d786c6b
bf75c21
8f56697
14d6ed2
866ce6c
2f94c95
8dd92ab
959638a
a6cfae2
debe2e9
4506936
0e07a9b
d9ee5d9
beb9390
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
# Elasticsearch Output Plugin | ||
|
||
This plugin writes to [Elasticsearch](https://www.elastic.co) via HTTP using Elastic (http://olivere.github.io/elastic/). | ||
|
||
It only supports Elasticsearch 5.x series currently. | ||
|
||
### Example: | ||
|
||
The plugin will format the metrics in the following way: | ||
|
||
``` | ||
{ | ||
"@timestamp": "2017-01-01T00:00:00+00:00", | ||
"input_plugin": "cpu", | ||
"cpu": { | ||
"usage_guest": 0, | ||
"usage_guest_nice": 0, | ||
"usage_idle": 71.85413456197966, | ||
"usage_iowait": 0.256805341656516, | ||
"usage_irq": 0, | ||
"usage_nice": 0, | ||
"usage_softirq": 0.2054442732579466, | ||
"usage_steal": 0, | ||
"usage_system": 15.04879301548127, | ||
"usage_user": 12.634822807288275 | ||
}, | ||
"tag": { | ||
"cpu": "cpu-total", | ||
"host": "elastichost", | ||
"dc": "datacenter1" | ||
} | ||
} | ||
|
||
{ | ||
"@timestamp": "2017-01-01T00:00:00+00:00", | ||
"input_plugin": "system", | ||
"system": { | ||
"load1": 0.78, | ||
"load15": 0.8, | ||
"load5": 0.8, | ||
"n_cpus": 2, | ||
"n_users": 2 | ||
}, | ||
"tag": { | ||
"host": "elastichost", | ||
"dc": "datacenter1" | ||
} | ||
} | ||
|
||
|
||
``` | ||
|
||
### Configuration: | ||
|
||
```toml | ||
# Configuration for Elasticsearch to send metrics to. | ||
[[outputs.elasticsearch]] | ||
## The full HTTP endpoint URL for your Elasticsearch instance | ||
## Multiple urls can be specified as part of the same cluster, | ||
## this means that only ONE of the urls will be written to each interval. | ||
urls = [ "http://node1.es.example.com:9200" ] # required. | ||
## Set to true to ask Elasticsearch a list of all cluster nodes, | ||
## thus it is not necessary to list all nodes in the urls config option | ||
enable_sniffer = true | ||
## Set the interval to check if the nodes are available, in seconds | ||
## Setting to 0 will disable the health check (not recommended in production) | ||
health_check_interval = 10 | ||
## HTTP basic authentication details (eg. when using Shield) | ||
# username = "telegraf" | ||
# password = "mypassword" | ||
|
||
# Index Config | ||
## The target index for metrics (telegraf will create it if not exists) | ||
## You can use a different index per time frame using the patterns below | ||
# %Y - year (2016) | ||
# %y - last two digits of year (00..99) | ||
# %m - month (01..12) | ||
# %d - day of month (e.g., 01) | ||
# %H - hour (00..23) | ||
index_name = "telegraf-%Y.%m.%d" # required. | ||
|
||
## Template Config | ||
## set to true if you want telegraf to manage its index template | ||
manage_template = true | ||
## The template name used for telegraf indexes | ||
template_name = "telegraf" | ||
## set to true if you want to overwrite an existing template | ||
overwrite_template = false | ||
``` |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,246 @@ | ||
package elasticsearch | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"log" | ||
"strconv" | ||
"strings" | ||
"time" | ||
|
||
"github.com/influxdata/telegraf" | ||
"github.com/influxdata/telegraf/plugins/outputs" | ||
"gopkg.in/olivere/elastic.v5" | ||
) | ||
|
||
type Elasticsearch struct { | ||
URLs []string `toml:"urls"` | ||
IndexName string | ||
Username string | ||
Password string | ||
EnableSniffer bool | ||
HealthCheckInterval int | ||
ManageTemplate bool | ||
TemplateName string | ||
OverwriteTemplate bool | ||
Client *elastic.Client | ||
} | ||
|
||
var sampleConfig = ` | ||
## The full HTTP endpoint URL for your Elasticsearch instance | ||
## Multiple urls can be specified as part of the same cluster, | ||
## this means that only ONE of the urls will be written to each interval. | ||
urls = [ "http://node1.es.example.com:9200" ] # required. | ||
## Set to true to ask Elasticsearch a list of all cluster nodes, | ||
## thus it is not necessary to list all nodes in the urls config option | ||
enable_sniffer = true | ||
## Set the interval to check if the nodes are available, in seconds | ||
## Setting to 0 will disable the health check (not recommended in production) | ||
health_check_interval = 10 | ||
## HTTP basic authentication details (eg. when using Shield) | ||
# username = "telegraf" | ||
# password = "mypassword" | ||
|
||
# Index Config | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. when is this evaluated? only when telegraf start? or each metric "batch" under the same timestamp gets added to it's own index? (add that to the doc) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, the evaluation is done currently per metric inside the batch... there is a comment in the code about that. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nope, that is definitely not guaranteed, please leave it as-is! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. but also please document that the index is set per-metric in the README and sample config There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I will send later today There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've added additional information, let me know if it is enough. |
||
## The target index for metrics (telegraf will create it if not exists) | ||
## You can use a different index per time frame using the patterns below | ||
# %Y - year (2016) | ||
# %y - last two digits of year (00..99) | ||
# %m - month (01..12) | ||
# %d - day of month (e.g., 01) | ||
# %H - hour (00..23) | ||
index_name = "telegraf-%Y.%m.%d" # required. | ||
|
||
## Template Config | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. can you provide more information on the template management? doesn't have to be here, maybe in the README would be sufficient. links to elasticsearch documentation would be extra good. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, template management is about creating a proper template for the index name provided. I will add more info in the doc... These config option names are inspired on similar logstash config options that should be familiar to who already uses ES |
||
## set to true if you want telegraf to manage its index template | ||
manage_template = true | ||
## The template name used for telegraf indexes | ||
template_name = "telegraf" | ||
## set to true if you want to overwrite an existing template | ||
overwrite_template = false | ||
` | ||
|
||
func (a *Elasticsearch) Connect() error { | ||
if a.URLs == nil || a.IndexName == "" { | ||
return fmt.Errorf("Elasticsearch urls or index_name is not defined") | ||
} | ||
|
||
ctx := context.Background() | ||
|
||
var clientOptions []elastic.ClientOptionFunc | ||
|
||
clientOptions = append(clientOptions, | ||
elastic.SetSniff(a.EnableSniffer), | ||
elastic.SetURL(a.URLs...), | ||
) | ||
|
||
if a.Username != "" && a.Password != "" { | ||
clientOptions = append(clientOptions, | ||
elastic.SetBasicAuth(a.Username, a.Password), | ||
) | ||
} | ||
|
||
if a.HealthCheckInterval > 0 { | ||
clientOptions = append(clientOptions, | ||
elastic.SetHealthcheckInterval(time.Duration(a.HealthCheckInterval)*time.Second), | ||
) | ||
} | ||
|
||
client, err := elastic.NewClient(clientOptions...) | ||
|
||
if err != nil { | ||
return fmt.Errorf("Elasticsearch connection failed: %s", err) | ||
} | ||
|
||
// check for version on first node | ||
esVersion, err := client.ElasticsearchVersion(a.URLs[0]) | ||
|
||
if err != nil { | ||
return fmt.Errorf("Elasticsearch version check failed: %s", err) | ||
} | ||
|
||
// warn about ES version | ||
if i, err := strconv.Atoi(strings.Split(esVersion, ".")[0]); err == nil { | ||
if i < 5 { | ||
log.Println("W! Elasticsearch version not supported: " + esVersion) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should this be an error? should we continue with the plugin or just return here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure here, I didn't test to really know it is not going to work on older versions. Maybe someone wants to take the risk, so there's the warning. But I agree to return if you think it is safer. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd prefer to return and not bother trying to support older versions of elastic |
||
} else { | ||
log.Println("I! Elasticsearch version: " + esVersion) | ||
} | ||
} | ||
|
||
a.Client = client | ||
|
||
if a.ManageTemplate { | ||
templateExists, errExists := a.Client.IndexTemplateExists(a.TemplateName).Do(ctx) | ||
|
||
if errExists != nil { | ||
return fmt.Errorf("Elasticsearch template check failed, template name: %s, error: %s", a.TemplateName, errExists) | ||
} | ||
|
||
if (a.OverwriteTemplate) || (!templateExists) { | ||
// Create or update the template | ||
tmpl := fmt.Sprintf(` | ||
{ "template":"%s*", | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. should you include in the documentation what this template is? how do you know that this template will work for everyone? should the template be configurable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The template is kind of optional, the plugin will work without it and ES would detect correctly most of the types, and even if not optimal, it could still work for querying and graphing the data. One of the drawbacks to not use this template is to have the tags analyzed by ES. In this case, using the default mapping from ES, instead of using The idea is that the template provided will have sane defaults/types. Everything can be override by another template if the ES admin wants, or have this template updated manually. Maybe I can add in the doc that it is possible to check the template created by issuing BTW I need to check few additional things about the mapping of the number fields. I saw today some fields being mapped as float and others as long, and I need to check if this is correct and/or it causes any trouble. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I've made few updates to the template and added more information in the README.md file. |
||
"mappings" : { | ||
"_default_" : { | ||
"_all": { "enabled": false }, | ||
"properties" : { | ||
"@timestamp" : { "type" : "date" }, | ||
"input_plugin" : { "type" : "keyword" } | ||
}, | ||
"dynamic_templates": [{ | ||
"tag": { | ||
"path_match": "tag.*", | ||
"mapping": { | ||
"ignore_above": 512, | ||
"type": "keyword" | ||
}, | ||
"match_mapping_type": "string" | ||
} | ||
}] | ||
} | ||
} | ||
}`, a.TemplateName) | ||
|
||
_, errCreateTemplate := a.Client.IndexPutTemplate(a.TemplateName).BodyString(tmpl).Do(ctx) | ||
|
||
if errCreateTemplate != nil { | ||
return fmt.Errorf("Elasticsearch failed to create index template %s : %s", a.TemplateName, errCreateTemplate) | ||
} | ||
|
||
log.Printf("D! Elasticsearch template %s created or updated\n", a.TemplateName) | ||
|
||
} else { | ||
|
||
log.Println("D! Found existing Elasticsearch template. Skipping template management") | ||
|
||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (a *Elasticsearch) Write(metrics []telegraf.Metric) error { | ||
if len(metrics) == 0 { | ||
return nil | ||
} | ||
|
||
ctx := context.Background() | ||
bulkRequest := a.Client.Bulk() | ||
|
||
for _, metric := range metrics { | ||
var name = metric.Name() | ||
|
||
// index name has to be re-evaluated each time for telegraf | ||
// to send the metric to the correct time-based index | ||
indexName := a.GetIndexName(a.IndexName, metric.Time()) | ||
|
||
m := make(map[string]interface{}) | ||
mName := make(map[string]interface{}) | ||
mTag := make(map[string]interface{}) | ||
|
||
m["@timestamp"] = metric.Time() | ||
m["input_plugin"] = name | ||
|
||
for key, value := range metric.Tags() { | ||
mTag[key] = value | ||
} | ||
|
||
for key, value := range metric.Fields() { | ||
mName[key] = value | ||
} | ||
|
||
m["tag"] = mTag | ||
m[name] = mName | ||
|
||
bulkRequest.Add(elastic.NewBulkIndexRequest(). | ||
Index(indexName). | ||
Type("metrics"). | ||
Doc(m)) | ||
|
||
} | ||
|
||
_, err := bulkRequest.Do(ctx) | ||
|
||
if err != nil { | ||
return fmt.Errorf("Error sending bulk request to Elasticsearch: %s", err) | ||
} | ||
|
||
return nil | ||
|
||
} | ||
|
||
func (a *Elasticsearch) GetIndexName(indexName string, eventTime time.Time) string { | ||
if strings.Contains(indexName, "%") { | ||
var dateReplacer = strings.NewReplacer( | ||
"%Y", strconv.Itoa(eventTime.Year()), | ||
"%m", fmt.Sprintf("%0.2d", eventTime.Month()), | ||
"%d", fmt.Sprintf("%0.2d", eventTime.Day()), | ||
"%H", fmt.Sprintf("%0.2d", eventTime.Hour()), | ||
) | ||
|
||
indexName = dateReplacer.Replace(indexName) | ||
} | ||
|
||
return indexName | ||
|
||
} | ||
|
||
func (a *Elasticsearch) SampleConfig() string { | ||
return sampleConfig | ||
} | ||
|
||
func (a *Elasticsearch) Description() string { | ||
return "Configuration for Elasticsearch to send metrics to." | ||
} | ||
|
||
func (a *Elasticsearch) Close() error { | ||
a.Client = nil | ||
return nil | ||
} | ||
|
||
func init() { | ||
outputs.Add("elasticsearch", func() telegraf.Output { | ||
return &Elasticsearch{} | ||
}) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not super familiar with ES best practices, but this seems a bit redundant to me. Why put
"input_plugin": "cpu"
in the top-level of the metric if you already have the name of the plugin in"cpu": ...
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's to make possible/easier to query/filter for metrics from a particular input. It is not so easy/convenient to query for field names in ES. It can be done by issuing a terms query on _field_names, but I don't know how to do this in grafana/kibana for example (or even if it is possible to do).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In that case, shouldn't we put a "measurement_name" field in the ES metric? seems like this is more useful than the plugin name. There are many plugins that write more than one measurement name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, makes sense. Actually I think the current "input_plugin" is already the measurement name, it comes from
metric.Name()
. I will change the name of the field.