From 5596ec02807830c91b62bef865fbcf9d06d45368 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 15 Jul 2020 10:25:57 +0200 Subject: [PATCH 1/5] More logging --- internal/write/service.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/internal/write/service.go b/internal/write/service.go index 64ec8ffe..32685482 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -132,16 +132,19 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error { if perror != nil { if perror.StatusCode == http.StatusTooManyRequests || perror.StatusCode == http.StatusServiceUnavailable { - log.Log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error()) + log.Log.Errorf("Write error: %s\nChecking retry\n", perror.Error()) if perror.RetryAfter > 0 { batch.retryInterval = perror.RetryAfter * 1000 } else { batch.retryInterval = w.writeOptions.RetryInterval() } if batch.retries < w.writeOptions.MaxRetries() { + log.Log.Errorf("Write error: \nBatch kept for retrying\n") if w.retryQueue.push(batch) { log.Log.Warn("Retry buffer full, discarding oldest batch") } + } else { + log.Log.Errorf("Write error: \nMax retrys, batch discarded\n") } } else { log.Log.Errorf("Write error: %s\n", perror.Error()) From 08eb92c9651d0a95240c2fb5f29a6832e1ec1b1f Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 15 Jul 2020 10:29:45 +0200 Subject: [PATCH 2/5] Use idiomatic structs --- api/write.go | 51 ++++++++++++++++++++++++++------------------------- 1 file changed, 26 insertions(+), 25 deletions(-) diff --git a/api/write.go b/api/write.go index b1e23f9f..a5115154 100644 --- a/api/write.go +++ b/api/write.go @@ -6,12 +6,13 @@ package api import ( "context" + "strings" + "time" + "github.com/influxdata/influxdb-client-go/api/write" "github.com/influxdata/influxdb-client-go/internal/http" "github.com/influxdata/influxdb-client-go/internal/log" iwrite "github.com/influxdata/influxdb-client-go/internal/write" - "strings" - "time" ) // WriteApiBlocking is Write client interface with non-blocking methods for writing time series data asynchronously in batches into an InfluxDB server. @@ -38,13 +39,13 @@ type writeApiImpl struct { service *iwrite.Service writeBuffer []string + errCh chan error writeCh chan *iwrite.Batch bufferCh chan string - writeStop chan int - bufferStop chan int - bufferFlush chan int - doneCh chan int - errCh chan error + writeStop chan struct{} + bufferStop chan struct{} + doneCh chan struct{} + bufferFlush chan struct{} bufferInfoCh chan writeBuffInfoReq writeInfoCh chan writeBuffInfoReq writeOptions *write.Options @@ -59,15 +60,16 @@ func NewWriteApiImpl(org string, bucket string, service http.Service, writeOptio service: iwrite.NewService(org, bucket, service, writeOptions), writeBuffer: make([]string, 0, writeOptions.BatchSize()+1), writeCh: make(chan *iwrite.Batch), - doneCh: make(chan int), bufferCh: make(chan string), - bufferStop: make(chan int), - writeStop: make(chan int), - bufferFlush: make(chan int), + bufferStop: make(chan struct{}), + writeStop: make(chan struct{}), + doneCh: make(chan struct{}), + bufferFlush: make(chan struct{}), bufferInfoCh: make(chan writeBuffInfoReq), writeInfoCh: make(chan writeBuffInfoReq), writeOptions: writeOptions, } + go w.bufferProc() go w.writeProc() @@ -82,7 +84,7 @@ func (w *writeApiImpl) Errors() <-chan error { } func (w *writeApiImpl) Flush() { - w.bufferFlush <- 1 + w.bufferFlush <- struct{}{} w.waitForFlushing() } @@ -133,7 +135,7 @@ x: } } log.Log.Info("Buffer proc finished") - w.doneCh <- 1 + w.doneCh <- struct{}{} } func (w *writeApiImpl) flushBuffer() { @@ -168,32 +170,31 @@ x: } } log.Log.Info("Write proc finished") - w.doneCh <- 1 + w.doneCh <- struct{}{} } func (w *writeApiImpl) Close() { if w.writeCh != nil { // Flush outstanding metrics w.Flush() - w.bufferStop <- 1 - //wait for buffer proc - <-w.doneCh + + // stop and wait for buffer proc close(w.bufferStop) + <-w.doneCh + close(w.bufferFlush) close(w.bufferCh) - w.writeStop <- 1 - //wait for the write proc + + // stop and wait for write proc + close(w.writeStop) <-w.doneCh + close(w.writeCh) - close(w.writeStop) close(w.writeInfoCh) close(w.bufferInfoCh) - w.bufferInfoCh = nil - w.writeInfoCh = nil w.writeCh = nil - w.writeStop = nil - w.bufferFlush = nil - w.bufferStop = nil + + // close errors if open if w.errCh != nil { close(w.errCh) w.errCh = nil From 018082485dd5853260a4cd03122e4e9cf15bb643 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 15 Jul 2020 10:33:47 +0200 Subject: [PATCH 3/5] Update changelog --- CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 67d0008f..afaf492c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 1.4.0 [2020-06-19] +1. [#154](https://github.com/influxdata/influxdb-client-go/pull/152) Use idiomatic go style for write channels (internal) + ## 1.3.0 [2020-06-19] ### Features 1. [#131](https://github.com/influxdata/influxdb-client-go/pull/131) Labels API From c3e3b2d62d675bd842ac2bae7b93750ae0d85638 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 15 Jul 2020 10:34:40 +0200 Subject: [PATCH 4/5] Change order --- api/write.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/api/write.go b/api/write.go index a5115154..4e3f9ed8 100644 --- a/api/write.go +++ b/api/write.go @@ -44,8 +44,8 @@ type writeApiImpl struct { bufferCh chan string writeStop chan struct{} bufferStop chan struct{} - doneCh chan struct{} bufferFlush chan struct{} + doneCh chan struct{} bufferInfoCh chan writeBuffInfoReq writeInfoCh chan writeBuffInfoReq writeOptions *write.Options @@ -63,8 +63,8 @@ func NewWriteApiImpl(org string, bucket string, service http.Service, writeOptio bufferCh: make(chan string), bufferStop: make(chan struct{}), writeStop: make(chan struct{}), - doneCh: make(chan struct{}), bufferFlush: make(chan struct{}), + doneCh: make(chan struct{}), bufferInfoCh: make(chan writeBuffInfoReq), writeInfoCh: make(chan writeBuffInfoReq), writeOptions: writeOptions, From 753f79dac8a9c63cc4677e59fc934ceb3b6b55a3 Mon Sep 17 00:00:00 2001 From: andig Date: Wed, 15 Jul 2020 10:37:46 +0200 Subject: [PATCH 5/5] Undo logging changes --- internal/write/service.go | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/internal/write/service.go b/internal/write/service.go index 32685482..64ec8ffe 100644 --- a/internal/write/service.go +++ b/internal/write/service.go @@ -132,19 +132,16 @@ func (w *Service) WriteBatch(ctx context.Context, batch *Batch) error { if perror != nil { if perror.StatusCode == http.StatusTooManyRequests || perror.StatusCode == http.StatusServiceUnavailable { - log.Log.Errorf("Write error: %s\nChecking retry\n", perror.Error()) + log.Log.Errorf("Write error: %s\nBatch kept for retrying\n", perror.Error()) if perror.RetryAfter > 0 { batch.retryInterval = perror.RetryAfter * 1000 } else { batch.retryInterval = w.writeOptions.RetryInterval() } if batch.retries < w.writeOptions.MaxRetries() { - log.Log.Errorf("Write error: \nBatch kept for retrying\n") if w.retryQueue.push(batch) { log.Log.Warn("Retry buffer full, discarding oldest batch") } - } else { - log.Log.Errorf("Write error: \nMax retrys, batch discarded\n") } } else { log.Log.Errorf("Write error: %s\n", perror.Error())