From 6d1a4e8e6523300fd961066f8e65ac317ff9f986 Mon Sep 17 00:00:00 2001 From: Marais Kruger Date: Tue, 18 May 2021 18:05:34 +0300 Subject: [PATCH 1/5] add open tracing to pulsar client --- go.mod | 3 +- go.sum | 2 + .../internal/tracing/consumer-interceptor.go | 40 ++++++++++ .../tracing/message-carrier-adaptors.go | 66 ++++++++++++++++ .../internal/tracing/message-carrier-util.go | 79 +++++++++++++++++++ .../internal/tracing/producer-interceptor.go | 39 +++++++++ pulsar/internal/tracing/span-enrichment.go | 26 ++++++ 7 files changed, 253 insertions(+), 2 deletions(-) create mode 100644 pulsar/internal/tracing/consumer-interceptor.go create mode 100644 pulsar/internal/tracing/message-carrier-adaptors.go create mode 100644 pulsar/internal/tracing/message-carrier-util.go create mode 100644 pulsar/internal/tracing/producer-interceptor.go create mode 100644 pulsar/internal/tracing/span-enrichment.go diff --git a/go.mod b/go.mod index 1ecd393acd..9a5534a809 100644 --- a/go.mod +++ b/go.mod @@ -15,8 +15,7 @@ require ( github.com/klauspost/compress v1.10.8 github.com/kr/pretty v0.2.0 // indirect github.com/linkedin/goavro/v2 v2.9.8 - github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect - github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/opentracing/opentracing-go v1.2.0 github.com/pierrec/lz4 v2.0.5+incompatible github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.7.1 diff --git a/go.sum b/go.sum index a14857b2d2..1c9bc971c0 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY= github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE= github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo= +github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= +github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= diff --git a/pulsar/internal/tracing/consumer-interceptor.go b/pulsar/internal/tracing/consumer-interceptor.go new file mode 100644 index 0000000000..5adc3e116b --- /dev/null +++ b/pulsar/internal/tracing/consumer-interceptor.go @@ -0,0 +1,40 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" +) + +const fromPrefix = "From__" + +type ConsumerInterceptor struct { +} + +func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) { + buildAndInjectChildSpan(message).Finish() +} + +func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID pulsar.MessageID) {} + +func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, msgIDs []pulsar.MessageID) { +} + +func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span { + tracer := opentracing.GlobalTracer() + parentContext := ExtractSpanContextFromConsumerMessage(message) + + var span opentracing.Span + + var startSpanOptions []opentracing.StartSpanOption + if parentContext != nil { + startSpanOptions = []opentracing.StartSpanOption{opentracing.FollowsFrom(parentContext)} + } + + span = tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(), startSpanOptions...) + + enrichConsumerSpan(message, span) + InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) + + return span +} diff --git a/pulsar/internal/tracing/message-carrier-adaptors.go b/pulsar/internal/tracing/message-carrier-adaptors.go new file mode 100644 index 0000000000..b0ca62d451 --- /dev/null +++ b/pulsar/internal/tracing/message-carrier-adaptors.go @@ -0,0 +1,66 @@ +package pulsartracing + +import ( + "errors" + "github.com/apache/pulsar-client-go/pulsar" +) + +// ProducerMessageExtractAdapter Implements TextMap Interface +type ProducerMessageExtractAdapter struct { + message *pulsar.ProducerMessage +} + +func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error { + for k, v := range (*a.message).Properties { + if err := handler(k, v); err != nil { + return err + } + } + + return nil +} + +func (a *ProducerMessageExtractAdapter) Set(key, val string) {} + +// ProducerMessageInjectAdapter Implements TextMap Interface +type ProducerMessageInjectAdapter struct { + message *pulsar.ProducerMessage +} + +func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error { + return errors.New("iterator should never be used with Tracer.inject()") +} + +func (a *ProducerMessageInjectAdapter) Set(key, val string) { + a.message.Properties[key] = val +} + +// ConsumerMessageExtractAdapter Implements TextMap Interface +type ConsumerMessageExtractAdapter struct { + message pulsar.ConsumerMessage +} + +func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error { + for k, v := range a.message.Properties() { + if err := handler(k, v); err != nil { + return err + } + } + + return nil +} + +func (a *ConsumerMessageExtractAdapter) Set(key, val string) {} + +// ConsumerMessageInjectAdapter Implements TextMap Interface +type ConsumerMessageInjectAdapter struct { + message pulsar.ConsumerMessage +} + +func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error { + return errors.New("iterator should never be used with tracer.inject()") +} + +func (a *ConsumerMessageInjectAdapter) Set(key, val string) { + a.message.Properties()[key] = val +} diff --git a/pulsar/internal/tracing/message-carrier-util.go b/pulsar/internal/tracing/message-carrier-util.go new file mode 100644 index 0000000000..371370238a --- /dev/null +++ b/pulsar/internal/tracing/message-carrier-util.go @@ -0,0 +1,79 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" + log "github.com/sirupsen/logrus" +) + +func InjectProducerMessageSpanContext(ctx context.Context, message *pulsar.ProducerMessage) { + injectAdapter := &ProducerMessageInjectAdapter{message} + + span := opentracing.SpanFromContext(ctx) + + for k, v := range message.Properties { + span.SetTag(k, v) + } + + err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter) + + if err != nil { + log.Error("could not inject span context into pulsar message", err) + } +} + +func ExtractSpanContextFromProducerMessage(message *pulsar.ProducerMessage) opentracing.SpanContext { + extractAdapter := &ProducerMessageExtractAdapter{message} + + spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter) + + if err != nil { + log.Error("could not extract span context from pulsar message", err) + } + + return spanContext +} + +func ExtractSpanContextFromConsumerMessage(message pulsar.ConsumerMessage) opentracing.SpanContext { + extractAdapter := &ConsumerMessageExtractAdapter{message} + + spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter) + + if err != nil { + log.Error("could not extract span context from pulsar message", err) + } + + return spanContext +} + +func InjectConsumerMessageSpanContext(ctx context.Context, message pulsar.ConsumerMessage) { + injectAdapter := &ConsumerMessageInjectAdapter{message} + span := opentracing.SpanFromContext(ctx) + + if span == nil { + log.Warn("no span could be extracted from context, nothing will be injected into the message properties") + return + } + + for k, v := range message.Properties() { + span.SetTag(k, v) + } + + err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter) + + if err != nil { + log.Error("could not inject span context into pulsar message", err) + } +} + +func CreateSpanFromMessage(cm *pulsar.ConsumerMessage, tracer opentracing.Tracer, label string) opentracing.Span { + parentSpan := ExtractSpanContextFromConsumerMessage(*cm) + var span opentracing.Span + if parentSpan != nil { + span = tracer.StartSpan(label, opentracing.ChildOf(parentSpan)) + } else { + span = tracer.StartSpan(label) + } + return span +} diff --git a/pulsar/internal/tracing/producer-interceptor.go b/pulsar/internal/tracing/producer-interceptor.go new file mode 100644 index 0000000000..8b66276f85 --- /dev/null +++ b/pulsar/internal/tracing/producer-interceptor.go @@ -0,0 +1,39 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" +) + +const toPrefix = "To__" + +type ProducerInterceptor struct { +} + +func (t *ProducerInterceptor) BeforeSend(producer pulsar.Producer, message *pulsar.ProducerMessage) { + buildAndInjectSpan(message, producer).Finish() +} + +func (t *ProducerInterceptor) OnSendAcknowledgement(producer pulsar.Producer, message *pulsar.ProducerMessage, msgID pulsar.MessageID) { +} + +func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Producer) opentracing.Span { + tracer := opentracing.GlobalTracer() + spanContext := ExtractSpanContextFromProducerMessage(message) + + var span opentracing.Span + + var startSpanOptions []opentracing.StartSpanOption + if spanContext != nil { + startSpanOptions = []opentracing.StartSpanOption{opentracing.FollowsFrom(spanContext)} + } + + span = tracer.StartSpan(toPrefix+producer.Topic(), startSpanOptions...) + span.SetTag("span.kind", "producer") + enrichProducerSpan(producer, span) + + InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) + + return span +} diff --git a/pulsar/internal/tracing/span-enrichment.go b/pulsar/internal/tracing/span-enrichment.go new file mode 100644 index 0000000000..fe75b0ab6b --- /dev/null +++ b/pulsar/internal/tracing/span-enrichment.go @@ -0,0 +1,26 @@ +package pulsartracing + +import ( + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" +) + +func enrichConsumerSpan(message pulsar.ConsumerMessage, span opentracing.Span) { + spanCommonTags(span) + + span.SetTag("message_bus.destination", message.Topic()) + span.SetTag("messageId", message.ID()) + span.SetTag("subscription", message.Subscription()) +} + +func enrichProducerSpan(producer pulsar.Producer, span opentracing.Span) { + spanCommonTags(span) + + span.SetTag("message_bus.destination", producer.Topic()) + span.SetTag("sequenceId", producer.LastSequenceID()) +} + +func spanCommonTags(span opentracing.Span) { + span.SetTag("component", "pulsar-client-go") + span.SetTag("peer.service", "pulsar-broker") +} From 8af24811c1f8c8675c3f9254f3816a86f16a20a4 Mon Sep 17 00:00:00 2001 From: Marais Kruger Date: Wed, 19 May 2021 15:27:55 +0300 Subject: [PATCH 2/5] add open tracing to pulsar client --- .../consumer-interceptor.go | 17 ++++++++ .../message-carrier-adaptors.go | 17 ++++++++ .../message-carrier-util.go | 17 ++++++++ .../producer-interceptor.go | 17 ++++++++ pulsar/internal/pulsartracing/readme.md | 40 +++++++++++++++++ .../internal/pulsartracing/span-enrichment.go | 43 +++++++++++++++++++ pulsar/internal/tracing/span-enrichment.go | 26 ----------- 7 files changed, 151 insertions(+), 26 deletions(-) rename pulsar/internal/{tracing => pulsartracing}/consumer-interceptor.go (59%) rename pulsar/internal/{tracing => pulsartracing}/message-carrier-adaptors.go (68%) rename pulsar/internal/{tracing => pulsartracing}/message-carrier-util.go (74%) rename pulsar/internal/{tracing => pulsartracing}/producer-interceptor.go (58%) create mode 100644 pulsar/internal/pulsartracing/readme.md create mode 100644 pulsar/internal/pulsartracing/span-enrichment.go delete mode 100644 pulsar/internal/tracing/span-enrichment.go diff --git a/pulsar/internal/tracing/consumer-interceptor.go b/pulsar/internal/pulsartracing/consumer-interceptor.go similarity index 59% rename from pulsar/internal/tracing/consumer-interceptor.go rename to pulsar/internal/pulsartracing/consumer-interceptor.go index 5adc3e116b..102cfe0466 100644 --- a/pulsar/internal/tracing/consumer-interceptor.go +++ b/pulsar/internal/pulsartracing/consumer-interceptor.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/tracing/message-carrier-adaptors.go b/pulsar/internal/pulsartracing/message-carrier-adaptors.go similarity index 68% rename from pulsar/internal/tracing/message-carrier-adaptors.go rename to pulsar/internal/pulsartracing/message-carrier-adaptors.go index b0ca62d451..82a6838e64 100644 --- a/pulsar/internal/tracing/message-carrier-adaptors.go +++ b/pulsar/internal/pulsartracing/message-carrier-adaptors.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/tracing/message-carrier-util.go b/pulsar/internal/pulsartracing/message-carrier-util.go similarity index 74% rename from pulsar/internal/tracing/message-carrier-util.go rename to pulsar/internal/pulsartracing/message-carrier-util.go index 371370238a..55902ff3d4 100644 --- a/pulsar/internal/tracing/message-carrier-util.go +++ b/pulsar/internal/pulsartracing/message-carrier-util.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/tracing/producer-interceptor.go b/pulsar/internal/pulsartracing/producer-interceptor.go similarity index 58% rename from pulsar/internal/tracing/producer-interceptor.go rename to pulsar/internal/pulsartracing/producer-interceptor.go index 8b66276f85..199ecca206 100644 --- a/pulsar/internal/tracing/producer-interceptor.go +++ b/pulsar/internal/pulsartracing/producer-interceptor.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/pulsartracing/readme.md b/pulsar/internal/pulsartracing/readme.md new file mode 100644 index 0000000000..319587e436 --- /dev/null +++ b/pulsar/internal/pulsartracing/readme.md @@ -0,0 +1,40 @@ +### Usage + +#### Interceptors based solution + +```go +// create new tracer +// register tracer with GlobalTracer +opentracing.SetGlobalTracer(tracer) +``` + +**Producer** + +```go +tracingInterceptor := &pulsartracing.ProducerInterceptor{} + +options := pulsar.ProducerOptions{ +Topic: topicName, +Interceptors: pulsar.ProducerInterceptors{tracingInterceptor}, +} +``` + +**Consumer** +```go +tracingInterceptor := &pulsartracing.ConsumerInterceptor{} + +options := pulsar.ConsumerOptions{ +Topics: topicName, +SubscriptionName: subscriptionName, +Type: pulsar.Shared, +Interceptors: pulsar.ConsumerInterceptors{tracingInterceptor}, +} + + +// to create span with message as parent span +span := pulsartracing.CreateSpanFromMessage(message, tracer, "child_span") +``` + +## License + +[Apache 2.0 License](./LICENSE). \ No newline at end of file diff --git a/pulsar/internal/pulsartracing/span-enrichment.go b/pulsar/internal/pulsartracing/span-enrichment.go new file mode 100644 index 0000000000..0a56daf014 --- /dev/null +++ b/pulsar/internal/pulsartracing/span-enrichment.go @@ -0,0 +1,43 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package pulsartracing + +import ( + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" +) + +func enrichConsumerSpan(message pulsar.ConsumerMessage, span opentracing.Span) { + spanCommonTags(span) + + span.SetTag("message_bus.destination", message.Topic()) + span.SetTag("messageId", message.ID()) + span.SetTag("subscription", message.Subscription()) +} + +func enrichProducerSpan(producer pulsar.Producer, span opentracing.Span) { + spanCommonTags(span) + + span.SetTag("message_bus.destination", producer.Topic()) + span.SetTag("sequenceId", producer.LastSequenceID()) +} + +func spanCommonTags(span opentracing.Span) { + span.SetTag("component", "pulsar-client-go") + span.SetTag("peer.service", "pulsar-broker") +} diff --git a/pulsar/internal/tracing/span-enrichment.go b/pulsar/internal/tracing/span-enrichment.go deleted file mode 100644 index fe75b0ab6b..0000000000 --- a/pulsar/internal/tracing/span-enrichment.go +++ /dev/null @@ -1,26 +0,0 @@ -package pulsartracing - -import ( - "github.com/apache/pulsar-client-go/pulsar" - "github.com/opentracing/opentracing-go" -) - -func enrichConsumerSpan(message pulsar.ConsumerMessage, span opentracing.Span) { - spanCommonTags(span) - - span.SetTag("message_bus.destination", message.Topic()) - span.SetTag("messageId", message.ID()) - span.SetTag("subscription", message.Subscription()) -} - -func enrichProducerSpan(producer pulsar.Producer, span opentracing.Span) { - spanCommonTags(span) - - span.SetTag("message_bus.destination", producer.Topic()) - span.SetTag("sequenceId", producer.LastSequenceID()) -} - -func spanCommonTags(span opentracing.Span) { - span.SetTag("component", "pulsar-client-go") - span.SetTag("peer.service", "pulsar-broker") -} From be1bdd568234fa45bd75fa9a7295be2a751e7823 Mon Sep 17 00:00:00 2001 From: Marais Kruger Date: Fri, 28 May 2021 18:33:53 +0300 Subject: [PATCH 3/5] add tests and do some improvements --- ...interceptor.go => consumer_interceptor.go} | 2 +- .../consumer_interceptor_test.go | 71 ++++++++++++ ...daptors.go => message_carrier_adaptors.go} | 0 ...arrier-util.go => message_carrier_util.go} | 8 -- .../message_carrier_util_test.go | 103 ++++++++++++++++++ ...interceptor.go => producer_interceptor.go} | 4 +- .../producer_interceptor_test.go | 51 +++++++++ .../internal/pulsartracing/span-enrichment.go | 11 +- 8 files changed, 237 insertions(+), 13 deletions(-) rename pulsar/internal/pulsartracing/{consumer-interceptor.go => consumer_interceptor.go} (98%) create mode 100644 pulsar/internal/pulsartracing/consumer_interceptor_test.go rename pulsar/internal/pulsartracing/{message-carrier-adaptors.go => message_carrier_adaptors.go} (100%) rename pulsar/internal/pulsartracing/{message-carrier-util.go => message_carrier_util.go} (95%) create mode 100644 pulsar/internal/pulsartracing/message_carrier_util_test.go rename pulsar/internal/pulsartracing/{producer-interceptor.go => producer_interceptor.go} (96%) create mode 100644 pulsar/internal/pulsartracing/producer_interceptor_test.go diff --git a/pulsar/internal/pulsartracing/consumer-interceptor.go b/pulsar/internal/pulsartracing/consumer_interceptor.go similarity index 98% rename from pulsar/internal/pulsartracing/consumer-interceptor.go rename to pulsar/internal/pulsartracing/consumer_interceptor.go index 102cfe0466..fb90851d5c 100644 --- a/pulsar/internal/pulsartracing/consumer-interceptor.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor.go @@ -50,7 +50,7 @@ func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span { span = tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(), startSpanOptions...) - enrichConsumerSpan(message, span) + enrichConsumerSpan(&message, span) InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) return span diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go new file mode 100644 index 0000000000..5f658610be --- /dev/null +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -0,0 +1,71 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestConsumerBuildAndInjectChildSpan(t *testing.T) { + tracer := mocktracer.New() + + opentracing.SetGlobalTracer(tracer) + + message := pulsar.ConsumerMessage{ + Consumer: &mockConsumer{}, + Message: &mockConsumerMessage{ + properties: map[string]string{}, + }, + } + + span := buildAndInjectChildSpan(message) + assert.NotNil(t, span) + assert.True(t, len(message.Properties()) > 0) +} + +type mockConsumer struct { +} + +func (c *mockConsumer) Subscription() string { + return "" +} + +func (c *mockConsumer) Unsubscribe() error { + return nil +} + +func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) { + return nil, nil +} + +func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage { + return nil +} + +func (c *mockConsumer) Ack(msg pulsar.Message) {} + +func (c *mockConsumer) AckID(msgID pulsar.MessageID) {} + +func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {} + +func (c *mockConsumer) Nack(msg pulsar.Message) {} + +func (c *mockConsumer) NackID(msgID pulsar.MessageID) {} + +func (c *mockConsumer) Close() {} + +func (c *mockConsumer) Seek(msgID pulsar.MessageID) error { + return nil +} + +func (c *mockConsumer) SeekByTime(time time.Time) error { + return nil +} + +func (c *mockConsumer) Name() string { + return "" +} diff --git a/pulsar/internal/pulsartracing/message-carrier-adaptors.go b/pulsar/internal/pulsartracing/message_carrier_adaptors.go similarity index 100% rename from pulsar/internal/pulsartracing/message-carrier-adaptors.go rename to pulsar/internal/pulsartracing/message_carrier_adaptors.go diff --git a/pulsar/internal/pulsartracing/message-carrier-util.go b/pulsar/internal/pulsartracing/message_carrier_util.go similarity index 95% rename from pulsar/internal/pulsartracing/message-carrier-util.go rename to pulsar/internal/pulsartracing/message_carrier_util.go index 55902ff3d4..a33c51e1fc 100644 --- a/pulsar/internal/pulsartracing/message-carrier-util.go +++ b/pulsar/internal/pulsartracing/message_carrier_util.go @@ -29,10 +29,6 @@ func InjectProducerMessageSpanContext(ctx context.Context, message *pulsar.Produ span := opentracing.SpanFromContext(ctx) - for k, v := range message.Properties { - span.SetTag(k, v) - } - err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter) if err != nil { @@ -73,10 +69,6 @@ func InjectConsumerMessageSpanContext(ctx context.Context, message pulsar.Consum return } - for k, v := range message.Properties() { - span.SetTag(k, v) - } - err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter) if err != nil { diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go new file mode 100644 index 0000000000..a26ed69b03 --- /dev/null +++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go @@ -0,0 +1,103 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestProducerMessageInjectAndExtract(t *testing.T) { + message := &pulsar.ProducerMessage{ + Properties: map[string]string{}, + } + + tracer := mocktracer.New() + + opentracing.SetGlobalTracer(tracer) + + span := tracer.StartSpan("test") + + InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) + assert.True(t, len(message.Properties) > 0) + extractedSpanContext := ExtractSpanContextFromProducerMessage(message) + assert.Equal(t, span.Context(), extractedSpanContext) +} + +func TestConsumerMessageInjectAndExtract(t *testing.T) { + message := pulsar.ConsumerMessage{ + Message: &mockConsumerMessage{ + properties: map[string]string{}, + }, + } + + tracer := mocktracer.New() + + opentracing.SetGlobalTracer(tracer) + + span := tracer.StartSpan("test") + + InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) + assert.True(t, len(message.Properties()) > 0) + extractedSpanContext := ExtractSpanContextFromConsumerMessage(message) + assert.Equal(t, span.Context(), extractedSpanContext) +} + +type mockConsumerMessage struct { + properties map[string]string +} + +func (msg *mockConsumerMessage) Topic() string { + return "" +} + +func (msg *mockConsumerMessage) Properties() map[string]string { + return msg.properties +} + +func (msg *mockConsumerMessage) Payload() []byte { + return nil +} + +func (msg *mockConsumerMessage) ID() pulsar.MessageID { + return nil +} + +func (msg *mockConsumerMessage) PublishTime() time.Time { + return time.Time{} +} + +func (msg *mockConsumerMessage) EventTime() time.Time { + return time.Time{} +} + +func (msg *mockConsumerMessage) Key() string { + return "" +} + +func (msg *mockConsumerMessage) OrderingKey() string { + return "" +} + +func (msg *mockConsumerMessage) RedeliveryCount() uint32 { + return 0 +} + +func (msg *mockConsumerMessage) IsReplicated() bool { + return false +} + +func (msg *mockConsumerMessage) GetReplicatedFrom() string { + return "" +} + +func (msg *mockConsumerMessage) GetSchemaValue(v interface{}) error { + return nil +} + +func (msg *mockConsumerMessage) ProducerName() string { + return "" +} diff --git a/pulsar/internal/pulsartracing/producer-interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go similarity index 96% rename from pulsar/internal/pulsartracing/producer-interceptor.go rename to pulsar/internal/pulsartracing/producer_interceptor.go index 199ecca206..cf88772e25 100644 --- a/pulsar/internal/pulsartracing/producer-interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -47,8 +47,8 @@ func buildAndInjectSpan(message *pulsar.ProducerMessage, producer pulsar.Produce } span = tracer.StartSpan(toPrefix+producer.Topic(), startSpanOptions...) - span.SetTag("span.kind", "producer") - enrichProducerSpan(producer, span) + + enrichProducerSpan(message, producer, span) InjectProducerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message) diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go new file mode 100644 index 0000000000..656cab8b96 --- /dev/null +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -0,0 +1,51 @@ +package pulsartracing + +import ( + "context" + "github.com/apache/pulsar-client-go/pulsar" + "github.com/opentracing/opentracing-go" + "github.com/opentracing/opentracing-go/mocktracer" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestProducerBuildAndInjectSpan(t *testing.T) { + tracer := mocktracer.New() + opentracing.SetGlobalTracer(tracer) + + message := &pulsar.ProducerMessage{ + Properties: map[string]string{}, + } + + span := buildAndInjectSpan(message, &mockProducer{}) + assert.NotNil(t, span) + assert.True(t, len(message.Properties) > 0) +} + +type mockProducer struct { +} + +func (p *mockProducer) Topic() string { + return "" +} + +func (p *mockProducer) Name() string { + return "" +} + +func (p *mockProducer) Send(context.Context, *pulsar.ProducerMessage) (pulsar.MessageID, error) { + return nil, nil +} + +func (p *mockProducer) SendAsync(context.Context, *pulsar.ProducerMessage, func(pulsar.MessageID, *pulsar.ProducerMessage, error)) { +} + +func (p *mockProducer) LastSequenceID() int64 { + return 0 +} + +func (p *mockProducer) Flush() error { + return nil +} + +func (p *mockProducer) Close() {} diff --git a/pulsar/internal/pulsartracing/span-enrichment.go b/pulsar/internal/pulsartracing/span-enrichment.go index 0a56daf014..e75c398813 100644 --- a/pulsar/internal/pulsartracing/span-enrichment.go +++ b/pulsar/internal/pulsartracing/span-enrichment.go @@ -22,17 +22,24 @@ import ( "github.com/opentracing/opentracing-go" ) -func enrichConsumerSpan(message pulsar.ConsumerMessage, span opentracing.Span) { +func enrichConsumerSpan(message *pulsar.ConsumerMessage, span opentracing.Span) { spanCommonTags(span) + for k, v := range message.Properties() { + span.SetTag(k, v) + } span.SetTag("message_bus.destination", message.Topic()) span.SetTag("messageId", message.ID()) span.SetTag("subscription", message.Subscription()) } -func enrichProducerSpan(producer pulsar.Producer, span opentracing.Span) { +func enrichProducerSpan(message *pulsar.ProducerMessage, producer pulsar.Producer, span opentracing.Span) { spanCommonTags(span) + for k, v := range message.Properties { + span.SetTag(k, v) + } + span.SetTag("span.kind", "producer") span.SetTag("message_bus.destination", producer.Topic()) span.SetTag("sequenceId", producer.LastSequenceID()) } From e6e5ca92e002613c25b562ab44aa3298018b2166 Mon Sep 17 00:00:00 2001 From: Marais Kruger Date: Fri, 28 May 2021 18:37:40 +0300 Subject: [PATCH 4/5] add license --- .../pulsartracing/consumer_interceptor_test.go | 17 +++++++++++++++++ .../pulsartracing/message_carrier_util_test.go | 17 +++++++++++++++++ .../pulsartracing/producer_interceptor_test.go | 17 +++++++++++++++++ 3 files changed, 51 insertions(+) diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index 5f658610be..d2e653c0c0 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go index a26ed69b03..d853b23d25 100644 --- a/pulsar/internal/pulsartracing/message_carrier_util_test.go +++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 656cab8b96..39c89b7fb6 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -1,3 +1,20 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + package pulsartracing import ( From e2012376a0f6ec8f86abdbf2194ea6bba23a5a98 Mon Sep 17 00:00:00 2001 From: Marais Kruger Date: Sat, 5 Jun 2021 11:49:25 +0300 Subject: [PATCH 5/5] run go fmt and goimports --- pulsar/internal/pulsartracing/consumer_interceptor.go | 1 + pulsar/internal/pulsartracing/consumer_interceptor_test.go | 5 +++-- pulsar/internal/pulsartracing/message_carrier_adaptors.go | 1 + pulsar/internal/pulsartracing/message_carrier_util.go | 1 + pulsar/internal/pulsartracing/message_carrier_util_test.go | 5 +++-- pulsar/internal/pulsartracing/producer_interceptor.go | 1 + pulsar/internal/pulsartracing/producer_interceptor_test.go | 3 ++- 7 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pulsar/internal/pulsartracing/consumer_interceptor.go b/pulsar/internal/pulsartracing/consumer_interceptor.go index fb90851d5c..3969d45d8b 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor.go @@ -19,6 +19,7 @@ package pulsartracing import ( "context" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" ) diff --git a/pulsar/internal/pulsartracing/consumer_interceptor_test.go b/pulsar/internal/pulsartracing/consumer_interceptor_test.go index d2e653c0c0..b15a926bee 100644 --- a/pulsar/internal/pulsartracing/consumer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/consumer_interceptor_test.go @@ -19,12 +19,13 @@ package pulsartracing import ( "context" + "testing" + "time" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestConsumerBuildAndInjectChildSpan(t *testing.T) { diff --git a/pulsar/internal/pulsartracing/message_carrier_adaptors.go b/pulsar/internal/pulsartracing/message_carrier_adaptors.go index 82a6838e64..3b6394baa7 100644 --- a/pulsar/internal/pulsartracing/message_carrier_adaptors.go +++ b/pulsar/internal/pulsartracing/message_carrier_adaptors.go @@ -19,6 +19,7 @@ package pulsartracing import ( "errors" + "github.com/apache/pulsar-client-go/pulsar" ) diff --git a/pulsar/internal/pulsartracing/message_carrier_util.go b/pulsar/internal/pulsartracing/message_carrier_util.go index a33c51e1fc..d1fd0ddb96 100644 --- a/pulsar/internal/pulsartracing/message_carrier_util.go +++ b/pulsar/internal/pulsartracing/message_carrier_util.go @@ -19,6 +19,7 @@ package pulsartracing import ( "context" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" log "github.com/sirupsen/logrus" diff --git a/pulsar/internal/pulsartracing/message_carrier_util_test.go b/pulsar/internal/pulsartracing/message_carrier_util_test.go index d853b23d25..9fe608dc90 100644 --- a/pulsar/internal/pulsartracing/message_carrier_util_test.go +++ b/pulsar/internal/pulsartracing/message_carrier_util_test.go @@ -19,12 +19,13 @@ package pulsartracing import ( "context" + "testing" + "time" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" - "testing" - "time" ) func TestProducerMessageInjectAndExtract(t *testing.T) { diff --git a/pulsar/internal/pulsartracing/producer_interceptor.go b/pulsar/internal/pulsartracing/producer_interceptor.go index cf88772e25..b400e57785 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor.go +++ b/pulsar/internal/pulsartracing/producer_interceptor.go @@ -19,6 +19,7 @@ package pulsartracing import ( "context" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" ) diff --git a/pulsar/internal/pulsartracing/producer_interceptor_test.go b/pulsar/internal/pulsartracing/producer_interceptor_test.go index 39c89b7fb6..b146e4e93b 100644 --- a/pulsar/internal/pulsartracing/producer_interceptor_test.go +++ b/pulsar/internal/pulsartracing/producer_interceptor_test.go @@ -19,11 +19,12 @@ package pulsartracing import ( "context" + "testing" + "github.com/apache/pulsar-client-go/pulsar" "github.com/opentracing/opentracing-go" "github.com/opentracing/opentracing-go/mocktracer" "github.com/stretchr/testify/assert" - "testing" ) func TestProducerBuildAndInjectSpan(t *testing.T) {