Skip to content

Commit

Permalink
Copy process if missing
Browse files Browse the repository at this point in the history
Signed-off-by: Pavol Loffay <ploffay@redhat.com>
  • Loading branch information
pavolloffay committed Nov 8, 2018
1 parent d0ac010 commit 3e10880
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 22 deletions.
6 changes: 5 additions & 1 deletion cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,11 @@ func NewGRPCHandler(logger *zap.Logger, spanProcessor SpanProcessor) *GRPCHandle

// PostSpans implements gRPC CollectorService.
func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) (*api_v2.PostSpansResponse, error) {
// TODO what if span does not have process
for _, span := range r.GetBatch().Spans {
if span.GetProcess() == nil {
span.Process = &r.Batch.Process
}
}
oks, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
Expand Down
62 changes: 41 additions & 21 deletions cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"sync"
"testing"

"github.com/crossdock/crossdock-go/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"google.golang.org/grpc"
Expand Down Expand Up @@ -49,20 +50,22 @@ func (p *mockSpanProcessor) getSpans() []*model.Span {
return p.spans
}

func initializeGRPCTestServer(t *testing.T, expectedError error) (*grpc.Server, *mockSpanProcessor, net.Addr) {
func (p *mockSpanProcessor) reset() {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = nil
}

func initializeGRPCTestServer(t *testing.T, beforeServe func(s *grpc.Server)) (*grpc.Server, net.Addr) {
server := grpc.NewServer()
processor := &mockSpanProcessor{expectedError: expectedError}
logger, err := zap.NewDevelopment()
require.NoError(t, err)
handler := NewGRPCHandler(logger, processor)
api_v2.RegisterCollectorServiceServer(server, handler)
beforeServe(server)
lis, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
err := server.Serve(lis)
require.NoError(t, err)
}()
return server, processor, lis.Addr()
return server, lis.Addr()
}

func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grpc.ClientConn) {
Expand All @@ -72,27 +75,44 @@ func newClient(t *testing.T, addr net.Addr) (api_v2.CollectorServiceClient, *grp
}

func TestPostSpans(t *testing.T) {
server, processor, addr := initializeGRPCTestServer(t, nil)
processor := &mockSpanProcessor{}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: model.Batch{
Spans: []*model.Span{
{
OperationName: "test-operation",
},
},
},
})
require.NoError(t, err)
require.False(t, r.GetOk())
require.Len(t, processor.getSpans(), 1)

tests := []struct {
batch model.Batch
expected []*model.Span
}{
{batch: model.Batch{Process: model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "bar"}}}},
{batch: model.Batch{Process: model.Process{ServiceName: "batch-process"}, Spans: []*model.Span{{OperationName: "test-op"}}},
expected: []*model.Span{{OperationName: "test-op", Process: &model.Process{ServiceName: "batch-process"}}}},
}
for _, test := range tests {
r, err := client.PostSpans(context.Background(), &api_v2.PostSpansRequest{
Batch: test.batch,
})
require.NoError(t, err)
require.False(t, r.GetOk())
got := processor.getSpans()
require.Equal(t, len(test.batch.GetSpans()), len(got))
assert.Equal(t, test.expected, got)
processor.reset()
}
}

func TestPostSpansWithError(t *testing.T) {
expectedError := errors.New("test-error")
server, processor, addr := initializeGRPCTestServer(t, expectedError)
processor := &mockSpanProcessor{expectedError: expectedError}
server, addr := initializeGRPCTestServer(t, func(s *grpc.Server) {
handler := NewGRPCHandler(zap.NewNop(), processor)
api_v2.RegisterCollectorServiceServer(s, handler)
})
defer server.Stop()
client, conn := newClient(t, addr)
defer conn.Close()
Expand Down

0 comments on commit 3e10880

Please sign in to comment.