gRPC post spans #1042
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -16,7 +16,6 @@ package app | |
|
||
import ( | ||
"context" | ||
- "fmt" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
@@ -31,7 +30,7 @@ type GRPCHandler struct { | |
spanProcessor SpanProcessor | ||
} | ||
|
||
- // NewGRPCHandler registers routes for this handler on the given router | ||
+ // NewGRPCHandler registers routes for this handler on the given router. | ||
func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandler { | ||
return &GRPCHandler{ | ||
logger: logger, | ||
|
@@ -41,16 +40,24 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandle | |
|
||
// PostSpans implements gRPC CollectorService. | ||
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) { | ||
- // TODO | ||
- fmt.Printf("PostSpans(%+v)\n", *r) | ||
- for _, s := range r.Batch.Spans { | ||
- println(s.OperationName) | ||
+ oks, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType) | ||
+ if err != nil { | ||
+ g.logger.Error("cannot process spans", zap.Error(err)) | ||
+ return nil, err | ||
} | ||
+ success := true | ||
+ for _, ok := range oks { | ||
+ if !ok { | ||
+ success = false | ||
+ break | ||
+ } | ||
+ } | ||
- return &api_v2.PostSpansResponse{Ok: true}, nil | ||
+ return &api_v2.PostSpansResponse{Ok: success}, nil | ||
We may want to rethink this API. What are we trying to communicate back to the caller with this Boolean flag? At least (accepted, rejected) counts would make more sense. When can the processor return not ok? Should it be returning specific errors instead?
I agree that the API as it stands isn't exactly perfect. If anything, a boolean vector (or even bit vector) might make more sense here. Count works too if we don't care which spans failed. An optional error message might be better than the boolean response.
Any thoughts @yurishkuro?
The question is: what is the sender going to do with the errors?
Overall, I agree with you here. The only reason to provide an error is if we believe that, in some far-fetched scenario, it would be retried. Also, an empty response sort of seems weird, but IDK what else to do here.
If we want to support retries, we should be using gRPC error codes for that. Can you check what OpenCensus is doing in this case?
||
} | ||
|
||
// GetTrace gets trace | ||
func (g *GRPCHandler) GetTrace(ctx context.Context, req *api_v2.GetTraceRequest) (*api_v2.GetTraceResponse, error) { | ||
// TODO | ||
return &api_v2.GetTraceResponse{ | ||
Trace: &model.Trace{ | ||
Spans: []*model.Span{ | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,112 @@ | ||
// Copyright (c) 2018 The Jaeger Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package app | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"net" | ||
"sync" | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
"go.uber.org/zap" | ||
"google.golang.org/grpc" | ||
|
||
"github.com/jaegertracing/jaeger/model" | ||
"github.com/jaegertracing/jaeger/proto-gen/api_v2" | ||
) | ||
|
||
type mockSpanProcessor struct { | ||
expectedError error | ||
mux sync.Mutex | ||
spans []*model.Span | ||
} | ||
|
||
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) { | ||
p.mux.Lock() | ||
defer p.mux.Unlock() | ||
p.spans = append(p.spans, spans...) | ||
Is this all necessary? It looks like your tests call
I think it's needed since read and write are in different goroutines.
Yes. Luckily for me, the only code I had to write here was essentially copied from the equivalent Thrift test. It does locking the same way IIRC.
||
oks := make([]bool, len(spans)) | ||
return oks, p.expectedError | ||
} | ||
|
||
func (p *mockSpanProcessor) getSpans() []*model.Span { | ||
p.mux.Lock() | ||
defer p.mux.Unlock() | ||
return p.spans | ||
} | ||
|
||
func initializeGRPCTestServer(t *testing.T, expectedError error) (*grpc.Server, *mockSpanProcessor, net.Addr) { | ||
server := grpc.NewServer() | ||
processor := &mockSpanProcessor{expectedError: expectedError} | ||
logger, err := zap.NewDevelopment() | ||
require.NoError(t, err) | ||
handler := NewGRPCHandler(logger, processor) | ||
api_v2.RegisterCollectorServiceServer(server, handler) | ||
lis, err := net.Listen("tcp", "localhost:0") | ||
require.NoError(t, err) | ||
go func() { | ||
err := server.Serve(lis) | ||
Doesn't this introduce a potential race condition where the client is initialized before the server is ready?
Good point.
Actually, I believe that this is somewhat unavoidable. The issue is that
||
require.NoError(t, err) | ||
}() | ||
return server, processor, lis.Addr() | ||
} | ||
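On the race raised in the thread above: one observation that may be relevant, offered as a sketch and not as the author's reasoning, is that `net.Listen` has already bound the port and started listening by the time it returns. A client that connects before any goroutine has called `Accept` should therefore have its connection queued by the operating system instead of refused. The example below uses only the standard `net` package (no gRPC), and `dialBeforeAccept` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"io"
	"net"
)

// dialBeforeAccept dials a listener before anything has called Accept,
// then starts accepting and echoes msg back to the client.
func dialBeforeAccept(msg string) (string, error) {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", err
	}
	defer lis.Close()

	// Nothing is accepting yet; the connection waits in the listen backlog.
	conn, err := net.Dial("tcp", lis.Addr().String())
	if err != nil {
		return "", err
	}
	defer conn.Close()

	// Only now start the "server" side, after the client has connected.
	errc := make(chan error, 1)
	go func() {
		srv, err := lis.Accept()
		if err != nil {
			errc <- err
			return
		}
		defer srv.Close()
		buf := make([]byte, len(msg))
		if _, err := io.ReadFull(srv, buf); err != nil {
			errc <- err
			return
		}
		_, err = srv.Write(buf)
		errc <- err
	}()

	if _, err := conn.Write([]byte(msg)); err != nil {
		return "", err
	}
	reply := make([]byte, len(msg))
	if _, err := io.ReadFull(conn, reply); err != nil {
		return "", err
	}
	return string(reply), <-errc
}

func main() {
	reply, err := dialBeforeAccept("ping")
	fmt.Println(reply, err)
}
```

Whether this fully addresses the reviewer's concern for the gRPC server in the test is an assumption; it only shows that the listener itself is ready before `Serve` runs.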
|
||
func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grpc.ClientConn) { | ||
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure()) | ||
require.NoError(t, err) | ||
return api_v2.NewCollectorServiceClient(conn), conn | ||
} | ||
|
||
func TestPostSpans(t *testing.T) { | ||
server, processor, addr := initializeGRPCTestServer(t, nil) | ||
defer server.Stop() | ||
client, conn := newClient(t, addr) | ||
defer conn.Close() | ||
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ | ||
Batch: model.Batch{ | ||
Spans: []*model.Span{ | ||
{ | ||
OperationName: "test-operation", | ||
}, | ||
}, | ||
}, | ||
}) | ||
require.NoError(t, err) | ||
require.False(t, r.GetOk()) | ||
require.Len(t, processor.getSpans(), 1) | ||
So when you do this, you're essentially testing a function on the mock, which is kind of weird.
It tests sending spans over the wire, no?
Yeah, again, the Thrift handler does this.
I mean, if spans go over the wire then we're not just testing the mock; we're testing the transmission and marshalling.
Yes, exactly.
||
} | ||
|
||
func TestPostSpansWithError(t *testing.T) { | ||
expectedError := errors.New("test-error") | ||
server, processor, addr := initializeGRPCTestServer(t, expectedError) | ||
defer server.Stop() | ||
client, conn := newClient(t, addr) | ||
defer conn.Close() | ||
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{ | ||
Batch: model.Batch{ | ||
Spans: []*model.Span{ | ||
{ | ||
OperationName: "fake-operation", | ||
}, | ||
}, | ||
}, | ||
}) | ||
require.Error(t, err) | ||
require.Nil(t, r) | ||
require.Contains(t, err.Error(), expectedError.Error()) | ||
require.Len(t, processor.getSpans(), 1) | ||
} |
I think this is losing Batch.Process data.
So the spans have pointers to the process. I'm not sure when that back-reference is set. If it is already set, there should not be a problem.
By convention, when a bunch of spans arrive in a Batch and logically they are all from the same Process, Span.Process will be nil but Batch.Process will be defined. This is done to minimize the amount of data sent over the wire. So you need to loop through the spans and copy the Process pointer from the Batch if Span.Process == nil.
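The loop described in the last comment could look roughly like the sketch below. The `Process`, `Span`, and `Batch` types here are simplified local stand-ins for the `model` package types, and `fillProcess` is a hypothetical helper name; the real field types and the place where this would be called in `PostSpans` are assumptions.

```go
package main

import "fmt"

// Process, Span, and Batch are simplified stand-ins for the model types,
// reduced to the fields this sketch needs.
type Process struct {
	ServiceName string
}

type Span struct {
	OperationName string
	Process       *Process
}

type Batch struct {
	Spans   []*Span
	Process *Process
}

// fillProcess copies the batch-level Process pointer into every span that
// does not carry its own, and leaves spans with an explicit Process untouched.
func fillProcess(b Batch) []*Span {
	for _, s := range b.Spans {
		if s != nil && s.Process == nil {
			s.Process = b.Process
		}
	}
	return b.Spans
}

func main() {
	own := &Process{ServiceName: "span-level"}
	batch := Batch{
		Process: &Process{ServiceName: "batch-level"},
		Spans: []*Span{
			{OperationName: "a"},
			{OperationName: "b", Process: own},
		},
	}
	for _, s := range fillProcess(batch) {
		fmt.Println(s.OperationName, s.Process.ServiceName)
	}
}
```

Copying the pointer instead of the struct keeps the per-span cost small, which fits the stated goal of the convention.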