From 3111b62f82e24a8d0799d6dec743f8130a1bd1cd Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 3 Apr 2019 16:33:41 -0400 Subject: [PATCH 1/5] Refactor handlers Signed-off-by: Yuri Shkuro --- cmd/collector/app/grpc_handler.go | 5 +- cmd/collector/app/http_handler.go | 7 +-- cmd/collector/app/span_processor.go | 16 +++++- cmd/collector/app/tchannel_handler.go | 52 +++++++++++++++++++ ...span_handler.go => thrift_span_handler.go} | 29 ++++++----- ...er_test.go => thrift_span_handler_test.go} | 0 cmd/collector/app/zipkin/http_handler.go | 6 +-- cmd/collector/main.go | 5 +- 8 files changed, 93 insertions(+), 27 deletions(-) create mode 100644 cmd/collector/app/tchannel_handler.go rename cmd/collector/app/{span_handler.go => thrift_span_handler.go} (82%) rename cmd/collector/app/{span_handler_test.go => thrift_span_handler_test.go} (100%) diff --git a/cmd/collector/app/grpc_handler.go b/cmd/collector/app/grpc_handler.go index 6a6adead87b..b3496bc2a18 100644 --- a/cmd/collector/app/grpc_handler.go +++ b/cmd/collector/app/grpc_handler.go @@ -43,7 +43,10 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest) span.Process = r.Batch.Process } } - _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType) + _, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{ + InboundTransport: "grpc", // TODO do we have a constant? + SpanFormat: JaegerFormatType, + }) if err != nil { g.logger.Error("cannot process spans", zap.Error(err)) return nil, err diff --git a/cmd/collector/app/http_handler.go b/cmd/collector/app/http_handler.go index 7a381b29e69..a12527d7b82 100644 --- a/cmd/collector/app/http_handler.go +++ b/cmd/collector/app/http_handler.go @@ -19,11 +19,9 @@ import ( "io/ioutil" "mime" "net/http" - "time" "github.com/apache/thrift/lib/go/thrift" "github.com/gorilla/mux" - tchanThrift "github.com/uber/tchannel-go/thrift" tJaeger "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) @@ -86,10 +84,9 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) { http.Error(w, fmt.Sprintf(UnableToReadBodyErrFormat, err), http.StatusBadRequest) return } - ctx, cancel := tchanThrift.NewContext(time.Minute) - defer cancel() batches := []*tJaeger.Batch{batch} - if _, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, batches); err != nil { + opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil { http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError) return } diff --git a/cmd/collector/app/span_processor.go b/cmd/collector/app/span_processor.go index 66467201a0f..c0bfa7b404c 100644 --- a/cmd/collector/app/span_processor.go +++ b/cmd/collector/app/span_processor.go @@ -26,6 +26,18 @@ import ( "github.com/jaegertracing/jaeger/storage/spanstore" ) +// ProcessSpansOptions additional options passed to processor along with the spans. +type ProcessSpansOptions struct { + SpanFormat string + InboundTransport string +} + +// SpanProcessor handles model spans +type SpanProcessor interface { + // ProcessSpans processes model spans and return with either a list of true/false success or an error + ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) +} + type spanProcessor struct { queue *queue.BoundedQueue metrics *SpanProcessorMetrics @@ -109,12 +121,12 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { sp.metrics.SaveLatency.Record(time.Since(startTime)) } -func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) { +func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) { sp.preProcessSpans(mSpans) sp.metrics.BatchSize.Update(int64(len(mSpans))) retMe := make([]bool, len(mSpans)) for i, mSpan := range mSpans { - ok := sp.enqueueSpan(mSpan, spanFormat) + ok := sp.enqueueSpan(mSpan, options.SpanFormat) if !ok && sp.reportBusy { return nil, tchannel.ErrServerBusy } diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go new file mode 100644 index 00000000000..41329184e3c --- /dev/null +++ b/cmd/collector/app/tchannel_handler.go @@ -0,0 +1,52 @@ +// Copyright (c) 2019 The Jaeger Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "github.com/uber/tchannel-go/thrift" + + "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +type tchannelHandler struct { + jaegerHandler JaegerBatchesHandler + zipkinHandler ZipkinSpansHandler +} + +// NewTChannelHandler creates new handler that implements both Jaeger and Zipkin endpoints. +func NewTChannelHandler( + jaegerHandler JaegerBatchesHandler, + zipkinHandler ZipkinSpansHandler, +) *tchannelHandler { + return &tchannelHandler{ + jaegerHandler: jaegerHandler, + zipkinHandler: zipkinHandler, + } +} + +func (h *tchannelHandler) SubmitZipkinBatch( + _ thrift.Context, + spans []*zipkincore.Span, +) ([]*zipkincore.Response, error) { + return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"}) +} + +func (h *tchannelHandler) SubmitBatches( + _ thrift.Context, + batches []*jaeger.Batch, +) ([]*jaeger.BatchSubmitResponse, error) { + return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"}) +} diff --git a/cmd/collector/app/span_handler.go b/cmd/collector/app/thrift_span_handler.go similarity index 82% rename from cmd/collector/app/span_handler.go rename to cmd/collector/app/thrift_span_handler.go index 5777459ba66..a4b486f6077 100644 --- a/cmd/collector/app/span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -15,7 +15,6 @@ package app import ( - "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" zipkinS "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" @@ -35,22 +34,20 @@ const ( UnknownFormatType = "unknown" ) +type SubmitBatchOptions struct { + InboundTransport string +} + // ZipkinSpansHandler consumes and handles zipkin spans type ZipkinSpansHandler interface { // SubmitZipkinBatch records a batch of spans in Zipkin Thrift format - SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) + SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) } // JaegerBatchesHandler consumes and handles Jaeger batches type JaegerBatchesHandler interface { // SubmitBatches records a batch of spans in Jaeger Thrift format - SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) -} - -// SpanProcessor handles model spans -type SpanProcessor interface { - // ProcessSpans processes model spans and return with either a list of true/false success or an error - ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) + SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) } type jaegerBatchesHandler struct { @@ -66,7 +63,7 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor SpanProcessor) Jaeg } } -func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) { +func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) { responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches)) for _, batch := range batches { mSpans := make([]*model.Span, 0, len(batch.Spans)) @@ -74,7 +71,10 @@ func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*ja mSpan := jConv.ToDomainSpan(span, batch.Process) mSpans = append(mSpans, mSpan) } - oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType) + oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ + InboundTransport: options.InboundTransport, + SpanFormat: JaegerFormatType, + }) if err != nil { jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) return nil, err @@ -112,13 +112,16 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, saniti } // SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format. -func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) { +func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) { mSpans := make([]*model.Span, 0, len(spans)) for _, span := range spans { sanitized := h.sanitizer.Sanitize(span) mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...) } - bools, err := h.modelProcessor.ProcessSpans(mSpans, ZipkinFormatType) + bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ + InboundTransport: options.InboundTransport, + SpanFormat: ZipkinFormatType, + }) if err != nil { h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) return nil, err diff --git a/cmd/collector/app/span_handler_test.go b/cmd/collector/app/thrift_span_handler_test.go similarity index 100% rename from cmd/collector/app/span_handler_test.go rename to cmd/collector/app/thrift_span_handler_test.go diff --git a/cmd/collector/app/zipkin/http_handler.go b/cmd/collector/app/zipkin/http_handler.go index 807b463da04..8085c7ee06c 100644 --- a/cmd/collector/app/zipkin/http_handler.go +++ b/cmd/collector/app/zipkin/http_handler.go @@ -22,13 +22,11 @@ import ( "mime" "net/http" "strings" - "time" "github.com/go-openapi/loads" "github.com/go-openapi/strfmt" "github.com/go-openapi/swag" "github.com/gorilla/mux" - tchanThrift "github.com/uber/tchannel-go/thrift" "github.com/jaegertracing/jaeger/cmd/collector/app" "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" @@ -175,8 +173,8 @@ func gunzip(r io.ReadCloser) (*gzip.Reader, error) { func (aH *APIHandler) saveThriftSpans(tSpans []*zipkincore.Span) error { if len(tSpans) > 0 { - ctx, _ := tchanThrift.NewContext(time.Minute) - if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(ctx, tSpans); err != nil { + opts := app.SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant? + if _, err := aH.zipkinSpansHandler.SubmitZipkinBatch(tSpans, opts); err != nil { return err } } diff --git a/cmd/collector/main.go b/cmd/collector/main.go index ed13673aec4..e1d20784e42 100644 --- a/cmd/collector/main.go +++ b/cmd/collector/main.go @@ -111,8 +111,9 @@ func main() { logger.Fatal("Unable to create new TChannel", zap.Error(err)) } server := thrift.NewServer(ch) - server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler)) - server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler)) + batchHandler := app.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler) + server.Register(jc.NewTChanCollectorServer(batchHandler)) + server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore))) portStr := ":" + strconv.Itoa(builderOpts.CollectorPort) listener, err := net.Listen("tcp", portStr) From b42046e9ad83e9af1ab5f7ac314cda38b110be73 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 3 Apr 2019 16:58:33 -0400 Subject: [PATCH 2/5] Fix tests Signed-off-by: Yuri Shkuro --- cmd/collector/app/grpc_handler_test.go | 2 +- .../app/grpcserver/grpc_server_test.go | 2 +- cmd/collector/app/http_handler_test.go | 3 +- cmd/collector/app/span_processor_test.go | 16 +++--- cmd/collector/app/tchannel_handler_test.go | 53 +++++++++++++++++++ cmd/collector/app/thrift_span_handler_test.go | 16 ++---- cmd/collector/app/zipkin/http_handler_test.go | 4 +- 7 files changed, 69 insertions(+), 27 deletions(-) create mode 100644 cmd/collector/app/tchannel_handler_test.go diff --git a/cmd/collector/app/grpc_handler_test.go b/cmd/collector/app/grpc_handler_test.go index 50680a94595..329a83a05e4 100644 --- a/cmd/collector/app/grpc_handler_test.go +++ b/cmd/collector/app/grpc_handler_test.go @@ -36,7 +36,7 @@ type mockSpanProcessor struct { spans []*model.Span } -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts ProcessSpansOptions) ([]bool, error) { p.mux.Lock() defer p.mux.Unlock() p.spans = append(p.spans, spans...) diff --git a/cmd/collector/app/grpcserver/grpc_server_test.go b/cmd/collector/app/grpcserver/grpc_server_test.go index f76f80c4bf8..d5cb790e4c1 100644 --- a/cmd/collector/app/grpcserver/grpc_server_test.go +++ b/cmd/collector/app/grpcserver/grpc_server_test.go @@ -86,6 +86,6 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa type mockSpanProcessor struct { } -func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) { +func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ app.ProcessSpansOptions) ([]bool, error) { return []bool{}, nil } diff --git a/cmd/collector/app/http_handler_test.go b/cmd/collector/app/http_handler_test.go index f0c8a9c4d01..58c0cba3e6d 100644 --- a/cmd/collector/app/http_handler_test.go +++ b/cmd/collector/app/http_handler_test.go @@ -29,7 +29,6 @@ import ( "github.com/stretchr/testify/assert" jaegerClient "github.com/uber/jaeger-client-go" "github.com/uber/jaeger-client-go/transport" - tchanThrift "github.com/uber/tchannel-go/thrift" "github.com/jaegertracing/jaeger/thrift-gen/jaeger" ) @@ -42,7 +41,7 @@ type mockJaegerHandler struct { batches []*jaeger.Batch } -func (p *mockJaegerHandler) SubmitBatches(ctx tchanThrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) { +func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) { p.mux.Lock() defer p.mux.Unlock() p.batches = append(p.batches, batches...) diff --git a/cmd/collector/app/span_processor_test.go b/cmd/collector/app/span_processor_test.go index 736a73f41a1..4891bfbcf1f 100644 --- a/cmd/collector/app/span_processor_test.go +++ b/cmd/collector/app/span_processor_test.go @@ -23,9 +23,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/uber/jaeger-lib/metrics" "github.com/uber/jaeger-lib/metrics/metricstest" - "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" - "golang.org/x/net/context" zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" "github.com/jaegertracing/jaeger/model" @@ -83,20 +81,18 @@ func TestBySvcMetrics(t *testing.T) { Options.ReportBusy(false), Options.SpanFilter(isSpanAllowed), ) - ctx := context.Background() - tctx := thrift.Wrap(ctx) var metricPrefix, format string switch test.format { case ZipkinFormatType: span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug) zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer()) - zHandler.SubmitZipkinBatch(tctx, []*zc.Span{span, span}) + zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{}) metricPrefix = "service" format = "zipkin" case JaegerFormatType: span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug) jHandler := NewJaegerSpanHandler(logger, processor) - jHandler.SubmitBatches(tctx, []*jaeger.Batch{ + jHandler.SubmitBatches([]*jaeger.Batch{ { Spans: []*jaeger.Span{ span, @@ -104,7 +100,7 @@ func TestBySvcMetrics(t *testing.T) { }, Process: process, }, - }) + }, SubmitBatchOptions{}) metricPrefix = "service" format = "jaeger" default: @@ -217,7 +213,7 @@ func TestSpanProcessor(t *testing.T) { ServiceName: "x", }, }, - }, JaegerFormatType) + }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) } @@ -240,7 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) { ServiceName: "x", }, }, - }, JaegerFormatType) + }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) assert.NoError(t, err) assert.Equal(t, []bool{true}, res) @@ -299,7 +295,7 @@ func TestSpanProcessorBusy(t *testing.T) { ServiceName: "x", }, }, - }, JaegerFormatType) + }, ProcessSpansOptions{SpanFormat: JaegerFormatType}) assert.Error(t, err, "expcting busy error") assert.Nil(t, res) diff --git a/cmd/collector/app/tchannel_handler_test.go b/cmd/collector/app/tchannel_handler_test.go new file mode 100644 index 00000000000..8cd0a5d0a2a --- /dev/null +++ b/cmd/collector/app/tchannel_handler_test.go @@ -0,0 +1,53 @@ +// Copyright (c) 2019 The Jaeger Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package app + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/jaegertracing/jaeger/thrift-gen/jaeger" + "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" +) + +type mockZipkinHandler struct { + spans []*zipkincore.Span +} + +func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts SubmitBatchOptions) ([]*zipkincore.Response, error) { + p.spans = append(p.spans, spans...) + return nil, nil +} + +func TestTChannelHandler(t *testing.T) { + jh := &mockJaegerHandler{} + zh := &mockZipkinHandler{} + h := NewTChannelHandler(jh, zh) + h.SubmitBatches(nil, []*jaeger.Batch{ + &jaeger.Batch{ + Spans: []*jaeger.Span{ + &jaeger.Span{OperationName: "jaeger"}, + }, + }, + }) + assert.Len(t, jh.getBatches(), 1) + h.SubmitZipkinBatch(nil, []*zipkincore.Span{ + &zipkincore.Span{ + Name: "zipkin", + }, + }) + assert.Len(t, zh.spans, 1) +} diff --git a/cmd/collector/app/thrift_span_handler_test.go b/cmd/collector/app/thrift_span_handler_test.go index 651a9319568..99bdebde60f 100644 --- a/cmd/collector/app/thrift_span_handler_test.go +++ b/cmd/collector/app/thrift_span_handler_test.go @@ -17,10 +17,8 @@ package app import ( "errors" "testing" - "time" "github.com/stretchr/testify/assert" - "github.com/uber/tchannel-go/thrift" "go.uber.org/zap" "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" @@ -43,14 +41,12 @@ func TestJaegerSpanHandler(t *testing.T) { for _, tc := range testChunks { logger := zap.NewNop() h := NewJaegerSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil}) - ctx, cancel := thrift.NewContext(time.Minute) - defer cancel() - res, err := h.SubmitBatches(ctx, []*jaeger.Batch{ + res, err := h.SubmitBatches([]*jaeger.Batch{ { Process: &jaeger.Process{ServiceName: "someServiceName"}, Spans: []*jaeger.Span{{SpanId: 21345}}, }, - }) + }, SubmitBatchOptions{}) if tc.expectedErr != nil { assert.Nil(t, res) assert.Equal(t, tc.expectedErr, err) @@ -68,7 +64,7 @@ type shouldIErrorProcessor struct { var errTestError = errors.New("Whoops") -func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, format string) ([]bool, error) { +func (s *shouldIErrorProcessor) ProcessSpans(mSpans []*model.Span, _ ProcessSpansOptions) ([]bool, error) { if s.shouldError { return nil, errTestError } @@ -93,13 +89,11 @@ func TestZipkinSpanHandler(t *testing.T) { for _, tc := range testChunks { logger := zap.NewNop() h := NewZipkinSpanHandler(logger, &shouldIErrorProcessor{tc.expectedErr != nil}, zipkin.NewParentIDSanitizer()) - ctx, cancel := thrift.NewContext(time.Minute) - defer cancel() - res, err := h.SubmitZipkinBatch(ctx, []*zipkincore.Span{ + res, err := h.SubmitZipkinBatch([]*zipkincore.Span{ { ID: 12345, }, - }) + }, SubmitBatchOptions{}) if tc.expectedErr != nil { assert.Nil(t, res) assert.Equal(t, tc.expectedErr, err) diff --git a/cmd/collector/app/zipkin/http_handler_test.go b/cmd/collector/app/zipkin/http_handler_test.go index 12c65256526..c18ae007438 100644 --- a/cmd/collector/app/zipkin/http_handler_test.go +++ b/cmd/collector/app/zipkin/http_handler_test.go @@ -30,8 +30,8 @@ import ( "github.com/stretchr/testify/require" jaegerClient "github.com/uber/jaeger-client-go" zipkinTransport "github.com/uber/jaeger-client-go/transport/zipkin" - tchanThrift "github.com/uber/tchannel-go/thrift" + "github.com/jaegertracing/jaeger/cmd/collector/app" zipkinTrift "github.com/jaegertracing/jaeger/model/converter/thrift/zipkin" "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) @@ -44,7 +44,7 @@ type mockZipkinHandler struct { spans []*zipkincore.Span } -func (p *mockZipkinHandler) SubmitZipkinBatch(ctx tchanThrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) { +func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts app.SubmitBatchOptions) ([]*zipkincore.Response, error) { p.mux.Lock() defer p.mux.Unlock() p.spans = append(p.spans, spans...) From 94782c8281217e820287377f51f1e492b45cad73 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 3 Apr 2019 17:06:31 -0400 Subject: [PATCH 3/5] Fix all-in-one main Signed-off-by: Yuri Shkuro --- cmd/all-in-one/main.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cmd/all-in-one/main.go b/cmd/all-in-one/main.go index 33aa0b29af4..1c825c661ba 100644 --- a/cmd/all-in-one/main.go +++ b/cmd/all-in-one/main.go @@ -219,8 +219,9 @@ func startCollector( logger.Fatal("Unable to create new TChannel", zap.Error(err)) } server := thrift.NewServer(ch) - server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler)) - server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler)) + batchHandler := collectorApp.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler) + server.Register(jc.NewTChanCollectorServer(batchHandler)) + server.Register(zc.NewTChanZipkinCollectorServer(batchHandler)) server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore))) portStr := ":" + strconv.Itoa(cOpts.CollectorPort) listener, err := net.Listen("tcp", portStr) From 6fd78dcf025c06cc6977fe14861cb02dfabe46db Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 3 Apr 2019 17:29:57 -0400 Subject: [PATCH 4/5] Appease lint Signed-off-by: Yuri Shkuro --- cmd/collector/app/tchannel_handler.go | 13 ++++++++----- cmd/collector/app/tchannel_handler_test.go | 6 ++++++ 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/cmd/collector/app/tchannel_handler.go b/cmd/collector/app/tchannel_handler.go index 41329184e3c..6a9ca42d2b5 100644 --- a/cmd/collector/app/tchannel_handler.go +++ b/cmd/collector/app/tchannel_handler.go @@ -21,7 +21,8 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) -type tchannelHandler struct { +// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector. +type TChannelHandler struct { jaegerHandler JaegerBatchesHandler zipkinHandler ZipkinSpansHandler } @@ -30,21 +31,23 @@ type tchannelHandler struct { func NewTChannelHandler( jaegerHandler JaegerBatchesHandler, zipkinHandler ZipkinSpansHandler, -) *tchannelHandler { - return &tchannelHandler{ +) *TChannelHandler { + return &TChannelHandler{ jaegerHandler: jaegerHandler, zipkinHandler: zipkinHandler, } } -func (h *tchannelHandler) SubmitZipkinBatch( +// SubmitZipkinBatch implements zipkincore.TChanZipkinCollector. +func (h *TChannelHandler) SubmitZipkinBatch( _ thrift.Context, spans []*zipkincore.Span, ) ([]*zipkincore.Response, error) { return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"}) } -func (h *tchannelHandler) SubmitBatches( +// SubmitBatches implements jaeger.TChanCollector. +func (h *TChannelHandler) SubmitBatches( _ thrift.Context, batches []*jaeger.Batch, ) ([]*jaeger.BatchSubmitResponse, error) { diff --git a/cmd/collector/app/tchannel_handler_test.go b/cmd/collector/app/tchannel_handler_test.go index 8cd0a5d0a2a..a0563b498e6 100644 --- a/cmd/collector/app/tchannel_handler_test.go +++ b/cmd/collector/app/tchannel_handler_test.go @@ -23,6 +23,12 @@ import ( "github.com/jaegertracing/jaeger/thrift-gen/zipkincore" ) +var ( + // verify API compliance + _ jaeger.TChanCollector = new(TChannelHandler) + _ zipkincore.TChanZipkinCollector = new(TChannelHandler) +) + type mockZipkinHandler struct { spans []*zipkincore.Span } From 60cbb718082581f1054849480a3ba23444fae3d6 Mon Sep 17 00:00:00 2001 From: Yuri Shkuro Date: Wed, 3 Apr 2019 17:44:45 -0400 Subject: [PATCH 5/5] de-lint Signed-off-by: Yuri Shkuro --- cmd/collector/app/thrift_span_handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/cmd/collector/app/thrift_span_handler.go b/cmd/collector/app/thrift_span_handler.go index a4b486f6077..1d44359d9e3 100644 --- a/cmd/collector/app/thrift_span_handler.go +++ b/cmd/collector/app/thrift_span_handler.go @@ -34,6 +34,7 @@ const ( UnknownFormatType = "unknown" ) +// SubmitBatchOptions are passed to Submit methods of the handlers. type SubmitBatchOptions struct { InboundTransport string }