diff --git a/CHANGELOG.md b/CHANGELOG.md index 3f320a88..aaff91bf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,8 @@ +## 2.2.3 [in progress] +### Bug fixes +1. [#236](https://github.com/influxdata/influxdb-client-go/pull/236) Setting MaxRetries to zero value disables retry strategy. +1. [#239](https://github.com/influxdata/influxdb-client-go/pull/239) Blocking write client doesn't use retry handling. + ## 2.2.2 [2021-01-29] ### Bug fixes 1. [#229](https://github.com/influxdata/influxdb-client-go/pull/229) Connection errors are also subject for retrying. diff --git a/api/delete_e2e_test.go b/api/delete_e2e_test.go index 59e03047..eee9361a 100644 --- a/api/delete_e2e_test.go +++ b/api/delete_e2e_test.go @@ -34,7 +34,7 @@ func TestDeleteAPI(t *testing.T) { map[string]interface{}{"f": f, "i": i}, tm) err := writeAPI.WritePoint(ctx, p) - require.Nil(t, err, err) + require.NoError(t, err) f += 1.2 tm = tm.Add(time.Minute) } diff --git a/api/http/error.go b/api/http/error.go index e55095fd..6f4a5c80 100644 --- a/api/http/error.go +++ b/api/http/error.go @@ -5,6 +5,7 @@ package http import ( + "errors" "fmt" "strconv" ) @@ -30,6 +31,10 @@ func (e *Error) Error() string { } } +func (e *Error) Unwrap() error { + return errors.New(e.Error()) +} + // NewError returns newly created Error initialised with nested error and default values func NewError(err error) *Error { return &Error{ diff --git a/api/users_e2e_test.go b/api/users_e2e_test.go index e50e6ea5..236511ea 100644 --- a/api/users_e2e_test.go +++ b/api/users_e2e_test.go @@ -170,7 +170,7 @@ func TestSignInOut(t *testing.T) { // try authorized calls err = client.WriteAPIBlocking("my-org", "my-bucket").WriteRecord(ctx, "test,a=rock,b=local f=1.2,i=-5i") - assert.Nil(t, err) + assert.NoError(t, err) res, err := client.QueryAPI("my-org").QueryRaw(context.Background(), `from(bucket:"my-bucket")|> range(start: -24h) |> filter(fn: (r) => r._measurement == "test")`, influxdb2.DefaultDialect()) assert.Nil(t, err) diff --git a/api/write/options.go b/api/write/options.go index 8a194834..8bd13c85 100644 --- a/api/write/options.go +++ b/api/write/options.go @@ -69,7 +69,8 @@ func (o *Options) MaxRetries() uint { return o.maxRetries } -// SetMaxRetries sets maximum count of retry attempts of failed writes +// SetMaxRetries sets maximum count of retry attempts of failed writes. +// Setting zero value disables retry strategy. func (o *Options) SetMaxRetries(maxRetries uint) *Options { o.maxRetries = maxRetries return o diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index 29b7a17b..0cf5f0db 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -7,7 +7,6 @@ package api import ( "context" "strings" - "sync" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -61,7 +60,6 @@ type WriteAPIBlocking interface { type writeAPIBlocking struct { service *iwrite.Service writeOptions *write.Options - lock sync.Mutex } // NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client @@ -70,10 +68,11 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write } func (w *writeAPIBlocking) write(ctx context.Context, line string) error { - w.lock.Lock() - defer w.lock.Unlock() - err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval())) - return err + err := w.service.WriteBatch(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval())) + if err != nil { + return err.Unwrap() + } + return nil } func (w *writeAPIBlocking) WriteRecord(ctx context.Context, line ...string) error { diff --git a/api/writeAPIBlocking_test.go b/api/writeAPIBlocking_test.go index e5a35e5f..d3269378 100644 --- a/api/writeAPIBlocking_test.go +++ b/api/writeAPIBlocking_test.go @@ -6,6 +6,8 @@ package api import ( "context" + "net" + "net/http" "sync" "testing" "time" @@ -20,7 +22,7 @@ import ( func TestWritePoint(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) - points := genPoints(10) + points := test.GenPoints(10) err := writeAPI.WritePoint(context.Background(), points...) require.Nil(t, err) require.Len(t, service.Lines(), 10) @@ -35,7 +37,7 @@ func TestWritePoint(t *testing.T) { func TestWriteRecord(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) - lines := genRecords(10) + lines := test.GenRecords(10) err := writeAPI.WriteRecord(context.Background(), lines...) require.Nil(t, err) require.Len(t, service.Lines(), 10) @@ -54,29 +56,10 @@ func TestWriteRecord(t *testing.T) { require.Equal(t, "invalid: data", err.Error()) } -func TestWriteContextCancel(t *testing.T) { - service := test.NewTestService(t, "http://localhost:8888") - writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) - lines := genRecords(10) - ctx, cancel := context.WithCancel(context.Background()) - var err error - var wg sync.WaitGroup - wg.Add(1) - go func() { - <-time.After(time.Second) - err = writeAPI.WriteRecord(ctx, lines...) - wg.Done() - }() - cancel() - wg.Wait() - require.Equal(t, context.Canceled, err) - assert.Len(t, service.Lines(), 0) -} - func TestWriteParallel(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) - lines := genRecords(1000) + lines := test.GenRecords(1000) chanLine := make(chan string) var wg sync.WaitGroup @@ -99,3 +82,25 @@ func TestWriteParallel(t *testing.T) { service.Close() } + +func TestWriteErrors(t *testing.T) { + service := http2.NewService("http://locl:866", "", http2.DefaultOptions().SetHTTPClient(&http.Client{ + Timeout: 100 * time.Millisecond, + Transport: &http.Transport{ + DialContext: (&net.Dialer{ + Timeout: 100 * time.Millisecond, + }).DialContext, + }, + })) + writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) + points := test.GenPoints(10) + errors := 0 + for _, p := range points { + err := writeAPI.WritePoint(context.Background(), p) + if assert.Error(t, err) { + errors++ + } + } + require.Equal(t, 10, errors) + +} diff --git a/api/write_test.go b/api/write_test.go index a0d9642b..a2ff0afe 100644 --- a/api/write_test.go +++ b/api/write_test.go @@ -6,7 +6,6 @@ package api import ( "fmt" - "math/rand" "strings" "sync" "testing" @@ -20,49 +19,6 @@ import ( "github.com/stretchr/testify/require" ) -func genPoints(num int) []*write.Point { - points := make([]*write.Point, num) - rand.Seed(321) - - t := time.Now() - for i := 0; i < len(points); i++ { - points[i] = write.NewPoint( - "test", - map[string]string{ - "id": fmt.Sprintf("rack_%v", i%10), - "vendor": "AWS", - "hostname": fmt.Sprintf("host_%v", i%100), - }, - map[string]interface{}{ - "temperature": rand.Float64() * 80.0, - "disk_free": rand.Float64() * 1000.0, - "disk_total": (i/10 + 1) * 1000000, - "mem_total": (i/100 + 1) * 10000000, - "mem_free": rand.Uint64(), - }, - t) - if i%10 == 0 { - t = t.Add(time.Second) - } - } - return points -} - -func genRecords(num int) []string { - lines := make([]string, num) - rand.Seed(321) - - t := time.Now() - for i := 0; i < len(lines); i++ { - lines[i] = fmt.Sprintf("test,id=rack_%v,vendor=AWS,hostname=host_%v temperature=%v,disk_free=%v,disk_total=%vi,mem_total=%vi,mem_free=%vu %v", - i%10, i%100, rand.Float64()*80.0, rand.Float64()*1000.0, (i/10+1)*1000000, (i/100+1)*10000000, rand.Uint64(), t.UnixNano()) - if i%10 == 0 { - t = t.Add(time.Second) - } - } - return lines -} - func TestWriteAPIWriteDefaultTag(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") opts := write.DefaultOptions(). @@ -85,7 +41,7 @@ func TestWriteAPIWriteDefaultTag(t *testing.T) { func TestWriteAPIImpl_Write(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) - points := genPoints(10) + points := test.GenPoints(10) for _, p := range points { writeAPI.WritePoint(p) } @@ -103,7 +59,7 @@ func TestGzipWithFlushing(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") log.Log.SetLogLevel(log.DebugLevel) writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetUseGZip(true)) - points := genPoints(5) + points := test.GenPoints(5) for _, p := range points { writeAPI.WritePoint(p) } @@ -128,7 +84,7 @@ func TestGzipWithFlushing(t *testing.T) { func TestFlushInterval(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(10).SetFlushInterval(500)) - points := genPoints(5) + points := test.GenPoints(5) for _, p := range points { writeAPI.WritePoint(p) } @@ -153,7 +109,7 @@ func TestRetry(t *testing.T) { service := test.NewTestService(t, "http://localhost:8888") log.Log.SetLogLevel(log.DebugLevel) writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5).SetRetryInterval(10000)) - points := genPoints(15) + points := test.GenPoints(15) for i := 0; i < 5; i++ { writeAPI.WritePoint(points[i]) } @@ -203,7 +159,7 @@ func TestWriteError(t *testing.T) { recErr = <-errCh wg.Done() }() - points := genPoints(15) + points := test.GenPoints(15) for i := 0; i < 5; i++ { writeAPI.WritePoint(points[i]) } diff --git a/client_e2e_test.go b/client_e2e_test.go index 1ce0bb4c..9f25baf9 100644 --- a/client_e2e_test.go +++ b/client_e2e_test.go @@ -115,7 +115,7 @@ func TestWrite(t *testing.T) { err := client.WriteAPIBlocking("my-org", "my-bucket").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("test"). AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) - assert.Nil(t, err) + assert.NoError(t, err) client.Close() wg.Wait() @@ -203,7 +203,7 @@ func TestWriteV1Compatibility(t *testing.T) { err := client.WriteAPIBlocking("", "mydb/autogen").WritePoint(context.Background(), influxdb2.NewPointWithMeasurement("testv1"). AddTag("a", "3").AddField("i", 20).AddField("f", 4.4)) - assert.Nil(t, err) + assert.NoError(t, err) client.Close() wg.Wait() @@ -273,7 +273,7 @@ func TestHTTPService(t *testing.T) { org, err := client.OrganizationsAPI().FindOrganizationByName(context.Background(), "my-org") if err != nil { //return err - t.Error(err) + t.Fatal(err) } taskDescription := "Example task" taskFlux := `option task = { diff --git a/client_test.go b/client_test.go index 9577ac6c..4066d9ab 100644 --- a/client_test.go +++ b/client_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - http3 "github.com/influxdata/influxdb-client-go/v2/api/http" http2 "github.com/influxdata/influxdb-client-go/v2/internal/http" iwrite "github.com/influxdata/influxdb-client-go/v2/internal/write" "github.com/stretchr/testify/assert" @@ -39,8 +38,7 @@ func TestUrls(t *testing.T) { assert.Equal(t, url.serverURL, ci.serverURL) assert.Equal(t, url.serverAPIURL, ci.httpService.ServerAPIURL()) ws := iwrite.NewService("org", "bucket", ci.httpService, c.Options().WriteOptions()) - wu, err := ws.WriteURL() - require.Nil(t, err) + wu := ws.WriteURL() assert.Equal(t, url.writeURLPrefix+"?bucket=bucket&org=org&precision=ns", wu) }) } @@ -94,7 +92,7 @@ func TestUserAgent(t *testing.T) { assert.Nil(t, err) err = c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - assert.Nil(t, err) + assert.NoError(t, err) } func TestServerError429(t *testing.T) { @@ -109,12 +107,8 @@ func TestServerError429(t *testing.T) { defer server.Close() c := NewClient(server.URL, "x") err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - require.NotNil(t, err) - perror, ok := err.(*http3.Error) - require.True(t, ok) - require.NotNil(t, perror) - assert.Equal(t, "too many requests", perror.Code) - assert.Equal(t, "exceeded rate limit", perror.Message) + require.Error(t, err) + assert.Equal(t, "too many requests: exceeded rate limit", err.Error()) } func TestServerOnPath(t *testing.T) { @@ -130,7 +124,7 @@ func TestServerOnPath(t *testing.T) { defer server.Close() c := NewClient(server.URL+"/proxy/0:0/influx/", "x") err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - require.Nil(t, err) + require.NoError(t, err) } func TestServerErrorNonJSON(t *testing.T) { @@ -143,12 +137,8 @@ func TestServerErrorNonJSON(t *testing.T) { defer server.Close() c := NewClient(server.URL, "x") err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - require.NotNil(t, err) - perror, ok := err.(*http3.Error) - require.True(t, ok) - require.NotNil(t, perror) - assert.Equal(t, "500 Internal Server Error", perror.Code) - assert.Equal(t, "internal server error", perror.Message) + require.Error(t, err) + assert.Equal(t, "500 Internal Server Error: internal server error", err.Error()) } func TestServerErrorInflux1_8(t *testing.T) { @@ -162,12 +152,8 @@ func TestServerErrorInflux1_8(t *testing.T) { defer server.Close() c := NewClient(server.URL, "x") err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - require.NotNil(t, err) - perror, ok := err.(*http3.Error) - require.True(t, ok) - require.NotNil(t, perror) - assert.Equal(t, "404 Not Found", perror.Code) - assert.Equal(t, "bruh moment", perror.Message) + require.Error(t, err) + assert.Equal(t, "404 Not Found: bruh moment", err.Error()) } func TestServerErrorEmptyBody(t *testing.T) { @@ -178,6 +164,6 @@ func TestServerErrorEmptyBody(t *testing.T) { defer server.Close() c := NewClient(server.URL, "x") err := c.WriteAPIBlocking("o", "b").WriteRecord(context.Background(), "a,a=a a=1i") - require.NotNil(t, err) + require.Error(t, err) assert.Equal(t, "Unexpected status code 404", err.Error()) } diff --git a/internal/test/generators.go b/internal/test/generators.go new file mode 100644 index 00000000..8d3dd684 --- /dev/null +++ b/internal/test/generators.go @@ -0,0 +1,52 @@ +package test + +import ( + "fmt" + "math/rand" + "time" + + "github.com/influxdata/influxdb-client-go/v2/api/write" +) + +func GenPoints(num int) []*write.Point { + points := make([]*write.Point, num) + rand.Seed(321) + + t := time.Now() + for i := 0; i < len(points); i++ { + points[i] = write.NewPoint( + "test", + map[string]string{ + "id": fmt.Sprintf("rack_%v", i%10), + "vendor": "AWS", + "hostname": fmt.Sprintf("host_%v", i%100), + }, + map[string]interface{}{ + "temperature": rand.Float64() * 80.0, + "disk_free": rand.Float64() * 1000.0, + "disk_total": (i/10 + 1) * 1000000, + "mem_total": (i/100 + 1) * 10000000, + "mem_free": rand.Uint64(), + }, + t) + if i%10 == 0 { + t = t.Add(time.Second) + } + } + return points +} + +func GenRecords(num int) []string { + lines := make([]string, num) + rand.Seed(321) + + t := time.Now() + for i := 0; i < len(lines); i++ { + lines[i] = fmt.Sprintf("test,id=rack_%v,vendor=AWS,hostname=host_%v temperature=%v,disk_free=%v,disk_total=%vi,mem_total=%vi,mem_free=%vu %v", + i%10, i%100, rand.Float64()*80.0, rand.Float64()*1000.0, (i/10+1)*1000000, (i/100+1)*10000000, rand.Uint64(), t.UnixNano()) + if i%10 == 0 { + t = t.Add(time.Second) + } + } + return lines +} diff --git a/internal/write/service.go b/internal/write/service.go index 80397b35..84154ce8 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -55,7 +55,15 @@ func NewService(org string, bucket string, httpService http2.Service, options *w if retryBufferLimit == 0 { retryBufferLimit = 1 } - return &Service{org: org, bucket: bucket, httpService: httpService, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 5} + u, _ := url.Parse(httpService.ServerAPIURL()) + u, _ = u.Parse("write") + params := u.Query() + params.Set("org", org) + params.Set("bucket", bucket) + params.Set("precision", precisionToString(options.Precision())) + u.RawQuery = params.Encode() + writeURL := u.String() + return &Service{org: org, bucket: bucket, httpService: httpService, url: writeURL, writeOptions: options, retryQueue: newQueue(int(retryBufferLimit)), retryExponentialBase: 5} } func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { @@ -138,12 +146,8 @@ func (w *Service) HandleWrite(ctx context.Context, batch *Batch) error { } func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error { - wURL, err := w.WriteURL() - if err != nil { - log.Errorf("%s\n", err.Error()) - return http2.NewError(err) - } var body io.Reader + var err error body = strings.NewReader(batch.batch) log.Debugf("Writing batch: %s", batch.batch) if w.writeOptions.UseGZip() { @@ -152,8 +156,10 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error { return http2.NewError(err) } } + w.lock.Lock() w.lastWriteAttempt = time.Now() - perror := w.httpService.DoPostRequest(ctx, wURL, body, func(req *http.Request) { + w.lock.Unlock() + perror := w.httpService.DoPostRequest(ctx, w.url, body, func(req *http.Request) { if w.writeOptions.UseGZip() { req.Header.Set("Content-Encoding", "gzip") } @@ -238,26 +244,8 @@ func (w *Service) pointToEncode(point *write.Point) lp.Metric { return m } -func (w *Service) WriteURL() (string, error) { - if w.url == "" { - u, err := url.Parse(w.httpService.ServerAPIURL()) - if err != nil { - return "", err - } - u, err = u.Parse("write") - if err != nil { - return "", err - } - params := u.Query() - params.Set("org", w.org) - params.Set("bucket", w.bucket) - params.Set("precision", precisionToString(w.writeOptions.Precision())) - u.RawQuery = params.Encode() - w.lock.Lock() - w.url = u.String() - w.lock.Unlock() - } - return w.url, nil +func (w *Service) WriteURL() string { + return w.url } func precisionToString(precision time.Duration) string { diff --git a/internal/write/service_test.go b/internal/write/service_test.go index cfad875d..b89b0fc7 100644 --- a/internal/write/service_test.go +++ b/internal/write/service_test.go @@ -7,6 +7,8 @@ package write import ( "context" "errors" + "strings" + "sync" "testing" "time" @@ -19,12 +21,13 @@ import ( ) func TestAddDefaultTags(t *testing.T) { + hs := test.NewTestService(t, "http://localhost:8888") opts := write.DefaultOptions() assert.Len(t, opts.DefaultTags(), 0) opts.AddDefaultTag("dt1", "val1") opts.AddDefaultTag("zdt", "val2") - srv := NewService("org", "buc", nil, opts) + srv := NewService("org", "buc", hs, opts) p := write.NewPointWithMeasurement("test") p.AddTag("id", "101") @@ -135,7 +138,7 @@ func TestCustomRetryDelayWithFLush(t *testing.T) { func TestBufferOverwrite(t *testing.T) { log.Log.SetLogLevel(log.DebugLevel) hs := test.NewTestService(t, "http://localhost:8086") - // + // Buffer limit 15000, bach ii 5000 => buffer for 3 batches opts := write.DefaultOptions().SetRetryInterval(1).SetRetryBufferLimit(15000) ctx := context.Background() srv := NewService("my-org", "my-bucket", hs, opts) @@ -162,25 +165,33 @@ func TestBufferOverwrite(t *testing.T) { assert.Equal(t, uint(25), b1.retryDelay) assert.Equal(t, 3, srv.retryQueue.list.Len()) - // now it should drop b1 - <-time.After(time.Millisecond * time.Duration(b1.retryDelay)) + // Write early + <-time.After(time.Millisecond * time.Duration(b1.retryDelay) / 2) b4 := NewBatch("4\n", opts.RetryInterval()) err = srv.HandleWrite(ctx, b4) - assert.NotNil(t, err) + assert.NoError(t, err) assert.Equal(t, uint(1), b2.retryDelay) assert.Equal(t, 3, srv.retryQueue.list.Len()) + // Overwrite + <-time.After(time.Millisecond * time.Duration(b1.retryDelay) / 2) + b5 := NewBatch("5\n", opts.RetryInterval()) + err = srv.HandleWrite(ctx, b5) + assert.Error(t, err) + assert.Equal(t, uint(5), b2.retryDelay) + assert.Equal(t, 3, srv.retryQueue.list.Len()) + // let write pass and it will clear queue <-time.After(time.Millisecond * time.Duration(b1.retryDelay)) hs.SetReplyError(nil) - err = srv.HandleWrite(ctx, NewBatch("5\n", opts.RetryInterval())) + err = srv.HandleWrite(ctx, NewBatch("6\n", opts.RetryInterval())) assert.Nil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) require.Len(t, hs.Lines(), 4) - assert.Equal(t, "2", hs.Lines()[0]) - assert.Equal(t, "3", hs.Lines()[1]) - assert.Equal(t, "4", hs.Lines()[2]) - assert.Equal(t, "5", hs.Lines()[3]) + assert.Equal(t, "3", hs.Lines()[0]) + assert.Equal(t, "4", hs.Lines()[1]) + assert.Equal(t, "5", hs.Lines()[2]) + assert.Equal(t, "6", hs.Lines()[3]) } func TestMaxRetryInterval(t *testing.T) { @@ -277,3 +288,23 @@ func TestNoRetryIfMaxRetriesIsZero(t *testing.T) { assert.NotNil(t, err) assert.Equal(t, 0, srv.retryQueue.list.Len()) } + +func TestWriteContextCancel(t *testing.T) { + hs := test.NewTestService(t, "http://localhost:8888") + opts := write.DefaultOptions() + srv := NewService("my-org", "my-bucket", hs, opts) + lines := test.GenRecords(10) + ctx, cancel := context.WithCancel(context.Background()) + var err error + var wg sync.WaitGroup + wg.Add(1) + go func() { + <-time.After(time.Second) + err = srv.HandleWrite(ctx, NewBatch(strings.Join(lines, "\n"), opts.RetryInterval())) + wg.Done() + }() + cancel() + wg.Wait() + require.Equal(t, context.Canceled, err) + assert.Len(t, hs.Lines(), 0) +} diff --git a/options.go b/options.go index ccbb868f..43a3dffd 100644 --- a/options.go +++ b/options.go @@ -61,7 +61,8 @@ func (o *Options) MaxRetries() uint { return o.WriteOptions().MaxRetries() } -// SetMaxRetries sets maximum count of retry attempts of failed writes +// SetMaxRetries sets maximum count of retry attempts of failed writes. +// Setting zero value disables retry strategy. func (o *Options) SetMaxRetries(maxRetries uint) *Options { o.WriteOptions().SetMaxRetries(maxRetries) return o