Skip to content

Commit

Permalink
feat: Add CSV serializer (#11307)
Browse files Browse the repository at this point in the history
  • Loading branch information
srebhan authored and MyaLongmire committed Jul 6, 2022
1 parent 3332827 commit a18ac54
Show file tree
Hide file tree
Showing 18 changed files with 512 additions and 2 deletions.
6 changes: 5 additions & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -1676,8 +1676,11 @@ func (c *Config) buildSerializer(tbl *ast.Table) (serializers.Serializer, error)
c.getFieldStringSlice(tbl, "templates", &sc.Templates)
c.getFieldString(tbl, "carbon2_format", &sc.Carbon2Format)
c.getFieldString(tbl, "carbon2_sanitize_replace_char", &sc.Carbon2SanitizeReplaceChar)
c.getFieldBool(tbl, "csv_column_prefix", &sc.CSVPrefix)
c.getFieldBool(tbl, "csv_header", &sc.CSVHeader)
c.getFieldString(tbl, "csv_separator", &sc.CSVSeparator)
c.getFieldString(tbl, "csv_timestamp_format", &sc.TimestampFormat)
c.getFieldInt(tbl, "influx_max_line_bytes", &sc.InfluxMaxLineBytes)

c.getFieldBool(tbl, "influx_sort_fields", &sc.InfluxSortFields)
c.getFieldBool(tbl, "influx_uint_support", &sc.InfluxUintSupport)
c.getFieldBool(tbl, "graphite_tag_support", &sc.GraphiteTagSupport)
Expand Down Expand Up @@ -1744,6 +1747,7 @@ func (c *Config) missingTomlField(_ reflect.Type, key string) error {
case "alias", "carbon2_format", "carbon2_sanitize_replace_char", "collectd_auth_file",
"collectd_parse_multivalue", "collectd_security_level", "collectd_typesdb", "collection_jitter",
"collection_offset",
"csv_separator", "csv_header", "csv_column_prefix", "csv_timestamp_format",
"data_format", "data_type", "delay", "drop", "drop_original", "dropwizard_metric_registry_path",
"dropwizard_tag_paths", "dropwizard_tags_path", "dropwizard_time_format", "dropwizard_time_path",
"fielddrop", "fieldpass", "flush_interval", "flush_jitter", "form_urlencoded_tag_keys",
Expand Down
1 change: 1 addition & 0 deletions docs/DATA_FORMATS_OUTPUT.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ plugins.

1. [InfluxDB Line Protocol](/plugins/serializers/influx)
1. [Carbon2](/plugins/serializers/carbon2)
1. [CSV](/plugins/serializers/csv)
1. [Graphite](/plugins/serializers/graphite)
1. [JSON](/plugins/serializers/json)
1. [MessagePack](/plugins/serializers/msgpack)
Expand Down
55 changes: 55 additions & 0 deletions plugins/serializers/csv/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# CSV Serializer

The `csv` output data format converts metrics into CSV lines.

## Configuration

```toml
[[outputs.file]]
## Files to write to, "stdout" is a specially handled file.
files = ["stdout", "/tmp/metrics.out"]

## Data format to output.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_OUTPUT.md
data_format = "csv"

## The default timestamp format is Unix epoch time.
# Other timestamp layouts can be configured using the Go language time
# layout specification from https://golang.org/pkg/time/#Time.Format
# e.g.: csv_timestamp_format = "2006-01-02T15:04:05Z07:00"
# csv_timestamp_format = "unix"

## The default separator for the CSV format.
# csv_separator = ","

## Output the CSV header in the first line.
## Enable the header when outputting metrics to a new file.
## Disable when appending to a file or when using a stateless
## output to prevent headers appearing between data lines.
# csv_header = false

## Prefix tag and field columns with "tag_" and "field_" respectively.
## This can be helpful if you need to know the "type" of a column.
# csv_column_prefix = false
```

## Examples

Standard form:

```csv
1458229140,docker,raynor,30,4,...,59,660
```

When an output plugin needs to emit multiple metrics at one time, it may use
the batch format. Whether the batch format is used is determined by the
plugin; refer to the documentation for the specific plugin. With
`csv_header = true` you get

```csv
timestamp,measurement,host,field_1,field_2,...,field_N,n_images
1458229140,docker,raynor,30,4,...,59,660
1458229143,docker,raynor,28,5,...,60,665
```
176 changes: 176 additions & 0 deletions plugins/serializers/csv/csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,176 @@
package csv

import (
"bytes"
"encoding/csv"
"fmt"
"runtime"
"sort"
"strconv"
"time"

"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
)

// Serializer converts telegraf metrics into CSV lines. It is stateful:
// Header is cleared after the first header line is written, and the
// output buffer is reused across calls, so returned byte slices are only
// valid until the next Serialize/SerializeBatch call. Because of this
// mutable state it is not safe for concurrent use without external
// synchronization.
type Serializer struct {
	// TimestampFormat selects how the timestamp column is rendered:
	// "unix", "unix_ms", "unix_us", "unix_ns", or a Go time layout.
	TimestampFormat string `toml:"csv_timestamp_format"`
	// Separator is the single-character column separator (default ",").
	Separator string `toml:"csv_separator"`
	// Header controls whether a column-name line is emitted before the
	// first data line; it is set to false once the header was written.
	Header bool `toml:"csv_header"`
	// Prefix, when true, prepends "tag_" / "field_" to tag and field
	// column names in the header.
	Prefix bool `toml:"csv_column_prefix"`

	buffer bytes.Buffer // reused output buffer; returned slices alias it
	writer *csv.Writer  // CSV writer bound to buffer
}

// NewSerializer creates a CSV serializer with the given options.
// An empty separator defaults to "," and an empty timestampFormat
// defaults to "unix". The separator must be exactly one character
// (rune); the timestamp format must be one of the "unix*" styles or a
// valid Go reference-time layout. An error is returned for invalid
// separator or timestamp-format values.
func NewSerializer(timestampFormat, separator string, header, prefix bool) (*Serializer, error) {
	// Setting defaults
	if separator == "" {
		separator = ","
	}

	// Check inputs. Count runes, not bytes: csv.Writer.Comma is a single
	// rune, so a multi-byte UTF-8 separator such as "¦" is valid even
	// though len() reports more than one byte.
	if utf8.RuneCountInString(separator) > 1 {
		return nil, fmt.Errorf("invalid separator %q", separator)
	}

	switch timestampFormat {
	case "":
		timestampFormat = "unix"
	case "unix", "unix_ms", "unix_us", "unix_ns":
		// accepted as-is
	default:
		// A layout containing no reference-time elements formats to
		// itself; treat that as an invalid layout. NOTE(review): a valid
		// layout could coincidentally format the current time to itself,
		// but this heuristic matches the existing behavior.
		if time.Now().Format(timestampFormat) == timestampFormat {
			return nil, fmt.Errorf("invalid timestamp format %q", timestampFormat)
		}
	}

	s := &Serializer{
		TimestampFormat: timestampFormat,
		Separator:       separator,
		Header:          header,
		Prefix:          prefix,
	}

	// Initialize the writer over the internal buffer.
	s.writer = csv.NewWriter(&s.buffer)
	s.writer.Comma = []rune(separator)[0]
	// Use CRLF line endings on Windows to match platform conventions.
	s.writer.UseCRLF = runtime.GOOS == "windows"

	return s, nil
}

// Serialize converts a single metric into its CSV representation. When
// the Header option is enabled, the column-name line is emitted once,
// on the first call only. The returned slice aliases the serializer's
// internal buffer and is valid only until the next call.
func (s *Serializer) Serialize(metric telegraf.Metric) ([]byte, error) {
	// Start from an empty buffer while keeping its capacity.
	s.buffer.Truncate(0)

	// Emit the header exactly once if requested.
	if s.Header {
		err := s.writeHeader(metric)
		if err != nil {
			return nil, fmt.Errorf("writing header failed: %w", err)
		}
		s.Header = false
	}

	// Emit the data line for this metric.
	err := s.writeData(metric)
	if err != nil {
		return nil, fmt.Errorf("writing data failed: %w", err)
	}

	// Flush the csv.Writer so everything lands in the buffer.
	s.writer.Flush()
	return s.buffer.Bytes(), nil
}

// SerializeBatch converts a slice of metrics into consecutive CSV lines.
// The header (when enabled) is derived from the first metric and written
// once. An empty batch yields a nil result. The returned slice aliases
// the serializer's internal buffer and is valid only until the next call.
func (s *Serializer) SerializeBatch(metrics []telegraf.Metric) ([]byte, error) {
	if len(metrics) == 0 {
		return nil, nil
	}

	// Start from an empty buffer while keeping its capacity.
	s.buffer.Truncate(0)

	// Emit the header exactly once if requested, using the first
	// metric's tags and fields as the column template.
	if s.Header {
		err := s.writeHeader(metrics[0])
		if err != nil {
			return nil, fmt.Errorf("writing header failed: %w", err)
		}
		s.Header = false
	}

	// One data line per metric, in the given order.
	for _, metric := range metrics {
		err := s.writeData(metric)
		if err != nil {
			return nil, fmt.Errorf("writing data failed: %w", err)
		}
	}

	// Flush the csv.Writer so everything lands in the buffer.
	s.writer.Flush()
	return s.buffer.Bytes(), nil
}

// writeHeader emits the column-name line derived from the given metric:
// "timestamp", "measurement", the tag keys, then the field keys sorted
// alphabetically. Keys get a "tag_"/"field_" prefix when the Prefix
// option is set.
func (s *Serializer) writeHeader(metric telegraf.Metric) error {
	tags := metric.TagList()
	fields := metric.FieldList()

	// Keep field columns in deterministic (alphabetical) order.
	// Note: this sorts the metric's field list in place.
	sort.Slice(fields, func(i, j int) bool {
		return fields[i].Key < fields[j].Key
	})

	columns := make([]string, 0, 2+len(tags)+len(fields))
	columns = append(columns, "timestamp", "measurement")

	for _, tag := range tags {
		name := tag.Key
		if s.Prefix {
			name = "tag_" + name
		}
		columns = append(columns, name)
	}

	for _, field := range fields {
		name := field.Key
		if s.Prefix {
			name = "field_" + name
		}
		columns = append(columns, name)
	}

	return s.writer.Write(columns)
}

// writeData emits one CSV data line for the metric: the formatted
// timestamp, the measurement name, the tag values, then the field
// values with fields sorted alphabetically by key so columns line up
// with the header.
func (s *Serializer) writeData(metric telegraf.Metric) error {
	t := metric.Time()

	// Render the timestamp according to the configured format.
	var timestamp string
	switch s.TimestampFormat {
	case "unix":
		timestamp = strconv.FormatInt(t.Unix(), 10)
	case "unix_ms":
		timestamp = strconv.FormatInt(t.UnixNano()/1_000_000, 10)
	case "unix_us":
		timestamp = strconv.FormatInt(t.UnixNano()/1_000, 10)
	case "unix_ns":
		timestamp = strconv.FormatInt(t.UnixNano(), 10)
	default:
		// Anything else is treated as a Go time layout; rendered in UTC.
		timestamp = t.UTC().Format(s.TimestampFormat)
	}

	tags := metric.TagList()
	fields := metric.FieldList()

	// Sort the fields by name (in place) to match the header order.
	sort.Slice(fields, func(i, j int) bool {
		return fields[i].Key < fields[j].Key
	})

	columns := make([]string, 0, 2+len(tags)+len(fields))
	columns = append(columns, timestamp, metric.Name())
	for _, tag := range tags {
		columns = append(columns, tag.Value)
	}
	for _, field := range fields {
		value, err := internal.ToString(field.Value)
		if err != nil {
			return fmt.Errorf("converting field %q to string failed: %w", field.Key, err)
		}
		columns = append(columns, value)
	}

	return s.writer.Write(columns)
}
Loading

0 comments on commit a18ac54

Please sign in to comment.