Skip to content

Commit

Permalink
Refactor handlers to separate transport and span format concerns (#1458)
Browse files Browse the repository at this point in the history
* Refactor handlers

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix tests

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Fix all-in-one main

Signed-off-by: Yuri Shkuro <ys@uber.com>

* Appease lint

Signed-off-by: Yuri Shkuro <ys@uber.com>

* de-lint

Signed-off-by: Yuri Shkuro <ys@uber.com>
  • Loading branch information
yurishkuro authored Apr 4, 2019
1 parent 1703bae commit 5c6fd91
Show file tree
Hide file tree
Showing 15 changed files with 175 additions and 56 deletions.
5 changes: 3 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ func startCollector(
logger.Fatal("Unable to create new TChannel", zap.Error(err))
}
server := thrift.NewServer(ch)
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
batchHandler := collectorApp.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler)
server.Register(jc.NewTChanCollectorServer(batchHandler))
server.Register(zc.NewTChanZipkinCollectorServer(batchHandler))
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
Expand Down
5 changes: 4 additions & 1 deletion cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
span.Process = r.Batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{
InboundTransport: "grpc", // TODO do we have a constant?
SpanFormat: JaegerFormatType,
})
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type mockSpanProcessor struct {
spans []*model.Span
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts ProcessSpansOptions) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpcserver/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa
type mockSpanProcessor struct {
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ app.ProcessSpansOptions) ([]bool, error) {
return []bool{}, nil
}
7 changes: 2 additions & 5 deletions cmd/collector/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"io/ioutil"
"mime"
"net/http"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
tchanThrift "github.com/uber/tchannel-go/thrift"

tJaeger "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)
Expand Down Expand Up @@ -86,10 +84,9 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf(UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
ctx, cancel := tchanThrift.NewContext(time.Minute)
defer cancel()
batches := []*tJaeger.Batch{batch}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, batches); err != nil {
opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant?
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/collector/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
jaegerClient "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/transport"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)
Expand All @@ -42,7 +41,7 @@ type mockJaegerHandler struct {
batches []*jaeger.Batch
}

func (p *mockJaegerHandler) SubmitBatches(ctx tchanThrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.batches = append(p.batches, batches...)
Expand Down
16 changes: 14 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// ProcessSpansOptions additional options passed to processor along with the spans.
type ProcessSpansOptions struct {
SpanFormat string
InboundTransport string
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error)
}

type spanProcessor struct {
queue *queue.BoundedQueue
metrics *SpanProcessorMetrics
Expand Down Expand Up @@ -109,12 +121,12 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
sp.metrics.SaveLatency.Record(time.Since(startTime))
}

func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) {
func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) {
sp.preProcessSpans(mSpans)
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, spanFormat)
ok := sp.enqueueSpan(mSpan, options.SpanFormat)
if !ok && sp.reportBusy {
return nil, tchannel.ErrServerBusy
}
Expand Down
16 changes: 6 additions & 10 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"golang.org/x/net/context"

zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -83,28 +81,26 @@ func TestBySvcMetrics(t *testing.T) {
Options.ReportBusy(false),
Options.SpanFilter(isSpanAllowed),
)
ctx := context.Background()
tctx := thrift.Wrap(ctx)
var metricPrefix, format string
switch test.format {
case ZipkinFormatType:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer())
zHandler.SubmitZipkinBatch(tctx, []*zc.Span{span, span})
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case JaegerFormatType:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := NewJaegerSpanHandler(logger, processor)
jHandler.SubmitBatches(tctx, []*jaeger.Batch{
jHandler.SubmitBatches([]*jaeger.Batch{
{
Spans: []*jaeger.Span{
span,
span,
},
Process: process,
},
})
}, SubmitBatchOptions{})
metricPrefix = "service"
format = "jaeger"
default:
Expand Down Expand Up @@ -217,7 +213,7 @@ func TestSpanProcessor(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
}
Expand All @@ -240,7 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
Expand Down Expand Up @@ -299,7 +295,7 @@ func TestSpanProcessorBusy(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

assert.Error(t, err, "expcting busy error")
assert.Nil(t, res)
Expand Down
55 changes: 55 additions & 0 deletions cmd/collector/app/tchannel_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2019 The Jaeger Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector.
type TChannelHandler struct {
jaegerHandler JaegerBatchesHandler
zipkinHandler ZipkinSpansHandler
}

// NewTChannelHandler creates new handler that implements both Jaeger and Zipkin endpoints.
func NewTChannelHandler(
jaegerHandler JaegerBatchesHandler,
zipkinHandler ZipkinSpansHandler,
) *TChannelHandler {
return &TChannelHandler{
jaegerHandler: jaegerHandler,
zipkinHandler: zipkinHandler,
}
}

// SubmitZipkinBatch implements zipkincore.TChanZipkinCollector.
func (h *TChannelHandler) SubmitZipkinBatch(
_ thrift.Context,
spans []*zipkincore.Span,
) ([]*zipkincore.Response, error) {
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"})
}

// SubmitBatches implements jaeger.TChanCollector.
func (h *TChannelHandler) SubmitBatches(
_ thrift.Context,
batches []*jaeger.Batch,
) ([]*jaeger.BatchSubmitResponse, error) {
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"})
}
59 changes: 59 additions & 0 deletions cmd/collector/app/tchannel_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2019 The Jaeger Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

var (
// verify API compliance
_ jaeger.TChanCollector = new(TChannelHandler)
_ zipkincore.TChanZipkinCollector = new(TChannelHandler)
)

type mockZipkinHandler struct {
spans []*zipkincore.Span
}

func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts SubmitBatchOptions) ([]*zipkincore.Response, error) {
p.spans = append(p.spans, spans...)
return nil, nil
}

func TestTChannelHandler(t *testing.T) {
jh := &mockJaegerHandler{}
zh := &mockZipkinHandler{}
h := NewTChannelHandler(jh, zh)
h.SubmitBatches(nil, []*jaeger.Batch{
&jaeger.Batch{
Spans: []*jaeger.Span{
&jaeger.Span{OperationName: "jaeger"},
},
},
})
assert.Len(t, jh.getBatches(), 1)
h.SubmitZipkinBatch(nil, []*zipkincore.Span{
&zipkincore.Span{
Name: "zipkin",
},
})
assert.Len(t, zh.spans, 1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package app

import (
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"

zipkinS "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
Expand All @@ -35,22 +34,21 @@ const (
UnknownFormatType = "unknown"
)

// SubmitBatchOptions are passed to Submit methods of the handlers.
type SubmitBatchOptions struct {
InboundTransport string
}

// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error)
SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
}

// JaegerBatchesHandler consumes and handles Jaeger batches
type JaegerBatchesHandler interface {
// SubmitBatches records a batch of spans in Jaeger Thrift format
SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error)
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error)
SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
}

type jaegerBatchesHandler struct {
Expand All @@ -66,15 +64,18 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor SpanProcessor) Jaeg
}
}

func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
for _, batch := range batches {
mSpans := make([]*model.Span, 0, len(batch.Spans))
for _, span := range batch.Spans {
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType)
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: JaegerFormatType,
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -112,13 +113,16 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, saniti
}

// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) {
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
sanitized := h.sanitizer.Sanitize(span)
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, ZipkinFormatType)
bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: ZipkinFormatType,
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
return nil, err
Expand Down
Loading

0 comments on commit 5c6fd91

Please sign in to comment.