Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor handlers to separate transport and span format concerns #1458

Merged
merged 5 commits into from
Apr 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions cmd/all-in-one/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,9 @@ func startCollector(
logger.Fatal("Unable to create new TChannel", zap.Error(err))
}
server := thrift.NewServer(ch)
server.Register(jc.NewTChanCollectorServer(jaegerBatchesHandler))
server.Register(zc.NewTChanZipkinCollectorServer(zipkinSpansHandler))
batchHandler := collectorApp.NewTChannelHandler(jaegerBatchesHandler, zipkinSpansHandler)
server.Register(jc.NewTChanCollectorServer(batchHandler))
server.Register(zc.NewTChanZipkinCollectorServer(batchHandler))
server.Register(sc.NewTChanSamplingManagerServer(sampling.NewHandler(strategyStore)))
portStr := ":" + strconv.Itoa(cOpts.CollectorPort)
listener, err := net.Listen("tcp", portStr)
Expand Down
5 changes: 4 additions & 1 deletion cmd/collector/app/grpc_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ func (g *GRPCHandler) PostSpans(ctx context.Context, r *api_v2.PostSpansRequest)
span.Process = r.Batch.Process
}
}
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, JaegerFormatType)
_, err := g.spanProcessor.ProcessSpans(r.GetBatch().Spans, ProcessSpansOptions{
InboundTransport: "grpc", // TODO do we have a constant?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we create constants if they don't exist?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jude is doing it in the next PR. Right now these strings are not used anywhere.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have the followup story to address that.

SpanFormat: JaegerFormatType,
})
if err != nil {
g.logger.Error("cannot process spans", zap.Error(err))
return nil, err
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpc_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ type mockSpanProcessor struct {
spans []*model.Span
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, opts ProcessSpansOptions) ([]bool, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.spans = append(p.spans, spans...)
Expand Down
2 changes: 1 addition & 1 deletion cmd/collector/app/grpcserver/grpc_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,6 @@ func (s mockSamplingStore) GetSamplingStrategy(serviceName string) (*sampling.Sa
type mockSpanProcessor struct {
}

func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, spanFormat string) ([]bool, error) {
func (p *mockSpanProcessor) ProcessSpans(spans []*model.Span, _ app.ProcessSpansOptions) ([]bool, error) {
return []bool{}, nil
}
7 changes: 2 additions & 5 deletions cmd/collector/app/http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,9 @@ import (
"io/ioutil"
"mime"
"net/http"
"time"

"github.com/apache/thrift/lib/go/thrift"
"github.com/gorilla/mux"
tchanThrift "github.com/uber/tchannel-go/thrift"

tJaeger "github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)
Expand Down Expand Up @@ -86,10 +84,9 @@ func (aH *APIHandler) saveSpan(w http.ResponseWriter, r *http.Request) {
http.Error(w, fmt.Sprintf(UnableToReadBodyErrFormat, err), http.StatusBadRequest)
return
}
ctx, cancel := tchanThrift.NewContext(time.Minute)
defer cancel()
batches := []*tJaeger.Batch{batch}
if _, err = aH.jaegerBatchesHandler.SubmitBatches(ctx, batches); err != nil {
opts := SubmitBatchOptions{InboundTransport: "http"} // TODO do we have a constant?
if _, err = aH.jaegerBatchesHandler.SubmitBatches(batches, opts); err != nil {
http.Error(w, fmt.Sprintf("Cannot submit Jaeger batch: %v", err), http.StatusInternalServerError)
return
}
Expand Down
3 changes: 1 addition & 2 deletions cmd/collector/app/http_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/stretchr/testify/assert"
jaegerClient "github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/transport"
tchanThrift "github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
)
Expand All @@ -42,7 +41,7 @@ type mockJaegerHandler struct {
batches []*jaeger.Batch
}

func (p *mockJaegerHandler) SubmitBatches(ctx tchanThrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
func (p *mockJaegerHandler) SubmitBatches(batches []*jaeger.Batch, _ SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
p.mux.Lock()
defer p.mux.Unlock()
p.batches = append(p.batches, batches...)
Expand Down
16 changes: 14 additions & 2 deletions cmd/collector/app/span_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ import (
"github.com/jaegertracing/jaeger/storage/spanstore"
)

// ProcessSpansOptions additional options passed to processor along with the spans.
type ProcessSpansOptions struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What would happen if neither SpanFormat nor InboundTransport are set?

SpanFormat string
InboundTransport string
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error)
}

type spanProcessor struct {
queue *queue.BoundedQueue
metrics *SpanProcessorMetrics
Expand Down Expand Up @@ -109,12 +121,12 @@ func (sp *spanProcessor) saveSpan(span *model.Span) {
sp.metrics.SaveLatency.Record(time.Since(startTime))
}

func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) {
func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) {
sp.preProcessSpans(mSpans)
sp.metrics.BatchSize.Update(int64(len(mSpans)))
retMe := make([]bool, len(mSpans))
for i, mSpan := range mSpans {
ok := sp.enqueueSpan(mSpan, spanFormat)
ok := sp.enqueueSpan(mSpan, options.SpanFormat)
if !ok && sp.reportBusy {
return nil, tchannel.ErrServerBusy
}
Expand Down
16 changes: 6 additions & 10 deletions cmd/collector/app/span_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/uber/jaeger-lib/metrics"
"github.com/uber/jaeger-lib/metrics/metricstest"
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"
"golang.org/x/net/context"

zipkinSanitizer "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
"github.com/jaegertracing/jaeger/model"
Expand Down Expand Up @@ -83,28 +81,26 @@ func TestBySvcMetrics(t *testing.T) {
Options.ReportBusy(false),
Options.SpanFilter(isSpanAllowed),
)
ctx := context.Background()
tctx := thrift.Wrap(ctx)
var metricPrefix, format string
switch test.format {
case ZipkinFormatType:
span := makeZipkinSpan(test.serviceName, test.rootSpan, test.debug)
zHandler := NewZipkinSpanHandler(logger, processor, zipkinSanitizer.NewParentIDSanitizer())
zHandler.SubmitZipkinBatch(tctx, []*zc.Span{span, span})
zHandler.SubmitZipkinBatch([]*zc.Span{span, span}, SubmitBatchOptions{})
metricPrefix = "service"
format = "zipkin"
case JaegerFormatType:
span, process := makeJaegerSpan(test.serviceName, test.rootSpan, test.debug)
jHandler := NewJaegerSpanHandler(logger, processor)
jHandler.SubmitBatches(tctx, []*jaeger.Batch{
jHandler.SubmitBatches([]*jaeger.Batch{
{
Spans: []*jaeger.Span{
span,
span,
},
Process: process,
},
})
}, SubmitBatchOptions{})
metricPrefix = "service"
format = "jaeger"
default:
Expand Down Expand Up @@ -217,7 +213,7 @@ func TestSpanProcessor(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})
assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
}
Expand All @@ -240,7 +236,7 @@ func TestSpanProcessorErrors(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

assert.NoError(t, err)
assert.Equal(t, []bool{true}, res)
Expand Down Expand Up @@ -299,7 +295,7 @@ func TestSpanProcessorBusy(t *testing.T) {
ServiceName: "x",
},
},
}, JaegerFormatType)
}, ProcessSpansOptions{SpanFormat: JaegerFormatType})

assert.Error(t, err, "expcting busy error")
assert.Nil(t, res)
Expand Down
55 changes: 55 additions & 0 deletions cmd/collector/app/tchannel_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright (c) 2019 The Jaeger Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"github.com/uber/tchannel-go/thrift"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector.
type TChannelHandler struct {
jaegerHandler JaegerBatchesHandler
zipkinHandler ZipkinSpansHandler
}

// NewTChannelHandler creates new handler that implements both Jaeger and Zipkin endpoints.
func NewTChannelHandler(
jaegerHandler JaegerBatchesHandler,
zipkinHandler ZipkinSpansHandler,
) *TChannelHandler {
return &TChannelHandler{
jaegerHandler: jaegerHandler,
zipkinHandler: zipkinHandler,
}
}

// SubmitZipkinBatch implements zipkincore.TChanZipkinCollector.
func (h *TChannelHandler) SubmitZipkinBatch(
_ thrift.Context,
spans []*zipkincore.Span,
) ([]*zipkincore.Response, error) {
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"})
}

// SubmitBatches implements jaeger.TChanCollector.
func (h *TChannelHandler) SubmitBatches(
_ thrift.Context,
batches []*jaeger.Batch,
) ([]*jaeger.BatchSubmitResponse, error) {
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"})
}
59 changes: 59 additions & 0 deletions cmd/collector/app/tchannel_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright (c) 2019 The Jaeger Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package app

import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/jaegertracing/jaeger/thrift-gen/jaeger"
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore"
)

