From 69d3565e5b23bacd55dff3cb5dec0607a0baddc0 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 20 Jun 2019 16:48:01 +0200 Subject: [PATCH 1/6] Fix archive reader Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 32 +++++-- .../storage/integration/elasticsearch_test.go | 86 +++++++++++++++---- 2 files changed, 91 insertions(+), 27 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 494c445383f..17858539771 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -95,12 +95,12 @@ type SpanReader struct { // The age of the oldest service/operation we will look for. Because indices in ElasticSearch are by day, // this will be rounded down to UTC 00:00 of that day. maxSpanAge time.Duration - maxNumSpans int serviceOperationStorage *ServiceOperationStorage spanIndexPrefix []string serviceIndexPrefix []string spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn + sourceFn sourceFn } // SpanReaderParams holds constructor params for NewSpanReader @@ -124,17 +124,19 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { client: p.Client, logger: p.Logger, maxSpanAge: p.MaxSpanAge, - maxNumSpans: p.MaxNumSpans, serviceOperationStorage: NewServiceOperationStorage(ctx, p.Client, p.Logger, 0), // the decorator takes care of metrics spanIndexPrefix: indexNames(p.IndexPrefix, spanIndex), serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), + sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), } } type timeRangeIndexFn func(indexName []string, startTime time.Time, endTime time.Time) []string +type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource + func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { if archive { var archivePrefix string @@ -159,6 +161,20 @@ func getTimeRangeIndexFn(archive, useReadWriteAliases bool) timeRangeIndexFn { return timeRangeIndices } +func getSourceFn(archive bool, maxNumSpans int) sourceFn { + return func(query elastic.Query, nextTime uint64) *elastic.SearchSource { + s := elastic.NewSearchSource(). + Query(query). + Size(defaultDocCount). + TerminateAfter(maxNumSpans). + Sort("startTime", true) + if !archive { + s.SearchAfter(nextTime) + } + return s + } +} + // timeRangeIndices returns the array of indices that we need to query, based on query params func timeRangeIndices(indexNames []string, startTime time.Time, endTime time.Time) []string { var indices []string @@ -305,6 +321,7 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) + fmt.Println(indices) searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) tracesMap := make(map[model.TraceID]*model.Trace) @@ -318,16 +335,13 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st if val, ok := searchAfterTime[traceID]; ok { nextTime = val } + + s := s.sourceFn(query, nextTime) + searchRequests[i] = elastic.NewSearchRequest(). IgnoreUnavailable(true). Type(spanType). - Source( - elastic.NewSearchSource(). - Query(query). - Size(defaultDocCount). - TerminateAfter(s.maxNumSpans). - Sort("startTime", true). 
- SearchAfter(nextTime)) + Source(s) } // set traceIDs to empty traceIDs = nil diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 8b14cf4f4a5..eefa23d447a 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -16,8 +16,9 @@ package integration import ( "context" + "github.com/jaegertracing/jaeger/model" + "github.com/stretchr/testify/assert" "net/http" - "os" "testing" "time" @@ -53,7 +54,7 @@ type ESStorageIntegration struct { logger *zap.Logger } -func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error { +func (s *ESStorageIntegration) initializeES(allTagsAsFields, archive bool) error { rawClient, err := elastic.NewClient( elastic.SetURL(queryURL), elastic.SetBasicAuth(username, password), @@ -70,22 +71,22 @@ func (s *ESStorageIntegration) initializeES(allTagsAsFields bool) error { dependencyStore := dependencystore.NewDependencyStore(client, s.logger, indexPrefix) s.DependencyReader = dependencyStore s.DependencyWriter = dependencyStore - s.initSpanstore(allTagsAsFields) + s.initSpanstore(allTagsAsFields, archive) s.CleanUp = func() error { - return s.esCleanUp(allTagsAsFields) + return s.esCleanUp(allTagsAsFields, archive) } s.Refresh = s.esRefresh - s.esCleanUp(allTagsAsFields) + s.esCleanUp(allTagsAsFields, archive) return nil } -func (s *ESStorageIntegration) esCleanUp(allTagsAsFields bool) error { +func (s *ESStorageIntegration) esCleanUp(allTagsAsFields, archive bool) error { _, err := s.client.DeleteIndex("*").Do(context.Background()) - s.initSpanstore(allTagsAsFields) + s.initSpanstore(allTagsAsFields, archive) return err } -func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) { +func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) { bp, _ := s.client.BulkProcessor().BulkActions(1).FlushInterval(time.Nanosecond).Do(context.Background()) client := eswrapper.WrapESClient(s.client, bp) spanMapping, serviceMapping := es.GetMappings(5, 1) @@ -99,6 +100,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) { TagDotReplacement: tagKeyDeDotChar, SpanMapping: spanMapping, ServiceMapping: serviceMapping, + Archive: archive, }) s.SpanReader = spanstore.NewSpanReader(spanstore.SpanReaderParams{ Client: client, @@ -107,6 +109,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields bool) { IndexPrefix: indexPrefix, MaxSpanAge: 72 * time.Hour, TagDotReplacement: tagKeyDeDotChar, + Archive: archive, }) } @@ -129,22 +132,69 @@ func healthCheck() error { return errors.New("elastic search is not ready") } -func testElasticsearchStorage(t *testing.T, allTagsAsFields bool) { - if os.Getenv("STORAGE") != "elasticsearch" { - t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this") - } +func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) { + //if os.Getenv("STORAGE") != "elasticsearch" { + // t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this") + //} if err := healthCheck(); err != nil { t.Fatal(err) } s := &ESStorageIntegration{} - require.NoError(t, s.initializeES(allTagsAsFields)) - s.IntegrationTestAll(t) + //require.NoError(t, s.initializeES(allTagsAsFields, archive)) + //s.IntegrationTestAll(t) + + if archive { + require.NoError(t, s.initializeES(allTagsAsFields, true)) + t.Run("ArchiveTrace", s.testArchiveTrace) + } } -func TestElasticsearchStorage(t 
*testing.T) { - testElasticsearchStorage(t, false) +//func TestElasticsearchStorage(t *testing.T) { +// testElasticsearchStorage(t, false, false) +//} +// +//func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) { +// testElasticsearchStorage(t, true, false) +//} + +func TestElasticsearchStorage_Archive(t *testing.T) { + testElasticsearchStorage(t, false, true) } -func TestElasticsearchStorageAllTagsAsObjectFields(t *testing.T) { - testElasticsearchStorage(t, true) +func (s *StorageIntegration) testArchiveTrace(t *testing.T) { + //defer s.cleanUp(t) + tId := model.NewTraceID(uint64(22), uint64(44)) + //expected := s.loadParseAndWriteExampleTrace(t) + //expectedTraceID := expected.Spans[0].TraceID + //for i := 0; i < len(expected.Spans); i++ { + // expected.Spans[i].StartTime = time.Now().Add(-time.Hour*24*150) + // require.NoError(t, s.SpanWriter.WriteSpan(expected.Spans[i])) + //} + expected := &model.Span{ + OperationName: "archive_span", + StartTime: time.Now(), + //StartTime: time.Now().Add(- time.Hour*24*5), + + TraceID: tId, + SpanID: model.NewSpanID(1111), + Process: model.NewProcess("archived_service", model.KeyValues{}), + } + + //var actual *model.Trace + //found := s.waitForCondition(t, func(t *testing.T) bool { + // var err error + // actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) + // if err != nil { + // t.Log(err) + // } + // return err == nil && len(actual.Spans) == len(expected.Spans) + //}) + //if !assert.True(t, found) { + // CompareTraces(t, expected, actual) + //} + + s.refresh(t) + actual, err := s.SpanReader.GetTrace(context.Background(), tId) + require.NoError(t, err) + assert.EqualValues(t, expected, actual) } From ee6919069d0a7a0f3e1cad6b49fdf1791d3dac03 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 20 Jun 2019 17:49:51 +0200 Subject: [PATCH 2/6] Add itest Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 4 +- .../storage/integration/elasticsearch_test.go | 75 ++++++++----------- 2 files changed, 34 insertions(+), 45 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 17858539771..d2cee212a63 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -100,7 +100,7 @@ type SpanReader struct { serviceIndexPrefix []string spanConverter dbmodel.ToDomain timeRangeIndices timeRangeIndexFn - sourceFn sourceFn + sourceFn sourceFn } // SpanReaderParams holds constructor params for NewSpanReader @@ -129,7 +129,7 @@ func NewSpanReader(p SpanReaderParams) *SpanReader { serviceIndexPrefix: indexNames(p.IndexPrefix, serviceIndex), spanConverter: dbmodel.NewToDomain(p.TagDotReplacement), timeRangeIndices: getTimeRangeIndexFn(p.Archive, p.UseReadWriteAliases), - sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), + sourceFn: getSourceFn(p.Archive, p.MaxNumSpans), } } diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index eefa23d447a..34d4634f328 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -16,18 +16,19 @@ package integration import ( "context" - "github.com/jaegertracing/jaeger/model" - "github.com/stretchr/testify/assert" "net/http" + "os" "testing" "time" + "github.com/stretchr/testify/assert" "github.com/pkg/errors" "github.com/stretchr/testify/require" "github.com/uber/jaeger-lib/metrics" "go.uber.org/zap" "gopkg.in/olivere/elastic.v5" + "github.com/jaegertracing/jaeger/model" 
"github.com/jaegertracing/jaeger/pkg/es/wrapper" "github.com/jaegertracing/jaeger/pkg/testutils" "github.com/jaegertracing/jaeger/plugin/storage/es" @@ -133,68 +134,56 @@ func healthCheck() error { } func testElasticsearchStorage(t *testing.T, allTagsAsFields, archive bool) { - //if os.Getenv("STORAGE") != "elasticsearch" { - // t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this") - //} + if os.Getenv("STORAGE") != "elasticsearch" { + t.Skip("Integration test against ElasticSearch skipped; set STORAGE env var to elasticsearch to run this") + } if err := healthCheck(); err != nil { t.Fatal(err) } s := &ESStorageIntegration{} - //require.NoError(t, s.initializeES(allTagsAsFields, archive)) - //s.IntegrationTestAll(t) + require.NoError(t, s.initializeES(allTagsAsFields, archive)) if archive { - require.NoError(t, s.initializeES(allTagsAsFields, true)) t.Run("ArchiveTrace", s.testArchiveTrace) + } else { + s.IntegrationTestAll(t) } } -//func TestElasticsearchStorage(t *testing.T) { -// testElasticsearchStorage(t, false, false) -//} -// -//func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) { -// testElasticsearchStorage(t, true, false) -//} +func TestElasticsearchStorage(t *testing.T) { + testElasticsearchStorage(t, false, false) +} + +func TestElasticsearchStorage_AllTagsAsObjectFields(t *testing.T) { + testElasticsearchStorage(t, true, false) +} func TestElasticsearchStorage_Archive(t *testing.T) { testElasticsearchStorage(t, false, true) } func (s *StorageIntegration) testArchiveTrace(t *testing.T) { - //defer s.cleanUp(t) - tId := model.NewTraceID(uint64(22), uint64(44)) - //expected := s.loadParseAndWriteExampleTrace(t) - //expectedTraceID := expected.Spans[0].TraceID - //for i := 0; i < len(expected.Spans); i++ { - // expected.Spans[i].StartTime = time.Now().Add(-time.Hour*24*150) - // require.NoError(t, s.SpanWriter.WriteSpan(expected.Spans[i])) - //} + defer s.cleanUp(t) + tId := model.NewTraceID(uint64(11), uint64(22)) expected := &model.Span{ OperationName: "archive_span", - StartTime: time.Now(), - //StartTime: time.Now().Add(- time.Hour*24*5), - + StartTime: time.Now().Add(- time.Hour*24*150), TraceID: tId, - SpanID: model.NewSpanID(1111), + SpanID: model.NewSpanID(55), + References: []model.SpanRef{}, Process: model.NewProcess("archived_service", model.KeyValues{}), } - //var actual *model.Trace - //found := s.waitForCondition(t, func(t *testing.T) bool { - // var err error - // actual, err = s.SpanReader.GetTrace(context.Background(), expectedTraceID) - // if err != nil { - // t.Log(err) - // } - // return err == nil && len(actual.Spans) == len(expected.Spans) - //}) - //if !assert.True(t, found) { - // CompareTraces(t, expected, actual) - //} - + require.NoError(t, s.SpanWriter.WriteSpan(expected)) s.refresh(t) - actual, err := s.SpanReader.GetTrace(context.Background(), tId) - require.NoError(t, err) - assert.EqualValues(t, expected, actual) + + var actual *model.Trace + found := s.waitForCondition(t, func(t *testing.T) bool { + var err error + actual, err = s.SpanReader.GetTrace(context.Background(), tId) + return err == nil && len(actual.Spans) == 1 + }) + if !assert.True(t, found) { + CompareTraces(t, &model.Trace{Spans: []*model.Span{expected}}, actual) + } } From 1e645b45194449996f3cf08e5e977ac80de6eccc Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Thu, 20 Jun 2019 17:55:01 +0200 Subject: [PATCH 3/6] Use constant Signed-off-by: Pavol Loffay --- plugin/storage/integration/elasticsearch_test.go | 5 +++-- 
1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/plugin/storage/integration/elasticsearch_test.go b/plugin/storage/integration/elasticsearch_test.go index 34d4634f328..619a67ff507 100644 --- a/plugin/storage/integration/elasticsearch_test.go +++ b/plugin/storage/integration/elasticsearch_test.go @@ -45,6 +45,7 @@ const ( password = "changeme" // the elasticsearch default password indexPrefix = "integration-test" tagKeyDeDotChar = "@" + maxSpanAge = time.Hour * 72 ) type ESStorageIntegration struct { @@ -108,7 +109,7 @@ func (s *ESStorageIntegration) initSpanstore(allTagsAsFields, archive bool) { Logger: s.logger, MetricsFactory: metrics.NullFactory, IndexPrefix: indexPrefix, - MaxSpanAge: 72 * time.Hour, + MaxSpanAge: maxSpanAge, TagDotReplacement: tagKeyDeDotChar, Archive: archive, }) @@ -167,7 +168,7 @@ func (s *StorageIntegration) testArchiveTrace(t *testing.T) { tId := model.NewTraceID(uint64(11), uint64(22)) expected := &model.Span{ OperationName: "archive_span", - StartTime: time.Now().Add(- time.Hour*24*150), + StartTime: time.Now().Add(-maxSpanAge*5), TraceID: tId, SpanID: model.NewSpanID(55), References: []model.SpanRef{}, From ee7d4eba76c83e3f5da61599cdce053a8cff8635 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 21 Jun 2019 11:12:44 +0200 Subject: [PATCH 4/6] Fmt Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 1 - 1 file changed, 1 deletion(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index d2cee212a63..b3df959dbe7 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -321,7 +321,6 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []model.TraceID, st indices := s.timeRangeIndices(s.spanIndexPrefix, startTime.Add(-time.Hour), endTime.Add(time.Hour)) nextTime := model.TimeAsEpochMicroseconds(startTime.Add(-time.Hour)) - fmt.Println(indices) searchAfterTime := make(map[model.TraceID]uint64) totalDocumentsFetched := make(map[model.TraceID]int) tracesMap := make(map[model.TraceID]*model.Trace) From e96bce1d7fd935e9b76b7848d85b4ee4d0c0bf89 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 21 Jun 2019 12:03:15 +0200 Subject: [PATCH 5/6] Move sort to not archive search Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index b3df959dbe7..0863567fc07 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -166,10 +166,10 @@ func getSourceFn(archive bool, maxNumSpans int) sourceFn { s := elastic.NewSearchSource(). Query(query). Size(defaultDocCount). - TerminateAfter(maxNumSpans). - Sort("startTime", true) + TerminateAfter(maxNumSpans) if !archive { - s.SearchAfter(nextTime) + s.Sort("startTime", true). 
+ SearchAfter(nextTime) } return s } From b7a0eae78c670cf482ffca7cf74569212fde99a0 Mon Sep 17 00:00:00 2001 From: Pavol Loffay Date: Fri, 21 Jun 2019 12:12:16 +0200 Subject: [PATCH 6/6] fmt Signed-off-by: Pavol Loffay --- plugin/storage/es/spanstore/reader.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin/storage/es/spanstore/reader.go b/plugin/storage/es/spanstore/reader.go index 0863567fc07..8eabf3dfc49 100644 --- a/plugin/storage/es/spanstore/reader.go +++ b/plugin/storage/es/spanstore/reader.go @@ -169,7 +169,7 @@ func getSourceFn(archive bool, maxNumSpans int) sourceFn { TerminateAfter(maxNumSpans) if !archive { s.Sort("startTime", true). - SearchAfter(nextTime) + SearchAfter(nextTime) } return s }
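
The search-source construction is touched by patches 1, 5, and 6, so the net result is easier to review in one place. Below is a consolidated sketch of getSourceFn as it stands after the full series: the sourceFn type, the elastic.v5 calls, and the !archive branch are copied from the hunks above, while the package clause and the defaultDocCount stand-in are added only so the sketch is self-contained (reader.go already defines the real constant).

package spanstore

import "gopkg.in/olivere/elastic.v5"

// defaultDocCount stands in for the constant already defined in reader.go.
const defaultDocCount = 10000

// sourceFn builds the search source for a single trace-ID query.
type sourceFn func(query elastic.Query, nextTime uint64) *elastic.SearchSource

// getSourceFn returns the builder used by multiRead. Archive reads hit the
// dedicated archive index and take the query as-is; non-archive reads
// additionally sort by startTime and paginate with search_after so traces
// with more than defaultDocCount spans can be fetched across several passes.
func getSourceFn(archive bool, maxNumSpans int) sourceFn {
	return func(query elastic.Query, nextTime uint64) *elastic.SearchSource {
		s := elastic.NewSearchSource().
			Query(query).
			Size(defaultDocCount).
			TerminateAfter(maxNumSpans)
		if !archive {
			s.Sort("startTime", true).
				SearchAfter(nextTime)
		}
		return s
	}
}

At the call site in multiRead, each per-trace request is then built as elastic.NewSearchRequest().IgnoreUnavailable(true).Type(spanType).Source(s.sourceFn(query, nextTime)), and TestElasticsearchStorage_Archive exercises the archive branch end to end by writing a single span whose StartTime lies well outside maxSpanAge and asserting that GetTrace still returns it from the archive index.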