From 8be588af305ca1c511322642476416f095131e98 Mon Sep 17 00:00:00 2001 From: Anton Onikiychuk Date: Wed, 17 Jun 2020 21:03:22 +0200 Subject: [PATCH] Add graceful shutdown --- main.go | 23 ++++++++++++++++++++++- store/reader_test.go | 26 +++++++++++++++++++++----- store/store.go | 6 ++++-- store/writer.go | 19 ++++++++++++++----- store/writer_test.go | 22 +++++++++++++++++++++- 5 files changed, 82 insertions(+), 14 deletions(-) diff --git a/main.go b/main.go index d951d09..b85fcbc 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,10 @@ package main import ( + "context" "flag" + "sync" + "time" "github.com/dodopizza/jaeger-kusto/store" "github.com/hashicorp/go-hclog" @@ -10,6 +13,9 @@ import ( func main() { + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, cancel) logger := hclog.New(&hclog.LoggerOptions{ Level: hclog.Warn, Name: "jaeger-kusto", @@ -23,6 +29,21 @@ func main() { kustoConfig := store.InitConfig(configPath, logger) - kustoStore := store.NewStore(*kustoConfig, logger) + kustoStore := store.NewStore(*kustoConfig, logger, ctx, &wg) grpc.Serve(kustoStore) } + +func gracefulShutdown(wg *sync.WaitGroup, cancel context.CancelFunc) { + cancel() + done := make(chan bool, 1) + go func() { + wg.Wait() + done <- true + }() + select { + case <-done: + return + case <-time.After(time.Second): + return + } +} diff --git a/store/reader_test.go b/store/reader_test.go index 367037e..aa5fb70 100644 --- a/store/reader_test.go +++ b/store/reader_test.go @@ -3,6 +3,7 @@ package store import ( "context" "fmt" + "sync" "testing" "time" @@ -25,7 +26,10 @@ const testConfigPath = ".././jaeger-kusto-config.json" func TestKustoSpanReader_GetTrace(tester *testing.T) { testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + appCtx, appCancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, appCancel) + kustoStore := NewStore(*testConfig, logger, appCtx, &wg) trace, _ := model.TraceIDFromString("0232d7f26e2317b1") ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) @@ -41,7 +45,10 @@ func TestKustoSpanReader_GetTrace(tester *testing.T) { func TestKustoSpanReader_GetServices(t *testing.T) { testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + appCtx, appCancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, appCancel) + kustoStore := NewStore(*testConfig, logger, appCtx, &wg) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -56,7 +63,10 @@ func TestKustoSpanReader_GetServices(t *testing.T) { func TestKustoSpanReader_GetOperations(t *testing.T) { testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + appCtx, appCancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, appCancel) + kustoStore := NewStore(*testConfig, logger, appCtx, &wg) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -84,7 +94,10 @@ func TestFindTraces(tester *testing.T) { } testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + appCtx, appCancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, appCancel) + kustoStore := NewStore(*testConfig, logger, appCtx, &wg) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() @@ -99,7 +112,10 @@ func TestFindTraces(tester *testing.T) { func TestStore_DependencyReader(t *testing.T) { testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + appCtx, appCancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, appCancel) + kustoStore := NewStore(*testConfig, logger, appCtx, &wg) dependencyLinks, err := kustoStore.DependencyReader().GetDependencies(time.Now(), 168*time.Hour) if err != nil { logger.Error("can't find dependencyLinks", err.Error()) diff --git a/store/store.go b/store/store.go index 17d7256..f873cec 100644 --- a/store/store.go +++ b/store/store.go @@ -1,6 +1,7 @@ package store import ( + "context" "github.com/Azure/azure-kusto-go/kusto" "github.com/Azure/azure-kusto-go/kusto/ingest" "github.com/Azure/go-autorest/autorest/azure/auth" @@ -8,6 +9,7 @@ import ( "github.com/jaegertracing/jaeger/plugin/storage/grpc/shared" "github.com/jaegertracing/jaeger/storage/dependencystore" "github.com/jaegertracing/jaeger/storage/spanstore" + "sync" ) type store struct { @@ -30,7 +32,7 @@ func (f *kustoFactory) Ingest(database string) (in kustoIngest, err error) { } // NewStore creates new Kusto store for Jaeger span storage -func NewStore(config KustoConfig, logger hclog.Logger) shared.StoragePlugin { +func NewStore(config KustoConfig, logger hclog.Logger, appCtx context.Context, shutdownWg *sync.WaitGroup) shared.StoragePlugin { authorizer := kusto.Authorization{ Config: auth.NewClientCredentialsConfig(config.ClientID, config.ClientSecret, config.TenantID), @@ -45,7 +47,7 @@ func NewStore(config KustoConfig, logger hclog.Logger) shared.StoragePlugin { reader := newKustoSpanReader(&factory, config.Database) - writer := newKustoSpanWriter(&factory, logger, config.Database) + writer := newKustoSpanWriter(&factory, logger, config.Database, appCtx, shutdownWg) store := &store{ dependencyStoreReader: reader, reader: reader, diff --git a/store/writer.go b/store/writer.go index 96addd6..821037f 100644 --- a/store/writer.go +++ b/store/writer.go @@ -5,6 +5,7 @@ import ( "context" "fmt" "io" + "sync" "time" "github.com/Azure/azure-kusto-go/kusto/ingest" @@ -18,19 +19,21 @@ type kustoIngest interface { } type kustoSpanWriter struct { - ingest kustoIngest - ch chan []string - logger hclog.Logger + ingest kustoIngest + ch chan []string + logger hclog.Logger + appCtx context.Context + shutdownWg *sync.WaitGroup } -func newKustoSpanWriter(client *kustoFactory, logger hclog.Logger, database string) *kustoSpanWriter { +func newKustoSpanWriter(client *kustoFactory, logger hclog.Logger, database string, ctx context.Context, wg *sync.WaitGroup) *kustoSpanWriter { in, err := client.Ingest(database) if err != nil { logger.Error(fmt.Sprintf("%#v", err)) } ch := make(chan []string, 100) - writer := &kustoSpanWriter{in, ch, logger} + writer := &kustoSpanWriter{in, ch, logger, ctx, wg} go writer.ingestCSV(ch) @@ -47,6 +50,8 @@ func (k kustoSpanWriter) WriteSpan(span *model.Span) error { func (k kustoSpanWriter) ingestCSV(ch <-chan []string) { + k.shutdownWg.Add(1) + defer func() { k.shutdownWg.Done() }() ticker := time.NewTicker(5 * time.Second) b := &bytes.Buffer{} @@ -55,8 +60,12 @@ func (k kustoSpanWriter) ingestCSV(ch <-chan []string) { for { select { + case <-k.appCtx.Done(): + ingestBatch(k, b) + return case buf, ok := <-ch: if !ok { + ingestBatch(k, b) return } if b.Len() > 1048576 { diff --git a/store/writer_test.go b/store/writer_test.go index a302b58..482591d 100644 --- a/store/writer_test.go +++ b/store/writer_test.go @@ -1,6 +1,8 @@ package store import ( + "context" + "sync" "testing" "time" @@ -62,9 +64,27 @@ func TestWriteSpan(tester *testing.T) { } testConfig := InitConfig(testConfigPath, logger) - kustoStore := NewStore(*testConfig, logger) + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + defer gracefulShutdown(&wg, cancel) + kustoStore := NewStore(*testConfig, logger, ctx, &wg) assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span)) assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span2)) assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span3)) } + +func gracefulShutdown(wg *sync.WaitGroup, cancel context.CancelFunc) { + cancel() + done := make(chan bool, 1) + go func() { + wg.Wait() + done <- true + }() + select { + case <-done: + return + case <-time.After(time.Second): + return + } +}