Skip to content

Commit

Permalink
fix: Added flushing also retry queue
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Jul 27, 2022
1 parent 35c2391 commit ed20047
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 2 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
## [unreleased]
### Bug fixes
- [#341](https://github.com/influxdata/influxdb-client-go/issues/341) Changing logging level of messages about discarding batch to Error.
- [#341](https://github.com/influxdata/influxdb-client-go/issues/341) Changing logging level of messages about discarding batch to Error.TestRetryIntervalAccumulation
- [#344](https://github.com/influxdata/influxdb-client-go/issues/344) `WriteAPI.Flush()` writes also batches from the retry queue

## 2.9.1 [2022-06-24]
### Bug fixes
Expand Down
4 changes: 3 additions & 1 deletion api/write.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,12 @@ func (w *WriteAPIImpl) Errors() <-chan error {
return w.errCh
}

// Flush forces all pending writes from the buffer to be sent
// Flush forces all pending writes from the buffer to be sent.
// Flush also tries sending batches from retry queue without additional retrying.
func (w *WriteAPIImpl) Flush() {
w.bufferFlush <- struct{}{}
w.waitForFlushing()
w.service.Flush()
}

func (w *WriteAPIImpl) waitForFlushing() {
Expand Down
33 changes: 33 additions & 0 deletions api/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package api

import (
"fmt"
"io"
"math"
"runtime"
"strings"
Expand Down Expand Up @@ -232,3 +233,35 @@ func TestClosing(t *testing.T) {
assert.Len(t, service.Lines(), 0)

}

func TestFlushWithRetries(t *testing.T) {
service := test.NewTestService(t, "http://localhost:8888")
log.Log.SetLogLevel(log.DebugLevel)
writeAPI := NewWriteAPI("my-org", "my-bucket", service, write.DefaultOptions().SetRetryInterval(200).SetBatchSize(1))
points := test.GenPoints(5)
fails := 0

var mu sync.Mutex

service.SetRequestHandler(func(url string, body io.Reader) error {
mu.Lock()
defer mu.Unlock()
// fail 4 times, then succeed on the 5th try - maxRetries default is 5
if fails >= 4 {
_ = service.DecodeLines(body)
return nil
}
fails++
return fmt.Errorf("spurious failure")
})
// write will try first batch and others will be put to the retry queue of retry delay caused by first write error
for i := 0; i < len(points); i++ {
writeAPI.WritePoint(points[i])
}
// Flush will try sending first batch again and then others
// 1st, 2nd and 3rd will fail, because test service rejects 4 writes
writeAPI.Flush()
writeAPI.Close()
// two remained
assert.Equal(t, 2, len(service.Lines()))
}
14 changes: 14 additions & 0 deletions internal/write/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,20 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) *http2.Error {
return perror
}

// Flush sends batches from retry queue immediately, without retrying
func (w *Service) Flush() {
for !w.retryQueue.isEmpty() {
b := w.retryQueue.pop()
if time.Now().After(b.Expires) {
log.Error("Oldest batch in retry queue expired, discarding")
continue
}
if err := w.WriteBatch(context.Background(), b); err != nil {
log.Errorf("Error flushing batch from retry queue: %w", err.Unwrap())
}
}
}

// pointWithDefaultTags encapsulates Point with default tags
type pointWithDefaultTags struct {
point *write.Point
Expand Down
54 changes: 54 additions & 0 deletions internal/write/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -593,3 +593,57 @@ func TestRetryIntervalAccumulation(t *testing.T) {
// Debug line to capture output of successful test
// assert.True(t, false)
}

func TestFlush(t *testing.T) {
log.Log.SetLogLevel(log.DebugLevel)
hs := test.NewTestService(t, "http://localhost:8086")
//
opts := write.DefaultOptions().SetRetryInterval(1)
ctx := context.Background()
srv := NewService("my-org", "my-bucket", hs, opts)

hs.SetReplyError(&http.Error{
Err: errors.New("connection refused"),
})

lines := test.GenRecords(5)
// Test flush will fail all batches
for _, line := range lines {
b := NewBatch(line, 20)
_ = srv.HandleWrite(ctx, b)
}
assert.Equal(t, 5, srv.retryQueue.list.Len())
srv.Flush()
assert.Len(t, hs.Lines(), 0)

// Test flush will find all batches expired
for _, line := range lines {
b := NewBatch(line, 5)
_ = srv.HandleWrite(ctx, b)
}

assert.Equal(t, 5, srv.retryQueue.list.Len())
<-time.After(5 * time.Millisecond)

hs.SetReplyError(nil)
// all batches should expire
srv.Flush()
assert.Len(t, hs.Lines(), 0)
assert.Equal(t, 0, srv.retryQueue.list.Len())

// Test flush will succeed
hs.SetReplyError(&http.Error{
Err: errors.New("connection refused"),
})
for _, line := range lines {
b := NewBatch(line, 5)
_ = srv.HandleWrite(ctx, b)
}

assert.Equal(t, 5, srv.retryQueue.list.Len())
hs.SetReplyError(nil)
// all batches should expire
srv.Flush()
assert.Len(t, hs.Lines(), 5)
assert.Equal(t, 0, srv.retryQueue.list.Len())
}

0 comments on commit ed20047

Please sign in to comment.