Skip to content

Commit

Permalink
Add open tracing to pulsar go clinet (#518)
Browse files Browse the repository at this point in the history
Here is the PR to add open tracing to the pulsar go client directly.
This is replicating what the java tracer does:
https://github.com/streamnative/pulsar-tracing
The usage is stipulated in the readme.md file

Co-authored-by: Marais Kruger <maraisk@cynet.com>
  • Loading branch information
maraiskruger1980 and Marais Kruger authored Jul 12, 2021
1 parent 7a60a7e commit 0de4792
Show file tree
Hide file tree
Showing 11 changed files with 660 additions and 2 deletions.
3 changes: 1 addition & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ require (
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/linkedin/goavro/v2 v2.9.8
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.2.0
github.com/pierrec/lz4 v2.0.5+incompatible
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9k
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
58 changes: 58 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
)

const fromPrefix = "From__"

type ConsumerInterceptor struct {
}

func (t *ConsumerInterceptor) BeforeConsume(message pulsar.ConsumerMessage) {
buildAndInjectChildSpan(message).Finish()
}

func (t *ConsumerInterceptor) OnAcknowledge(consumer pulsar.Consumer, msgID pulsar.MessageID) {}

func (t *ConsumerInterceptor) OnNegativeAcksSend(consumer pulsar.Consumer, msgIDs []pulsar.MessageID) {
}

func buildAndInjectChildSpan(message pulsar.ConsumerMessage) opentracing.Span {
tracer := opentracing.GlobalTracer()
parentContext := ExtractSpanContextFromConsumerMessage(message)

var span opentracing.Span

var startSpanOptions []opentracing.StartSpanOption
if parentContext != nil {
startSpanOptions = []opentracing.StartSpanOption{opentracing.FollowsFrom(parentContext)}
}

span = tracer.StartSpan(fromPrefix+message.Topic()+"__"+message.Subscription(), startSpanOptions...)

enrichConsumerSpan(&message, span)
InjectConsumerMessageSpanContext(opentracing.ContextWithSpan(context.Background(), span), message)

return span
}
89 changes: 89 additions & 0 deletions pulsar/internal/pulsartracing/consumer_interceptor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/mocktracer"
"github.com/stretchr/testify/assert"
)

func TestConsumerBuildAndInjectChildSpan(t *testing.T) {
tracer := mocktracer.New()

opentracing.SetGlobalTracer(tracer)

message := pulsar.ConsumerMessage{
Consumer: &mockConsumer{},
Message: &mockConsumerMessage{
properties: map[string]string{},
},
}

span := buildAndInjectChildSpan(message)
assert.NotNil(t, span)
assert.True(t, len(message.Properties()) > 0)
}

type mockConsumer struct {
}

func (c *mockConsumer) Subscription() string {
return ""
}

func (c *mockConsumer) Unsubscribe() error {
return nil
}

func (c *mockConsumer) Receive(ctx context.Context) (message pulsar.Message, err error) {
return nil, nil
}

func (c *mockConsumer) Chan() <-chan pulsar.ConsumerMessage {
return nil
}

func (c *mockConsumer) Ack(msg pulsar.Message) {}

func (c *mockConsumer) AckID(msgID pulsar.MessageID) {}

func (c *mockConsumer) ReconsumeLater(msg pulsar.Message, delay time.Duration) {}

func (c *mockConsumer) Nack(msg pulsar.Message) {}

func (c *mockConsumer) NackID(msgID pulsar.MessageID) {}

func (c *mockConsumer) Close() {}

func (c *mockConsumer) Seek(msgID pulsar.MessageID) error {
return nil
}

func (c *mockConsumer) SeekByTime(time time.Time) error {
return nil
}

func (c *mockConsumer) Name() string {
return ""
}
84 changes: 84 additions & 0 deletions pulsar/internal/pulsartracing/message_carrier_adaptors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"errors"

