Skip to content

Commit

Permalink
added force_document_id option to ES output enable resend data and av…
Browse files Browse the repository at this point in the history
…oiding duplicated ES documents, fix influxdata#7891 (influxdata#8019)
  • Loading branch information
Toni Moreno authored Sep 8, 2020
1 parent 99cd99f commit 683f613
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 1 deletion.
4 changes: 3 additions & 1 deletion etc/telegraf.conf
Original file line number Diff line number Diff line change
Expand Up @@ -512,7 +512,9 @@
# template_name = "telegraf"
# ## Set to true if you want telegraf to overwrite an existing template
# overwrite_template = false

# ## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
# ## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
# force_document_id = false

# # Send metrics to command as input over stdin
# [[outputs.exec]]
Expand Down
4 changes: 4 additions & 0 deletions plugins/outputs/elasticsearch/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,9 @@ This plugin will format the events in the following way:
template_name = "telegraf"
## Set to true if you want telegraf to overwrite an existing template
overwrite_template = false
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false
```

#### Permissions
Expand Down Expand Up @@ -232,6 +235,7 @@ Additionally, you can specify dynamic index names by using tags with the notatio
* `manage_template`: Set to true if you want telegraf to manage its index template. If enabled it will create a recommended index template for telegraf indexes.
* `template_name`: The template name used for telegraf indexes.
* `overwrite_template`: Set to true if you want telegraf to overwrite an existing template.
* `force_document_id`: Set to true will compute a unique hash from as sha256(concat(timestamp,measurement,series-hash)),enables resend or update data withoud ES duplicated documents.

### Known issues

Expand Down
24 changes: 24 additions & 0 deletions plugins/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"text/template"
"time"

"crypto/sha256"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/common/tls"
Expand All @@ -31,6 +33,7 @@ type Elasticsearch struct {
ManageTemplate bool
TemplateName string
OverwriteTemplate bool
ForceDocumentId bool
MajorReleaseNumber int
tls.ClientConfig

Expand Down Expand Up @@ -86,6 +89,9 @@ var sampleConfig = `
template_name = "telegraf"
## Set to true if you want telegraf to overwrite an existing template
overwrite_template = false
## If set to true a unique ID hash will be sent as sha256(concat(timestamp,measurement,series-hash)) string
## it will enable data resend and update metric points avoiding duplicated metrics with diferent id's
force_document_id = false
`

const telegrafTemplate = `
Expand Down Expand Up @@ -242,6 +248,19 @@ func (a *Elasticsearch) Connect() error {
return nil
}

// GetPointID generates a unique ID for a Metric Point
func GetPointID(m telegraf.Metric) string {

var buffer bytes.Buffer
//Timestamp(ns),measurement name and Series Hash for compute the final SHA256 based hash ID

buffer.WriteString(strconv.FormatInt(m.Time().Local().UnixNano(), 10))
buffer.WriteString(m.Name())
buffer.WriteString(strconv.FormatUint(m.HashID(), 10))

return fmt.Sprintf("%x", sha256.Sum256(buffer.Bytes()))
}

func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {
if len(metrics) == 0 {
return nil
Expand All @@ -265,6 +284,11 @@ func (a *Elasticsearch) Write(metrics []telegraf.Metric) error {

br := elastic.NewBulkIndexRequest().Index(indexName).Doc(m)

if a.ForceDocumentId {
id := GetPointID(metric)
br.Id(id)
}

if a.MajorReleaseNumber <= 6 {
br.Type("metrics")
}
Expand Down

0 comments on commit 683f613

Please sign in to comment.