var (
// verify API compliance
_ jaeger.TChanCollector = new(TChannelHandler)
_ zipkincore.TChanZipkinCollector = new(TChannelHandler)
)

type mockZipkinHandler struct {
spans []*zipkincore.Span
}

func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts SubmitBatchOptions) ([]*zipkincore.Response, error) {
p.spans = append(p.spans, spans...)
return nil, nil
}

func TestTChannelHandler(t *testing.T) {
jh := &mockJaegerHandler{}
zh := &mockZipkinHandler{}
h := NewTChannelHandler(jh, zh)
h.SubmitBatches(nil, []*jaeger.Batch{
&jaeger.Batch{
Spans: []*jaeger.Span{
&jaeger.Span{OperationName: "jaeger"},
},
},
})
assert.Len(t, jh.getBatches(), 1)
h.SubmitZipkinBatch(nil, []*zipkincore.Span{
&zipkincore.Span{
Name: "zipkin",
},
})
assert.Len(t, zh.spans, 1)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package app

import (
"github.com/uber/tchannel-go/thrift"
"go.uber.org/zap"

zipkinS "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin"
Expand All @@ -35,22 +34,21 @@ const (
UnknownFormatType = "unknown"
)

// SubmitBatchOptions are passed to Submit methods of the handlers.
type SubmitBatchOptions struct {
InboundTransport string
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above: what are the implications of having the InboundTransport not being specified?

Copy link
Contributor

@guanw guanw Apr 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have a followup story to take care of that. I imagine if InBoundTransport is not set, we would simply skip setting another dimension for span counter metrics.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't skip, but assign an "undefined" value so that the total counter is still correct.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We shouldn't skip, but assign an "undefined" value so that the total counter is still correct.

hmmm. What I meant was something like the following:

    tags := map[string]string{"svc": serviceName, "debug": debugStr}
    if endpointType != "" {
	  tags["transport"] = endpointType
    }
    c := m.factory.Counter(metrics.Options{Name: m.category, Tags: tags})

so if endpointType is empty we still count it just not having a transport tag. Is adding undefined tag mandatory here for metrics to work correctly?

Copy link
Member Author

@yurishkuro yurishkuro Apr 4, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, first, we don't create counter objects with labels in real time, because it's expensive and we always prefer to pre-create them when possible and cache. Second, when creating a counter we must define a set of labels (tags) it has. Some metrics backends may not like the value of the label to be blank. Why not be explicit about the value then?

    tags := map[string]string{
        "svc": serviceName, 
        "debug": debugStr,  
        "transport": "undefined", // default
    }
    if endpointType != "" {
	 tags["transport"] = endpointType
    }

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha

}

// ZipkinSpansHandler consumes and handles zipkin spans
type ZipkinSpansHandler interface {
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format
SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error)
SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error)
}

