From 2df75aa06549fdeb5c88b68db7d610814e309144 Mon Sep 17 00:00:00 2001 From: vlastahajek Date: Fri, 30 Oct 2020 14:33:54 +0100 Subject: [PATCH] fix: synchronizing access to write service (#208) --- CHANGELOG.md | 1 + api/writeAPIBlocking.go | 4 ++++ api/writeAPIBlocking_test.go | 27 +++++++++++++++++++++++++++ 3 files changed, 32 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6a1e0e39..1c244fc3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ 1. [#206](https://github.com/influxdata/influxdb-client-go/pull/206) Adding TasksAPI for managing tasks and associated logs and runs. ### Bug fixes +1. [#209](https://github.com/influxdata/influxdb-client-go/pull/209) Synchronizing access to the write service in WriteAPIBlocking. ### Documentation diff --git a/api/writeAPIBlocking.go b/api/writeAPIBlocking.go index f73609b9..d50aa48d 100644 --- a/api/writeAPIBlocking.go +++ b/api/writeAPIBlocking.go @@ -7,6 +7,7 @@ package api import ( "context" "strings" + "sync" http2 "github.com/influxdata/influxdb-client-go/v2/api/http" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -60,6 +61,7 @@ type WriteAPIBlocking interface { type writeAPIBlocking struct { service *iwrite.Service writeOptions *write.Options + lock sync.Mutex } // NewWriteAPIBlocking creates new WriteAPIBlocking instance for org and bucket with underlying client @@ -68,6 +70,8 @@ func NewWriteAPIBlocking(org string, bucket string, service http2.Service, write } func (w *writeAPIBlocking) write(ctx context.Context, line string) error { + w.lock.Lock() + defer w.lock.Unlock() err := w.service.HandleWrite(ctx, iwrite.NewBatch(line, w.writeOptions.RetryInterval())) return err } diff --git a/api/writeAPIBlocking_test.go b/api/writeAPIBlocking_test.go index fd46a8e4..407903b9 100644 --- a/api/writeAPIBlocking_test.go +++ b/api/writeAPIBlocking_test.go @@ -72,3 +72,30 @@ func TestWriteContextCancel(t *testing.T) { require.Equal(t, context.Canceled, err) assert.Len(t, service.Lines(), 0) } + +func TestWriteParallel(t *testing.T) { + service := test.NewTestService(t, "http://localhost:8888") + writeAPI := NewWriteAPIBlocking("my-org", "my-bucket", service, write.DefaultOptions().SetBatchSize(5)) + lines := genRecords(1000) + + chanLine := make(chan string) + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + for l := range chanLine { + err := writeAPI.WriteRecord(context.Background(), l) + assert.Nil(t, err) + } + wg.Done() + }() + } + for _, l := range lines { + chanLine <- l + } + close(chanLine) + wg.Wait() + assert.Len(t, service.Lines(), len(lines)) + + service.Close() +}