Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding possibility to set default tags #136

Merged
merged 2 commits into from
Jun 19, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## 1.3.0 [in progress]
### Features
1. [#131](https://github.com/influxdata/influxdb-client-go/pull/131) Labels API
1. [#136](https://github.com/influxdata/influxdb-client-go/pull/136) Possibility to specify default tags

### Bug fixes
1. [#132](https://github.com/influxdata/influxdb-client-go/pull/132) Handle unsupported write type as string instead of generating panic
Expand Down
20 changes: 19 additions & 1 deletion api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Options struct {
precision time.Duration
// Whether to use GZip compression in requests. Default false
useGZip bool
// Tags added to each point during writing. If a point already has a tag with the same key, it is left unchanged.
defaultTags map[string]string
}

// BatchSize returns size of batch
Expand Down Expand Up @@ -104,7 +106,23 @@ func (o *Options) SetUseGZip(useGZip bool) *Options {
return o
}

// AddDefaultTag adds a default tag. DefaultTags are added to each written point.
// If a tag with the same key already exist it is overwritten.
// If a point already defines such a tag, it is left unchanged.
func (o *Options) AddDefaultTag(key, value string) *Options {
o.DefaultTags()[key] = value
return o
}

// DefaultTags returns set of default tags
func (o *Options) DefaultTags() map[string]string {
if o.defaultTags == nil {
o.defaultTags = make(map[string]string)
}
return o.defaultTags
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000}
return &Options{batchSize: 5000, maxRetries: 3, retryInterval: 1000, flushInterval: 1000, precision: time.Nanosecond, useGZip: false, retryBufferLimit: 10000, defaultTags: make(map[string]string)}
}
6 changes: 5 additions & 1 deletion api/write/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestDefaultOptions(t *testing.T) {
assert.Equal(t, uint(10000), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Len(t, opts.DefaultTags(), 0)
}

func TestSettingsOptions(t *testing.T) {
Expand All @@ -31,12 +32,15 @@ func TestSettingsOptions(t *testing.T) {
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(5000).
SetMaxRetries(7)
SetMaxRetries(7).
AddDefaultTag("a", "1").
AddDefaultTag("b", "2")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Len(t, opts.DefaultTags(), 2)
}
19 changes: 19 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,25 @@ func genRecords(num int) []string {
return lines
}

func TestWriteApiWriteDefaultTag(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().
SetBatchSize(1)
opts.AddDefaultTag("dft", "a")
writeApi := NewWriteApiImpl("my-org", "my-bucket", service, opts)
point := write.NewPoint("test",
map[string]string{
"vendor": "AWS",
},
map[string]interface{}{
"mem_free": 1234567,
}, time.Unix(60, 60))
writeApi.WritePoint(point)
writeApi.Close()
require.Len(t, service.Lines(), 1)
assert.Equal(t, "test,dft=a,vendor=AWS mem_free=1234567i 60000000060", service.Lines()[0])
}

func TestWriteApiImpl_Write(t *testing.T) {
service := newTestService(t, "http://localhost:8888")
writeApi := NewWriteApiImpl("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
Expand Down
60 changes: 59 additions & 1 deletion internal/write/writeService.go → internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"net/http"
"net/url"
"sort"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -150,21 +151,78 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error {
return nil
}

type pointWithDefaultTags struct {
point *write.Point
defaultTags map[string]string
}

// Name returns the name of measurement of a point.
func (p *pointWithDefaultTags) Name() string {
return p.point.Name()
}

// Time is the timestamp of a Point.
func (p *pointWithDefaultTags) Time() time.Time {
return p.point.Time()
}

// FieldList returns a slice containing the fields of a Point.
func (p *pointWithDefaultTags) FieldList() []*lp.Field {
return p.point.FieldList()
}

func (p *pointWithDefaultTags) TagList() []*lp.Tag {
tags := make([]*lp.Tag, 0, len(p.point.TagList())+len(p.defaultTags))
tags = append(tags, p.point.TagList()...)
for k, v := range p.defaultTags {
if !existTag(p.point.TagList(), k) {
tags = append(tags, &lp.Tag{
Key: k,
Value: v,
})
}
}
sort.Slice(tags, func(i, j int) bool { return tags[i].Key < tags[j].Key })
return tags
}

func existTag(tags []*lp.Tag, key string) bool {
for _, tag := range tags {
if key == tag.Key {
return true
}
}
return false
}

func (w *Service) EncodePoints(points ...*write.Point) (string, error) {
var buffer bytes.Buffer
e := lp.NewEncoder(&buffer)
e.SetFieldTypeSupport(lp.UintSupport)
e.FailOnFieldErr(true)
e.SetPrecision(w.writeOptions.Precision())
for _, point := range points {
_, err := e.Encode(point)
_, err := e.Encode(w.pointToEncode(point))
if err != nil {
return "", err
}
}
return buffer.String(), nil
}

func (w *Service) pointToEncode(point *write.Point) lp.Metric {
var m lp.Metric
if len(w.writeOptions.DefaultTags()) > 0 {
m = &pointWithDefaultTags{
point: point,
defaultTags: w.writeOptions.DefaultTags(),
}
} else {
m = point
}
return m
}

func (w *Service) WriteUrl() (string, error) {
if w.url == "" {
u, err := url.Parse(w.httpService.ServerApiUrl())
Expand Down
52 changes: 52 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2020 InfluxData, Inc. All rights reserved.
// Use of this source code is governed by MIT
// license that can be found in the LICENSE file.

package write

import (
"github.com/influxdata/influxdb-client-go/api/write"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"testing"
)

func TestAddDefaultTags(t *testing.T) {
opts := write.DefaultOptions()
assert.Len(t, opts.DefaultTags(), 0)

opts.AddDefaultTag("dt1", "val1")
opts.AddDefaultTag("zdt", "val2")
srv := NewService("org", "buc", nil, opts)

p := write.NewPointWithMeasurement("test")
p.AddTag("id", "101")

p.AddField("float32", float32(80.0))

s, err := srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "test,dt1=val1,id=101,zdt=val2 float32=80\n", s)
assert.Len(t, p.TagList(), 1)

p = write.NewPointWithMeasurement("x")
p.AddTag("xt", "1")
p.AddField("i", 1)

s, err = srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "x,dt1=val1,xt=1,zdt=val2 i=1i\n", s)
assert.Len(t, p.TagList(), 1)

p = write.NewPointWithMeasurement("d")
p.AddTag("id", "1")
// do not overwrite point tag
sranka marked this conversation as resolved.
Show resolved Hide resolved
p.AddTag("zdt", "val10")
p.AddField("i", -1)

s, err = srv.EncodePoints(p)
require.Nil(t, err)
assert.Equal(t, "d,dt1=val1,id=1,zdt=val10 i=-1i\n", s)

assert.Len(t, p.TagList(), 2)
}
8 changes: 8 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ func (o *Options) HttpOptions() *http.Options {
return o.httpOptions
}

// AddDefaultTag adds a default tag. DefaultTags are added to each written point.
// If a tag with the same key already exist it is overwritten.
// If a point already defines such a tag, it is left unchanged
func (o *Options) AddDefaultTag(key, value string) *Options {
o.WriteOptions().AddDefaultTag(key, value)
return o
}

// DefaultOptions returns Options object with default values
func DefaultOptions() *Options {
return &Options{logLevel: 0, writeOptions: write.DefaultOptions(), httpOptions: http.DefaultOptions()}
Expand Down
51 changes: 48 additions & 3 deletions options_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package influxdb2
package influxdb2_test

import (
"context"
"crypto/tls"
influxdb2 "github.com/influxdata/influxdb-client-go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"net/http"
Expand All @@ -11,6 +13,49 @@ import (
"time"
)

func TestDefaultOptions(t *testing.T) {
opts := influxdb2.DefaultOptions()
assert.Equal(t, uint(5000), opts.BatchSize())
assert.Equal(t, false, opts.UseGZip())
assert.Equal(t, uint(1000), opts.FlushInterval())
assert.Equal(t, time.Nanosecond, opts.Precision())
assert.Equal(t, uint(10000), opts.RetryBufferLimit())
assert.Equal(t, uint(1000), opts.RetryInterval())
assert.Equal(t, uint(3), opts.MaxRetries())
assert.Equal(t, (*tls.Config)(nil), opts.TlsConfig())
assert.Equal(t, uint(20), opts.HttpRequestTimeout())
assert.Equal(t, uint(0), opts.LogLevel())
}

func TestSettingsOptions(t *testing.T) {
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
}
opts := influxdb2.DefaultOptions().
SetBatchSize(5).
SetUseGZip(true).
SetFlushInterval(5000).
SetPrecision(time.Millisecond).
SetRetryBufferLimit(5).
SetRetryInterval(5000).
SetMaxRetries(7).
SetTlsConfig(tlsConfig).
SetHttpRequestTimeout(50).
SetLogLevel(3).
AddDefaultTag("t", "a")
assert.Equal(t, uint(5), opts.BatchSize())
assert.Equal(t, true, opts.UseGZip())
assert.Equal(t, uint(5000), opts.FlushInterval())
assert.Equal(t, time.Millisecond, opts.Precision())
assert.Equal(t, uint(5), opts.RetryBufferLimit())
assert.Equal(t, uint(5000), opts.RetryInterval())
assert.Equal(t, uint(7), opts.MaxRetries())
assert.Equal(t, tlsConfig, opts.TlsConfig())
assert.Equal(t, uint(50), opts.HttpRequestTimeout())
assert.Equal(t, uint(3), opts.LogLevel())
assert.Len(t, opts.WriteOptions().DefaultTags(), 1)
}

func TestTimeout(t *testing.T) {
response := `,result,table,_start,_stop,_time,_value,_field,_measurement,a,b,
,,0,2020-02-17T22:19:49.747562847Z,2020-02-18T22:19:49.747562847Z,2020-02-18T10:34:08.135814545Z,1.4,f,test,1,adsfasdf
Expand All @@ -32,14 +77,14 @@ func TestTimeout(t *testing.T) {
}
}))
defer server.Close()
client := NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(1))
client := influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(1))
queryApi := client.QueryApi("org")

_, err := queryApi.QueryRaw(context.Background(), "flux", nil)
require.NotNil(t, err)
assert.True(t, strings.Contains(err.Error(), "Client.Timeout exceeded"))

client = NewClientWithOptions(server.URL, "a", DefaultOptions().SetHttpRequestTimeout(5))
client = influxdb2.NewClientWithOptions(server.URL, "a", influxdb2.DefaultOptions().SetHttpRequestTimeout(5))
queryApi = client.QueryApi("org")

result, err := queryApi.QueryRaw(context.Background(), "flux", nil)
Expand Down