"github.com/apache/pulsar-client-go/pulsar"
)

// ProducerMessageExtractAdapter Implements TextMap Interface
type ProducerMessageExtractAdapter struct {
message *pulsar.ProducerMessage
}

func (a *ProducerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error {
for k, v := range (*a.message).Properties {
if err := handler(k, v); err != nil {
return err
}
}

return nil
}

func (a *ProducerMessageExtractAdapter) Set(key, val string) {}

// ProducerMessageInjectAdapter Implements TextMap Interface
type ProducerMessageInjectAdapter struct {
message *pulsar.ProducerMessage
}

func (a *ProducerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error {
return errors.New("iterator should never be used with Tracer.inject()")
}

func (a *ProducerMessageInjectAdapter) Set(key, val string) {
a.message.Properties[key] = val
}

// ConsumerMessageExtractAdapter Implements TextMap Interface
type ConsumerMessageExtractAdapter struct {
message pulsar.ConsumerMessage
}

func (a *ConsumerMessageExtractAdapter) ForeachKey(handler func(key, val string) error) error {
for k, v := range a.message.Properties() {
if err := handler(k, v); err != nil {
return err
}
}

return nil
}

func (a *ConsumerMessageExtractAdapter) Set(key, val string) {}

// ConsumerMessageInjectAdapter Implements TextMap Interface
type ConsumerMessageInjectAdapter struct {
message pulsar.ConsumerMessage
}

func (a *ConsumerMessageInjectAdapter) ForeachKey(handler func(key, val string) error) error {
return errors.New("iterator should never be used with tracer.inject()")
}

func (a *ConsumerMessageInjectAdapter) Set(key, val string) {
a.message.Properties()[key] = val
}
89 changes: 89 additions & 0 deletions pulsar/internal/pulsartracing/message_carrier_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package pulsartracing

import (
"context"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/opentracing/opentracing-go"
log "github.com/sirupsen/logrus"
)

func InjectProducerMessageSpanContext(ctx context.Context, message *pulsar.ProducerMessage) {
injectAdapter := &ProducerMessageInjectAdapter{message}

span := opentracing.SpanFromContext(ctx)

err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter)

if err != nil {
log.Error("could not inject span context into pulsar message", err)
}
}

func ExtractSpanContextFromProducerMessage(message *pulsar.ProducerMessage) opentracing.SpanContext {
extractAdapter := &ProducerMessageExtractAdapter{message}

spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)

if err != nil {
log.Error("could not extract span context from pulsar message", err)
}

return spanContext
}

func ExtractSpanContextFromConsumerMessage(message pulsar.ConsumerMessage) opentracing.SpanContext {
extractAdapter := &ConsumerMessageExtractAdapter{message}

spanContext, err := opentracing.GlobalTracer().Extract(opentracing.TextMap, extractAdapter)

if err != nil {
log.Error("could not extract span context from pulsar message", err)
}

return spanContext
}

func InjectConsumerMessageSpanContext(ctx context.Context, message pulsar.ConsumerMessage) {
injectAdapter := &ConsumerMessageInjectAdapter{message}
span := opentracing.SpanFromContext(ctx)

if span == nil {
log.Warn("no span could be extracted from context, nothing will be injected into the message properties")
return
}

err := opentracing.GlobalTracer().Inject(span.Context(), opentracing.TextMap, injectAdapter)

if err != nil {
log.Error("could not inject span context into pulsar message", err)
}
}

func CreateSpanFromMessage(cm *pulsar.ConsumerMessage, tracer opentracing.Tracer, label string) opentracing.Span {
parentSpan := ExtractSpanContextFromConsumerMessage(*cm)
var span opentracing.Span
if parentSpan != nil {
span = tracer.StartSpan(label, opentracing.ChildOf(parentSpan))
} else {
span = tracer.StartSpan(label)
}
return span
}
Loading

0 comments on commit 0de4792

Please sign in to comment.