// JaegerBatchesHandler consumes and handles Jaeger batches
type JaegerBatchesHandler interface {
// SubmitBatches records a batch of spans in Jaeger Thrift format
SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error)
}

// SpanProcessor handles model spans
type SpanProcessor interface {
// ProcessSpans processes model spans and return with either a list of true/false success or an error
ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error)
SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error)
}

type jaegerBatchesHandler struct {
Expand All @@ -66,15 +64,18 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor SpanProcessor) Jaeg
}
}

func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) {
func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) {
responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches))
for _, batch := range batches {
mSpans := make([]*model.Span, 0, len(batch.Spans))
for _, span := range batch.Spans {
mSpan := jConv.ToDomainSpan(span, batch.Process)
mSpans = append(mSpans, mSpan)
}
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType)
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: JaegerFormatType,
})
if err != nil {
jbh.logger.Error("Collector failed to process span batch", zap.Error(err))
return nil, err
Expand Down Expand Up @@ -112,13 +113,16 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, saniti
}

// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format.
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) {
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) {
mSpans := make([]*model.Span, 0, len(spans))
for _, span := range spans {
sanitized := h.sanitizer.Sanitize(span)
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...)
}
bools, err := h.modelProcessor.ProcessSpans(mSpans, ZipkinFormatType)
bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{
InboundTransport: options.InboundTransport,
SpanFormat: ZipkinFormatType,
})
if err != nil {
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err))
return nil, err
Expand Down
Loading