Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add graceful shutdown #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"context"
"flag"
"sync"
"time"

"github.com/dodopizza/jaeger-kusto/store"
"github.com/hashicorp/go-hclog"
Expand All @@ -10,6 +13,9 @@ import (

func main() {

ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, cancel)
logger := hclog.New(&hclog.LoggerOptions{
Level: hclog.Warn,
Name: "jaeger-kusto",
Expand All @@ -23,6 +29,21 @@ func main() {

kustoConfig := store.InitConfig(configPath, logger)

kustoStore := store.NewStore(*kustoConfig, logger)
kustoStore := store.NewStore(*kustoConfig, logger, ctx, &wg)
grpc.Serve(kustoStore)
}

func gracefulShutdown(wg *sync.WaitGroup, cancel context.CancelFunc) {
cancel()
done := make(chan bool, 1)
go func() {
wg.Wait()
done <- true
}()
select {
case <-done:
return
case <-time.After(time.Second):
return
}
}
26 changes: 21 additions & 5 deletions store/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"fmt"
"sync"
"testing"
"time"

Expand All @@ -25,7 +26,10 @@ const testConfigPath = ".././jaeger-kusto-config.json"
func TestKustoSpanReader_GetTrace(tester *testing.T) {

testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
appCtx, appCancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, appCancel)
kustoStore := NewStore(*testConfig, logger, appCtx, &wg)
trace, _ := model.TraceIDFromString("0232d7f26e2317b1")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
Expand All @@ -41,7 +45,10 @@ func TestKustoSpanReader_GetTrace(tester *testing.T) {
func TestKustoSpanReader_GetServices(t *testing.T) {

testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
appCtx, appCancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, appCancel)
kustoStore := NewStore(*testConfig, logger, appCtx, &wg)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -56,7 +63,10 @@ func TestKustoSpanReader_GetServices(t *testing.T) {
func TestKustoSpanReader_GetOperations(t *testing.T) {

testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
appCtx, appCancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, appCancel)
kustoStore := NewStore(*testConfig, logger, appCtx, &wg)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand Down Expand Up @@ -84,7 +94,10 @@ func TestFindTraces(tester *testing.T) {
}

testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
appCtx, appCancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, appCancel)
kustoStore := NewStore(*testConfig, logger, appCtx, &wg)

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
Expand All @@ -99,7 +112,10 @@ func TestFindTraces(tester *testing.T) {

func TestStore_DependencyReader(t *testing.T) {
testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
appCtx, appCancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, appCancel)
kustoStore := NewStore(*testConfig, logger, appCtx, &wg)
dependencyLinks, err := kustoStore.DependencyReader().GetDependencies(time.Now(), 168*time.Hour)
if err != nil {
logger.Error("can't find dependencyLinks", err.Error())
Expand Down
6 changes: 4 additions & 2 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package store

import (
"context"
"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/ingest"
"github.com/Azure/go-autorest/autorest/azure/auth"
"github.com/hashicorp/go-hclog"
"github.com/jaegertracing/jaeger/plugin/storage/grpc/shared"
"github.com/jaegertracing/jaeger/storage/dependencystore"
"github.com/jaegertracing/jaeger/storage/spanstore"
"sync"
)

type store struct {
Expand All @@ -30,7 +32,7 @@ func (f *kustoFactory) Ingest(database string) (in kustoIngest, err error) {
}

// NewStore creates new Kusto store for Jaeger span storage
func NewStore(config KustoConfig, logger hclog.Logger) shared.StoragePlugin {
func NewStore(config KustoConfig, logger hclog.Logger, appCtx context.Context, shutdownWg *sync.WaitGroup) shared.StoragePlugin {

authorizer := kusto.Authorization{
Config: auth.NewClientCredentialsConfig(config.ClientID, config.ClientSecret, config.TenantID),
Expand All @@ -45,7 +47,7 @@ func NewStore(config KustoConfig, logger hclog.Logger) shared.StoragePlugin {

reader := newKustoSpanReader(&factory, config.Database)

writer := newKustoSpanWriter(&factory, logger, config.Database)
writer := newKustoSpanWriter(&factory, logger, config.Database, appCtx, shutdownWg)
store := &store{
dependencyStoreReader: reader,
reader: reader,
Expand Down
19 changes: 14 additions & 5 deletions store/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/Azure/azure-kusto-go/kusto/ingest"
Expand All @@ -18,19 +19,21 @@ type kustoIngest interface {
}

type kustoSpanWriter struct {
ingest kustoIngest
ch chan []string
logger hclog.Logger
ingest kustoIngest
ch chan []string
logger hclog.Logger
appCtx context.Context
shutdownWg *sync.WaitGroup
}

func newKustoSpanWriter(client *kustoFactory, logger hclog.Logger, database string) *kustoSpanWriter {
func newKustoSpanWriter(client *kustoFactory, logger hclog.Logger, database string, ctx context.Context, wg *sync.WaitGroup) *kustoSpanWriter {

in, err := client.Ingest(database)
if err != nil {
logger.Error(fmt.Sprintf("%#v", err))
}
ch := make(chan []string, 100)
writer := &kustoSpanWriter{in, ch, logger}
writer := &kustoSpanWriter{in, ch, logger, ctx, wg}

go writer.ingestCSV(ch)

Expand All @@ -47,6 +50,8 @@ func (k kustoSpanWriter) WriteSpan(span *model.Span) error {

func (k kustoSpanWriter) ingestCSV(ch <-chan []string) {

k.shutdownWg.Add(1)
defer func() { k.shutdownWg.Done() }()
ticker := time.NewTicker(5 * time.Second)

b := &bytes.Buffer{}
Expand All @@ -55,8 +60,12 @@ func (k kustoSpanWriter) ingestCSV(ch <-chan []string) {

for {
select {
case <-k.appCtx.Done():
ingestBatch(k, b)
return
case buf, ok := <-ch:
if !ok {
ingestBatch(k, b)
return
}
if b.Len() > 1048576 {
Expand Down
22 changes: 21 additions & 1 deletion store/writer_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package store

import (
"context"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -62,9 +64,27 @@ func TestWriteSpan(tester *testing.T) {
}

testConfig := InitConfig(testConfigPath, logger)
kustoStore := NewStore(*testConfig, logger)
ctx, cancel := context.WithCancel(context.Background())
wg := sync.WaitGroup{}
defer gracefulShutdown(&wg, cancel)
kustoStore := NewStore(*testConfig, logger, ctx, &wg)
assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span))
assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span2))
assert.NoError(tester, kustoStore.SpanWriter().WriteSpan(span3))

}

func gracefulShutdown(wg *sync.WaitGroup, cancel context.CancelFunc) {
cancel()
done := make(chan bool, 1)
go func() {
wg.Wait()
done <- true
}()
select {
case <-done:
return
case <-time.After(time.Second):
return
}
}