Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add open tracing to pulsar go clinet #518

Merged
merged 5 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ require (
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
57 changes: 57 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
)

const fromPrefix = "From__"

type ConsumerInterceptor struct {
}

func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) {
buildAndInjectChildSpan(message).Finish()
}

func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID pulsar.MessageID) {}

func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, msgIDs []pulsar.MessageID) {
}

func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
tracer := opentracing.GlobalTracer()
parentContext := ExtractSpanContextFromConsumerMessage(message)

var span opentracing.Span

var startSpanOptions []opentracing.StartSpanOption
if parentContext != nil {
startSpanOptions = []opentracing.StartSpanOption{opentracing.FollowsFrom(parentContext)}
}

span = tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(), startSpanOptions...)

enrichConsumerSpan(&message, span)
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message)

return span
}
88 changes: 88 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
"testing"
"time"
)

func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
tracer := mocktracer.New()

opentracing.SetGlobalTracer(tracer)

message := pulsar.ConsumerMessage{
Consumer: &mockConsumer{},
Message: &mockConsumerMessage{
properties: map[string]string{},
},
}

span := buildAndInjectChildSpan(message)
assert.NotNil(t, span)
assert.True(t, len(message.Properties()) > 0)
}

type mockConsumer struct {
}

func (c *mockConsumer) Subscription() string {
return ""
}

func (c *mockConsumer) Unsubscribe() error {
return nil
}

func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) {
return nil, nil
}

func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
return nil
}

func (c *mockConsumer) Ack(msg pulsar.Message) {}

func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}

func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}

func (c *mockConsumer) Nack(msg pulsar.Message) {}

func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}

func (c *mockConsumer) Close() {}

func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
return nil
}

func (c *mockConsumer) SeekByTime(time time.Time) error {
return nil
}

func (c *mockConsumer) Name() string {
return ""
}
83 changes: 83 additions & 0 deletions pulsar/internal/pulsartracing/message_carrier_adaptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"errors"
"github.com/apache/pulsar-client-go/pulsar"
)

// ProducerMessageExtractAdapter Implements TextMap Interface
type ProducerMessageExtractAdapter struct {
message *pulsar.ProducerMessage
}

func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error {
for k, v := range (*a.message).Properties {
if err := handler(k, v); err != nil {
return err
}
}

return nil
}

func (a *ProducerMessageExtractAdapter) Set(key, val string) {}

// ProducerMessageInjectAdapter Implements TextMap Interface
type ProducerMessageInjectAdapter struct {
message *pulsar.ProducerMessage
}

func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error {
return errors.New("iterator should never be used with Tracer.inject()")
}

func (a *ProducerMessageInjectAdapter) Set(key, val string) {
a.message.Properties[key] = val
}

// ConsumerMessageExtractAdapter Implements TextMap Interface
type ConsumerMessageExtractAdapter struct {
message pulsar.ConsumerMessage
}

func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error {
for k, v := range a.message.Properties() {
if err := handler(k, v); err != nil {
return err
}
}

return nil
}

func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}

// ConsumerMessageInjectAdapter Implements TextMap Interface
type ConsumerMessageInjectAdapter struct {
message pulsar.ConsumerMessage
}

func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error {
return errors.New("iterator should never be used with tracer.inject()")
}

func (a *ConsumerMessageInjectAdapter) Set(key, val string) {
a.message.Properties()[key] = val
}
88 changes: 88 additions & 0 deletions pulsar/internal/pulsartracing/message_carrier_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
)

func InjectProducerMessageSpanContext(ctx context.Context, message *pulsar.ProducerMessage) {
injectAdapter := &ProducerMessageInjectAdapter{message}

span := opentracing.SpanFromContext(ctx)

err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter)

if err != nil {
log.Error("could not inject span context into pulsar message", err)
}
}

func ExtractSpanContextFromProducerMessage(message *pulsar.ProducerMessage) opentracing.SpanContext {
extractAdapter := &ProducerMessageExtractAdapter{message}

spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)

if err != nil {
log.Error("could not extract span context from pulsar message", err)
}

return spanContext
}

func ExtractSpanContextFromConsumerMessage(message pulsar.ConsumerMessage) opentracing.SpanContext {
extractAdapter := &ConsumerMessageExtractAdapter{message}

spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)

if err != nil {
log.Error("could not extract span context from pulsar message", err)
}

return spanContext
}

func InjectConsumerMessageSpanContext(ctx context.Context, message pulsar.ConsumerMessage) {
injectAdapter := &ConsumerMessageInjectAdapter{message}
span := opentracing.SpanFromContext(ctx)

if span == nil {
log.Warn("no span could be extracted from context, nothing will be injected into the message properties")
return
}

err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter)

if err != nil {
log.Error("could not inject span context into pulsar message", err)
}
}

func CreateSpanFromMessage(cm *pulsar.ConsumerMessage, tracer opentracing.Tracer, label string) opentracing.Span {
parentSpan := ExtractSpanContextFromConsumerMessage(*cm)
var span opentracing.Span
if parentSpan != nil {
span = tracer.StartSpan(label, opentracing.ChildOf(parentSpan))
} else {
span = tracer.StartSpan(label)
}
return span
}
Loading