Skip to content

Commit

Permalink
Merge pull request #239 from bonitoo-io/fix/sync_write
Browse files Browse the repository at this point in the history
fix: Removing retrying from blocking write
  • Loading branch information
vlastahajek authored Mar 25, 2021
2 parents 3577563 + 40de55b commit 7469a6a
Show file tree
Hide file tree
Showing 14 changed files with 174 additions and 145 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 2.2.3 [in progress]
### Bug fixes
1. [#236](https://github.com/influxdata/influxdb-client-go/pull/236) Setting MaxRetries to zero value disables retry strategy.
1. [#239](https://github.com/influxdata/influxdb-client-go/pull/239) Blocking write client doesn't use retry handling.

## 2.2.2 [2021-01-29]
### Bug fixes
1. [#229](https://github.com/influxdata/influxdb-client-go/pull/229) Connection errors are also subject for retrying.
Expand Down
2 changes: 1 addition & 1 deletion api/delete_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestDeleteAPI(t *testing.T) {
map[string]interface{}{"f": f, "i": i},
tm)
err := writeAPI.WritePoint(ctx, p)
require.Nil(t, err, err)
require.NoError(t, err)
f += 1.2
tm = tm.Add(time.Minute)
}
Expand Down
5 changes: 5 additions & 0 deletions api/http/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package http

import (
"errors"
"fmt"
"strconv"
)
Expand All @@ -30,6 +31,10 @@ func (e *Error) Error() string {
}
}

func (e *Error) Unwrap() error {
return errors.New(e.Error())
}

// NewError returns newly created Error initialised with nested error and default values
func NewError(err error) *Error {
return &Error{
Expand Down
2 changes: 1 addition & 1 deletion api/users_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestSignInOut(t *testing.T) {

// try authorized calls
err = client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(ctx, "test,a=rock,b=local f=1.2,i=-5i")
assert.Nil(t, err)
assert.NoError(t, err)

res, err := client.QueryAPI("my-org").QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect())
assert.Nil(t, err)
Expand Down
3 changes: 2 additions & 1 deletion api/write/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ func (o *Options) MaxRetries() uint {
return o.maxRetries
}

// SetMaxRetries sets maximum count of retry attempts of failed writes
// SetMaxRetries sets maximum count of retry attempts of failed writes.
// Setting zero value disables retry strategy.
func (o *Options) SetMaxRetries(maxRetries uint) *Options {
o.maxRetries = maxRetries
return o
Expand Down
11 changes: 5 additions & 6 deletions api/writeAPIBlocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package api
import (
"context"
"strings"
"sync"

http2 "github.com/influxdata/influxdb-client-go/v2/api/http"
"github.com/influxdata/influxdb-client-go/v2/api/write"
Expand Down Expand Up @@ -61,7 +60,6 @@ type WriteAPIBlocking interface {
type writeAPIBlocking struct {
service *iwrite.Service
writeOptions *write.Options
lock sync.Mutex
}

// NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client
Expand All @@ -70,10 +68,11 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write
}

func (w *writeAPIBlocking) write(ctx context.Context, line string) error {
w.lock.Lock()
defer w.lock.Unlock()
err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
return err
err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval()))
if err != nil {
return err.Unwrap()
}
return nil
}

func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error {
Expand Down
49 changes: 27 additions & 22 deletions api/writeAPIBlocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ package api

import (
"context"
"net"
"net/http"
"sync"
"testing"
"time"
Expand All @@ -20,7 +22,7 @@ import (
func TestWritePoint(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
points := genPoints(10)
points := test.GenPoints(10)
err := writeAPI.WritePoint(context.Background(), points...)
require.Nil(t, err)
require.Len(t, service.Lines(), 10)
Expand All @@ -35,7 +37,7 @@ func TestWritePoint(t *testing.T) {
func TestWriteRecord(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(10)
lines := test.GenRecords(10)
err := writeAPI.WriteRecord(context.Background(), lines...)
require.Nil(t, err)
require.Len(t, service.Lines(), 10)
Expand All @@ -54,29 +56,10 @@ func TestWriteRecord(t *testing.T) {
require.Equal(t, "invalid: data", err.Error())
}

func TestWriteContextCancel(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(10)
ctx, cancel := context.WithCancel(context.Background())
var err error
var wg sync.WaitGroup
wg.Add(1)
go func() {
<-time.After(time.Second)
err = writeAPI.WriteRecord(ctx, lines...)
wg.Done()
}()
cancel()
wg.Wait()
require.Equal(t, context.Canceled, err)
assert.Len(t, service.Lines(), 0)
}

func TestWriteParallel(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
lines := genRecords(1000)
lines := test.GenRecords(1000)

chanLine := make(chan string)
var wg sync.WaitGroup
Expand All @@ -99,3 +82,25 @@ func TestWriteParallel(t *testing.T) {

service.Close()
}

func TestWriteErrors(t *testing.T) {
service := http2.NewService("http://locl:866", "", http2.DefaultOptions().SetHTTPClient(&http.Client{
Timeout: 100 * time.Millisecond,
Transport: &http.Transport{
DialContext: (&net.Dialer{
Timeout: 100 * time.Millisecond,
}).DialContext,
},
}))
writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
points := test.GenPoints(10)
errors := 0
for _, p := range points {
err := writeAPI.WritePoint(context.Background(), p)
if assert.Error(t, err) {
errors++
}
}
require.Equal(t, 10, errors)

}
54 changes: 5 additions & 49 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package api

import (
"fmt"
"math/rand"
"strings"
"sync"
"testing"
Expand All @@ -20,49 +19,6 @@ import (
"github.com/stretchr/testify/require"
)

func genPoints(num int) []*write.Point {
points := make([]*write.Point, num)
rand.Seed(321)

t := time.Now()
for i := 0; i < len(points); i++ {
points[i] = write.NewPoint(
"test",
map[string]string{
"id": fmt.Sprintf("rack_%v", i%10),
"vendor": "AWS",
"hostname": fmt.Sprintf("host_%v", i%100),
},
map[string]interface{}{
"temperature": rand.Float64() * 80.0,
"disk_free": rand.Float64() * 1000.0,
"disk_total": (i/10 + 1) * 1000000,
"mem_total": (i/100 + 1) * 10000000,
"mem_free": rand.Uint64(),
},
t)
if i%10 == 0 {
t = t.Add(time.Second)
}
}
return points
}

func genRecords(num int) []string {
lines := make([]string, num)
rand.Seed(321)

t := time.Now()
for i := 0; i < len(lines); i++ {
lines[i] = fmt.Sprintf("test,id=rack_%v,vendor=AWS,hostname=host_%v temperature=%v,disk_free=%v,disk_total=%vi,mem_total=%vi,mem_free=%vu %v",
i%10, i%100, rand.Float64()*80.0, rand.Float64()*1000.0, (i/10+1)*1000000, (i/100+1)*10000000, rand.Uint64(), t.UnixNano())
if i%10 == 0 {
t = t.Add(time.Second)
}
}
return lines
}

func TestWriteAPIWriteDefaultTag(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
opts := write.DefaultOptions().
Expand All @@ -85,7 +41,7 @@ func TestWriteAPIWriteDefaultTag(t *testing.T) {
func TestWriteAPIImpl_Write(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5))
points := genPoints(10)
points := test.GenPoints(10)
for _, p := range points {
writeAPI.WritePoint(p)
}
Expand All @@ -103,7 +59,7 @@ func TestGzipWithFlushing(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true))
points := genPoints(5)
points := test.GenPoints(5)
for _, p := range points {
writeAPI.WritePoint(p)
}
Expand All @@ -128,7 +84,7 @@ func TestGzipWithFlushing(t *testing.T) {
func TestFlushInterval(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500))
points := genPoints(5)
points := test.GenPoints(5)
for _, p := range points {
writeAPI.WritePoint(p)
}
Expand All @@ -153,7 +109,7 @@ func TestRetry(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000))
points := genPoints(15)
points := test.GenPoints(15)
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
}
Expand Down Expand Up @@ -203,7 +159,7 @@ func TestWriteError(t *testing.T) {
recErr = <-errCh
wg.Done()
}()
points := genPoints(15)
points := test.GenPoints(15)
for i := 0; i < 5; i++ {
writeAPI.WritePoint(points[i])
}
Expand Down
6 changes: 3 additions & 3 deletions client_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestWrite(t *testing.T) {

err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test").
AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
assert.Nil(t, err)
assert.NoError(t, err)

client.Close()
wg.Wait()
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestWriteV1Compatibility(t *testing.T) {

err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1").
AddTag("a", "3").AddField("i", 20).AddField("f", 4.4))
assert.Nil(t, err)
assert.NoError(t, err)

client.Close()
wg.Wait()
Expand Down Expand Up @@ -273,7 +273,7 @@ func TestHTTPService(t *testing.T) {
org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org")
if err != nil {
//return err
t.Error(err)
t.Fatal(err)
}
taskDescription := "Example task"
taskFlux := `option task = {
Expand Down
34 changes: 10 additions & 24 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"testing"
"time"

http3 "github.com/influxdata/influxdb-client-go/v2/api/http"
http2 "github.com/influxdata/influxdb-client-go/v2/internal/http"
iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write"
"github.com/stretchr/testify/assert"
Expand All @@ -39,8 +38,7 @@ func TestUrls(t *testing.T) {
assert.Equal(t, url.serverURL, ci.serverURL)
assert.Equal(t, url.serverAPIURL, ci.httpService.ServerAPIURL())
ws := iwrite.NewService("org", "bucket", ci.httpService, c.Options().WriteOptions())
wu, err := ws.WriteURL()
require.Nil(t, err)
wu := ws.WriteURL()
assert.Equal(t, url.writeURLPrefix+"?bucket=bucket&org=org&precision=ns", wu)
})
}
Expand Down Expand Up @@ -94,7 +92,7 @@ func TestUserAgent(t *testing.T) {
assert.Nil(t, err)

err = c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
assert.Nil(t, err)
assert.NoError(t, err)
}

func TestServerError429(t *testing.T) {
Expand All @@ -109,12 +107,8 @@ func TestServerError429(t *testing.T) {
defer server.Close()
c := NewClient(server.URL, "x")
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
require.NotNil(t, err)
perror, ok := err.(*http3.Error)
require.True(t, ok)
require.NotNil(t, perror)
assert.Equal(t, "too many requests", perror.Code)
assert.Equal(t, "exceeded rate limit", perror.Message)
require.Error(t, err)
assert.Equal(t, "too many requests: exceeded rate limit", err.Error())
}

func TestServerOnPath(t *testing.T) {
Expand All @@ -130,7 +124,7 @@ func TestServerOnPath(t *testing.T) {
defer server.Close()
c := NewClient(server.URL+"/proxy/0:0/influx/", "x")
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
require.Nil(t, err)
require.NoError(t, err)
}

func TestServerErrorNonJSON(t *testing.T) {
Expand All @@ -143,12 +137,8 @@ func TestServerErrorNonJSON(t *testing.T) {
defer server.Close()
c := NewClient(server.URL, "x")
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
require.NotNil(t, err)
perror, ok := err.(*http3.Error)
require.True(t, ok)
require.NotNil(t, perror)
assert.Equal(t, "500 Internal Server Error", perror.Code)
assert.Equal(t, "internal server error", perror.Message)
require.Error(t, err)
assert.Equal(t, "500 Internal Server Error: internal server error", err.Error())
}

func TestServerErrorInflux1_8(t *testing.T) {
Expand All @@ -162,12 +152,8 @@ func TestServerErrorInflux1_8(t *testing.T) {
defer server.Close()
c := NewClient(server.URL, "x")
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
require.NotNil(t, err)
perror, ok := err.(*http3.Error)
require.True(t, ok)
require.NotNil(t, perror)
assert.Equal(t, "404 Not Found", perror.Code)
assert.Equal(t, "bruh moment", perror.Message)
require.Error(t, err)
assert.Equal(t, "404 Not Found: bruh moment", err.Error())
}

func TestServerErrorEmptyBody(t *testing.T) {
Expand All @@ -178,6 +164,6 @@ func TestServerErrorEmptyBody(t *testing.T) {
defer server.Close()
c := NewClient(server.URL, "x")
err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i")
require.NotNil(t, err)
require.Error(t, err)
assert.Equal(t, "Unexpected status code 404", err.Error())
}
Loading

0 comments on commit 7469a6a

Please sign in to comment.