Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gRPC post spans #1042

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 14 additions & 7 deletions cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package app

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
Expand All @@ -31,7 +30,7 @@ type GRPCHandler struct {
spanProcessor SpanProcessor
}

// NewGRPCHandler registers routes for this handler on the given router
// NewGRPCHandler registers routes for this handler on the given router.
func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandler {
return &GRPCHandler{
logger: logger,
Expand All @@ -41,16 +40,24 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandle

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
// TODO
fmt.Printf("PostSpans(%+v)\n", *r)
for _, s := range r.Batch.Spans {
println(s.OperationName)
oks, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
Copy link
Member

@yurishkuro yurishkuro Oct 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is losing Batch.Process data.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the spans have pointers to the process. I'm not sure when that back-reference is set. If it is already set, there should not be a problem.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by convention when a bunch of spans arrive in a Batch and logically they are all from the same Process then Span.Process will be nil, but Batch.Process will be defined. This is done to minimize the amount of data sent over the wire. So you need to loop through spans and copy the Process pointer from Batch if Span.Process == nil

if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
}
success := true
for _, ok := range oks {
if !ok {
success = false
break
}
}
return &api_v2.PostSpansResponse{Ok: true}, nil
return &api_v2.PostSpansResponse{Ok: success}, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We may want to rethink this API. What are we trying to communicate back to the caller with this Boolean flag? At least (accepted, rejected) counts would make more sense. When can the processor return not ok? Should it be returning specific errors instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that the API as it stands isn't exactly perfect. If anything, a boolean vector (or even bit vector) might make more sense here. Count works too if we don't care which spans failed. An optional error message might be better than the boolean response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any thoughts @yurishkuro?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The question is: what is the sender going to do with the errors?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I agree with you here. The only reason to provide an error is if we believe in some far-fetched scenario it would be retried. Also, an empty response sort of seems weird, but IDK what else to do here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we want to support retries we should be using gRPC error code for that. Can you check what openCensus is doing in this case?

}

// GetTrace gets trace
func (g *GRPCHandler) GetTrace(ctx context.Context, req *api_v2.GetTraceRequest) (*api_v2.GetTraceResponse, error) {
// TODO
return &api_v2.GetTraceResponse{
Trace: &model.Trace{
Spans: []*model.Span{
Expand Down
112 changes: 112 additions & 0 deletions cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright (c) 2018 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"context"
"errors"
"net"
"sync"
"testing"

"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"

"github.com/jaegertracing/jaeger/model"
"github.com/jaegertracing/jaeger/proto-gen/api_v2"
)

type mockSpanProcessor struct {
expectedError error
mux sync.Mutex
spans []*model.Span
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this all necessary? It looks like your tests call ProcessSpans just once, the extra complexity (mutex, keeping track of spans, etc) doesn't look required.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's needed since read and write are in different goroutines

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Luckily for me, the only code I had to write here was essentially copied from the equivalent Thrift test. It does locking the same way IIRC.

oks := make([]bool, len(spans))
return oks, p.expectedError
}

func (p *mockSpanProcessor) getSpans() []*model.Span {
p.mux.Lock()
defer p.mux.Unlock()
return p.spans
}

func initializeGRPCTestServer(t *testing.T, expectedError error) (*grpc.Server, *mockSpanProcessor, net.Addr) {
server := grpc.NewServer()
processor := &mockSpanProcessor{expectedError: expectedError}
logger, err := zap.NewDevelopment()
require.NoError(t, err)
handler := NewGRPCHandler(logger, processor)
api_v2.RegisterCollectorServiceServer(server, handler)
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
err := server.Serve(lis)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

doesn't this introduce a potential race condition where the client is initialized before the server is ready?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I believe that this is somewhat unavoidable. The issue is that Serve will block indefinitely. Using a sync.WaitGroup would help, if I could call the Done method before the the server blocks, but there is no good place to do that.

require.NoError(t, err)
}()
return server, processor, lis.Addr()
}

func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grpc.ClientConn) {
conn, err := grpc.Dial(addr.String(), grpc.WithInsecure())
require.NoError(t, err)
return api_v2.NewCollectorServiceClient(conn), conn
}

func TestPostSpans(t *testing.T) {
server, processor, addr := initializeGRPCTestServer(t, nil)
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: model.Batch{
Spans: []*model.Span{
{
OperationName: "test-operation",
},
},
},
})
require.NoError(t, err)
require.False(t, r.GetOk())
require.Len(t, processor.getSpans(), 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so when you do this, you're essentially testing a function on the mock, kinda weird.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It tests sending spans over the wire, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ya again, Thrift handler does this.

Copy link
Member

@yurishkuro yurishkuro Oct 8, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean if spans go over the wire then we're not just testing mock, we're testing the transmission and marshalling.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes exactly.

}

func TestPostSpansWithError(t *testing.T) {
expectedError := errors.New("test-error")
server, processor, addr := initializeGRPCTestServer(t, expectedError)
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: model.Batch{
Spans: []*model.Span{
{
OperationName: "fake-operation",
},
},
},
})
require.Error(t, err)
require.Nil(t, r)
require.Contains(t, err.Error(), expectedError.Error())
require.Len(t, processor.getSpans(), 1)
}
2 changes: 1 addition & 1 deletion jaeger-ui
Submodule jaeger-ui updated 130 files