-
Notifications
You must be signed in to change notification settings - Fork 2.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
The implementation of FindTraceIDs function for ElasticSearch reader. #1280
Changes from 8 commits
23bbaf5
c55fde1
4350e3a
0ace6df
d4a2644
e90848d
ae75936
8a1ec2e
0c399da
424bc31
5aae75f
7a70584
2600634
b108809
debd4d6
66f1851
75c30d3
0e7139a
49a132a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -221,22 +221,34 @@ func (s *SpanReader) FindTraces(ctx context.Context, traceQuery *spanstore.Trace | |
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraces") | ||
defer span.Finish() | ||
|
||
if err := validateQuery(traceQuery); err != nil { | ||
return nil, err | ||
} | ||
if traceQuery.NumTraces == 0 { | ||
traceQuery.NumTraces = defaultNumTraces | ||
} | ||
uniqueTraceIDs, err := s.findTraceIDs(ctx, traceQuery) | ||
uniqueTraceIDs, err := s.validateQueryAndFindTraceIDs(ctx, traceQuery) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return s.multiRead(ctx, uniqueTraceIDs, traceQuery.StartTimeMin, traceQuery.StartTimeMax) | ||
} | ||
|
||
// FindTraceIDs is not implemented. | ||
// FindTraceIDs retrieves traces IDs that match the traceQuery | ||
func (s *SpanReader) FindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]model.TraceID, error) { | ||
return nil, errors.New("not implemented") // TODO: Implement | ||
span, ctx := opentracing.StartSpanFromContext(ctx, "FindTraceIDs") | ||
defer span.Finish() | ||
|
||
esTraceIDs, err := s.validateQueryAndFindTraceIDs(ctx, traceQuery) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
traceIDs := make([]model.TraceID, len(esTraceIDs)) | ||
for i, ID := range esTraceIDs { | ||
traceID, err := model.TraceIDFromString(ID) | ||
if err != nil { | ||
return nil, errors.Wrap(err, fmt.Sprintf("Making traceID from string '%s' failed", ID)) | ||
} | ||
|
||
traceIDs[i] = traceID | ||
} | ||
|
||
return traceIDs, nil | ||
} | ||
|
||
func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime, endTime time.Time) ([]*model.Trace, error) { | ||
|
@@ -317,6 +329,20 @@ func (s *SpanReader) multiRead(ctx context.Context, traceIDs []string, startTime | |
return traces, nil | ||
} | ||
|
||
func (s *SpanReader) validateQueryAndFindTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { | ||
childSpan, _ := opentracing.StartSpanFromContext(ctx, "validateQueryAndFindTraceIDs") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we need a span for this operation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, thanks, I will delete it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. please delete it |
||
defer childSpan.Finish() | ||
|
||
if err := validateQuery(traceQuery); err != nil { | ||
return nil, err | ||
} | ||
if traceQuery.NumTraces == 0 { | ||
traceQuery.NumTraces = defaultNumTraces | ||
} | ||
|
||
return s.findTraceIDs(ctx, traceQuery) | ||
} | ||
|
||
func validateQuery(p *spanstore.TraceQueryParameters) error { | ||
if p == nil { | ||
return ErrMalformedRequestObject | ||
|
@@ -339,6 +365,7 @@ func validateQuery(p *spanstore.TraceQueryParameters) error { | |
func (s *SpanReader) findTraceIDs(ctx context.Context, traceQuery *spanstore.TraceQueryParameters) ([]string, error) { | ||
childSpan, _ := opentracing.StartSpanFromContext(ctx, "findTraceIDs") | ||
defer childSpan.Finish() | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: extraneous change There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, thanks! |
||
// Below is the JSON body to our HTTP GET request to ElasticSearch. This function creates this. | ||
// { | ||
// "size": 0, | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -668,11 +668,142 @@ func TestFindTraceIDs(t *testing.T) { | |
testGet(traceIDAggregation, t) | ||
} | ||
|
||
func TestFindTraceIDNotImplemented(t *testing.T) { | ||
func TestSpanReader_TestFindTraceIDs(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These tests look very similar to the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, of course, I will make change soon. |
||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
// find trace IDs | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
NumTraces: 3, | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 3) | ||
assert.EqualValues(t, 1, traceIDs[0].Low) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsInvalidQuery(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16},{"key": "3","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), nil) | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: "", | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
assert.EqualError(t, err, "not implemented") | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsAggregationFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.Error(t, err) | ||
assert.Nil(t, traceIDs) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsNoTraceIDs(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": []}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.NoError(t, err) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsReadTraceIDsFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "1","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(nil, errors.New("read error")) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, "Search service failed: read error") | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
func TestSpanReader_FindTraceIDsIncorrectTraceIDFailure(t *testing.T) { | ||
goodAggregations := make(map[string]*json.RawMessage) | ||
rawMessage := []byte(`{"buckets": [{"key": "sdfsdfdssd234nsdvsdfldjsf","doc_count": 16},{"key": "2","doc_count": 16}]}`) | ||
goodAggregations[traceIDAggregation] = (*json.RawMessage)(&rawMessage) | ||
|
||
withSpanReader(func(r *spanReaderTest) { | ||
mockSearchService(r).Return(&elastic.SearchResult{Aggregations: elastic.Aggregations(goodAggregations)}, nil) | ||
|
||
traceIDsQuery := &spanstore.TraceQueryParameters{ | ||
ServiceName: serviceName, | ||
Tags: map[string]string{ | ||
"hello": "world", | ||
}, | ||
StartTimeMin: time.Now().Add(-1 * time.Hour), | ||
StartTimeMax: time.Now(), | ||
} | ||
|
||
traceIDs, err := r.reader.FindTraceIDs(context.Background(), traceIDsQuery) | ||
require.EqualError(t, err, `Making traceID from string 'sdfsdfdssd234nsdvsdfldjsf' failed: strconv.ParseUint: parsing "sdfsdfdss": invalid syntax`) | ||
assert.Len(t, traceIDs, 0) | ||
}) | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good!