-
Notifications
You must be signed in to change notification settings - Fork 2.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Refactor handlers to separate transport and span format concerns #1458
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -26,6 +26,18 @@ import ( | |
"github.com/jaegertracing/jaeger/storage/spanstore" | ||
) | ||
|
||
// ProcessSpansOptions additional options passed to processor along with the spans. | ||
type ProcessSpansOptions struct { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What would happen if neither |
||
SpanFormat string | ||
InboundTransport string | ||
} | ||
|
||
// SpanProcessor handles model spans | ||
type SpanProcessor interface { | ||
// ProcessSpans processes model spans and return with either a list of true/false success or an error | ||
ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) | ||
} | ||
|
||
type spanProcessor struct { | ||
queue *queue.BoundedQueue | ||
metrics *SpanProcessorMetrics | ||
|
@@ -109,12 +121,12 @@ func (sp *spanProcessor) saveSpan(span *model.Span) { | |
sp.metrics.SaveLatency.Record(time.Since(startTime)) | ||
} | ||
|
||
func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) { | ||
func (sp *spanProcessor) ProcessSpans(mSpans []*model.Span, options ProcessSpansOptions) ([]bool, error) { | ||
sp.preProcessSpans(mSpans) | ||
sp.metrics.BatchSize.Update(int64(len(mSpans))) | ||
retMe := make([]bool, len(mSpans)) | ||
for i, mSpan := range mSpans { | ||
ok := sp.enqueueSpan(mSpan, spanFormat) | ||
ok := sp.enqueueSpan(mSpan, options.SpanFormat) | ||
if !ok && sp.reportBusy { | ||
return nil, tchannel.ErrServerBusy | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,55 @@ | ||
// Copyright (c) 2019 The Jaeger Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package app | ||
|
||
import ( | ||
"github.com/uber/tchannel-go/thrift" | ||
|
||
"github.com/jaegertracing/jaeger/thrift-gen/jaeger" | ||
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore" | ||
) | ||
|
||
// TChannelHandler implements jaeger.TChanCollector and zipkincore.TChanZipkinCollector. | ||
type TChannelHandler struct { | ||
jaegerHandler JaegerBatchesHandler | ||
zipkinHandler ZipkinSpansHandler | ||
} | ||
|
||
// NewTChannelHandler creates new handler that implements both Jaeger and Zipkin endpoints. | ||
func NewTChannelHandler( | ||
jaegerHandler JaegerBatchesHandler, | ||
zipkinHandler ZipkinSpansHandler, | ||
) *TChannelHandler { | ||
return &TChannelHandler{ | ||
jaegerHandler: jaegerHandler, | ||
zipkinHandler: zipkinHandler, | ||
} | ||
} | ||
|
||
// SubmitZipkinBatch implements zipkincore.TChanZipkinCollector. | ||
func (h *TChannelHandler) SubmitZipkinBatch( | ||
_ thrift.Context, | ||
spans []*zipkincore.Span, | ||
) ([]*zipkincore.Response, error) { | ||
return h.zipkinHandler.SubmitZipkinBatch(spans, SubmitBatchOptions{InboundTransport: "tchannel"}) | ||
} | ||
|
||
// SubmitBatches implements jaeger.TChanCollector. | ||
func (h *TChannelHandler) SubmitBatches( | ||
_ thrift.Context, | ||
batches []*jaeger.Batch, | ||
) ([]*jaeger.BatchSubmitResponse, error) { | ||
return h.jaegerHandler.SubmitBatches(batches, SubmitBatchOptions{InboundTransport: "tchannel"}) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
// Copyright (c) 2019 The Jaeger Authors | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package app | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/assert" | ||
|
||
"github.com/jaegertracing/jaeger/thrift-gen/jaeger" | ||
"github.com/jaegertracing/jaeger/thrift-gen/zipkincore" | ||
) | ||
|
||
var ( | ||
// verify API compliance | ||
_ jaeger.TChanCollector = new(TChannelHandler) | ||
_ zipkincore.TChanZipkinCollector = new(TChannelHandler) | ||
) | ||
|
||
type mockZipkinHandler struct { | ||
spans []*zipkincore.Span | ||
} | ||
|
||
func (p *mockZipkinHandler) SubmitZipkinBatch(spans []*zipkincore.Span, opts SubmitBatchOptions) ([]*zipkincore.Response, error) { | ||
p.spans = append(p.spans, spans...) | ||
return nil, nil | ||
} | ||
|
||
func TestTChannelHandler(t *testing.T) { | ||
jh := &mockJaegerHandler{} | ||
zh := &mockZipkinHandler{} | ||
h := NewTChannelHandler(jh, zh) | ||
h.SubmitBatches(nil, []*jaeger.Batch{ | ||
&jaeger.Batch{ | ||
Spans: []*jaeger.Span{ | ||
&jaeger.Span{OperationName: "jaeger"}, | ||
}, | ||
}, | ||
}) | ||
assert.Len(t, jh.getBatches(), 1) | ||
h.SubmitZipkinBatch(nil, []*zipkincore.Span{ | ||
&zipkincore.Span{ | ||
Name: "zipkin", | ||
}, | ||
}) | ||
assert.Len(t, zh.spans, 1) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,7 +15,6 @@ | |
package app | ||
|
||
import ( | ||
"github.com/uber/tchannel-go/thrift" | ||
"go.uber.org/zap" | ||
|
||
zipkinS "github.com/jaegertracing/jaeger/cmd/collector/app/sanitizer/zipkin" | ||
|
@@ -35,22 +34,21 @@ const ( | |
UnknownFormatType = "unknown" | ||
) | ||
|
||
// SubmitBatchOptions are passed to Submit methods of the handlers. | ||
type SubmitBatchOptions struct { | ||
InboundTransport string | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same as above: what are the implications of having the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have a followup story to take care of that. I imagine if There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We shouldn't skip, but assign an "undefined" value so that the total counter is still correct. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
hmmm. What I meant was something like the following:
so if endpointType is empty we still count it just not having a transport tag. Is adding There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, first, we don't create counter objects with labels in real time, because it's expensive and we always prefer to pre-create them when possible and cache. Second, when creating a counter we must define a set of labels (tags) it has. Some metrics backends may not like the value of the label to be blank. Why not be explicit about the value then?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gotcha |
||
} | ||
|
||
// ZipkinSpansHandler consumes and handles zipkin spans | ||
type ZipkinSpansHandler interface { | ||
// SubmitZipkinBatch records a batch of spans in Zipkin Thrift format | ||
SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) | ||
SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) | ||
} | ||
|
||
// JaegerBatchesHandler consumes and handles Jaeger batches | ||
type JaegerBatchesHandler interface { | ||
// SubmitBatches records a batch of spans in Jaeger Thrift format | ||
SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) | ||
} | ||
|
||
// SpanProcessor handles model spans | ||
type SpanProcessor interface { | ||
// ProcessSpans processes model spans and return with either a list of true/false success or an error | ||
ProcessSpans(mSpans []*model.Span, spanFormat string) ([]bool, error) | ||
SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) | ||
} | ||
|
||
type jaegerBatchesHandler struct { | ||
|
@@ -66,15 +64,18 @@ func NewJaegerSpanHandler(logger *zap.Logger, modelProcessor SpanProcessor) Jaeg | |
} | ||
} | ||
|
||
func (jbh *jaegerBatchesHandler) SubmitBatches(ctx thrift.Context, batches []*jaeger.Batch) ([]*jaeger.BatchSubmitResponse, error) { | ||
func (jbh *jaegerBatchesHandler) SubmitBatches(batches []*jaeger.Batch, options SubmitBatchOptions) ([]*jaeger.BatchSubmitResponse, error) { | ||
responses := make([]*jaeger.BatchSubmitResponse, 0, len(batches)) | ||
for _, batch := range batches { | ||
mSpans := make([]*model.Span, 0, len(batch.Spans)) | ||
for _, span := range batch.Spans { | ||
mSpan := jConv.ToDomainSpan(span, batch.Process) | ||
mSpans = append(mSpans, mSpan) | ||
} | ||
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, JaegerFormatType) | ||
oks, err := jbh.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ | ||
InboundTransport: options.InboundTransport, | ||
SpanFormat: JaegerFormatType, | ||
}) | ||
if err != nil { | ||
jbh.logger.Error("Collector failed to process span batch", zap.Error(err)) | ||
return nil, err | ||
|
@@ -112,13 +113,16 @@ func NewZipkinSpanHandler(logger *zap.Logger, modelHandler SpanProcessor, saniti | |
} | ||
|
||
// SubmitZipkinBatch records a batch of spans already in Zipkin Thrift format. | ||
func (h *zipkinSpanHandler) SubmitZipkinBatch(ctx thrift.Context, spans []*zipkincore.Span) ([]*zipkincore.Response, error) { | ||
func (h *zipkinSpanHandler) SubmitZipkinBatch(spans []*zipkincore.Span, options SubmitBatchOptions) ([]*zipkincore.Response, error) { | ||
mSpans := make([]*model.Span, 0, len(spans)) | ||
for _, span := range spans { | ||
sanitized := h.sanitizer.Sanitize(span) | ||
mSpans = append(mSpans, convertZipkinToModel(sanitized, h.logger)...) | ||
} | ||
bools, err := h.modelProcessor.ProcessSpans(mSpans, ZipkinFormatType) | ||
bools, err := h.modelProcessor.ProcessSpans(mSpans, ProcessSpansOptions{ | ||
InboundTransport: options.InboundTransport, | ||
SpanFormat: ZipkinFormatType, | ||
}) | ||
if err != nil { | ||
h.logger.Error("Collector failed to process Zipkin span batch", zap.Error(err)) | ||
return nil, err | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could we create constants if they don't exist?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Jude is doing it in the next PR. Right now these strings are not used anywhere.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I have the followup story to address that.