Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds the pulsar reporter #227

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -112,6 +112,11 @@ Producer digesting JSON V2 Spans. The reporter uses the
[Sarama async producer](https://pkg.go.dev/github.com/IBM/sarama#AsyncProducer)
underneath.

#### Pulsar Reporter
You can use the Pulsar Reporter to send spans to the Zipkin server. The reporter
uses the [Pulsar go client async producer](https://pkg.go.dev/github.com/apache/pulsar-client-go)
underneath.

## Usage and Examples
[HTTP Server Example](examples/httpserver_test.go)

49 changes: 42 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -4,6 +4,7 @@ go 1.20

require (
github.com/IBM/sarama v1.43.1
github.com/apache/pulsar-client-go v0.14.0
github.com/onsi/ginkgo/v2 v2.11.0
github.com/onsi/gomega v1.27.10
github.com/rabbitmq/amqp091-go v1.9.0
@@ -12,15 +13,30 @@ require (
)

require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.1 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/DataDog/zstd v1.5.0 // indirect
github.com/ardielle/ardielle-go v1.5.2 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.4.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/danieljoos/wincred v1.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dvsekhvalnov/jose2go v1.6.0 // indirect
github.com/eapache/go-resiliency v1.6.0 // indirect
github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect
github.com/eapache/queue v1.1.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c // indirect
github.com/hamba/avro/v2 v2.22.2-0.20240625062549-66aad10411d9 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-uuid v1.0.3 // indirect
@@ -29,16 +45,35 @@ require (
github.com/jcmturner/gofork v1.7.6 // indirect
github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect
github.com/jcmturner/rpc/v2 v2.0.3 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mtibben/percent v0.2.1 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.9.3 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/mod v0.18.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/oauth2 v0.17.0 // indirect
golang.org/x/sys v0.21.0 // indirect
golang.org/x/term v0.21.0 // indirect
golang.org/x/text v0.16.0 // indirect
golang.org/x/tools v0.22.0 // indirect
google.golang.org/appengine v1.6.8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240415180920-8c6c420018be // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
592 changes: 576 additions & 16 deletions go.sum

Large diffs are not rendered by default.

152 changes: 152 additions & 0 deletions reporter/pulsar/pulsar.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

/*
Package pulsar implements a Pulsar reporter to send spans to a Pulsar server/cluster.
*/
package pulsar

import (
"context"
"fmt"
"log"
"os"

"github.com/apache/pulsar-client-go/pulsar"

"github.com/openzipkin/zipkin-go/model"
"github.com/openzipkin/zipkin-go/reporter"
)

// defaultPulsarTopic sets the standard Pulsar topic our Reporter will publish
// on. The default topic for zipkin-collector-pulsar is "zipkin", see:
// https://github.com/openzipkin/zipkin/tree/master/zipkin-collector/pulsar
const defaultPulsarTopic = "zipkin"

// pulsarReporter implements Reporter by publishing spans to a Pulsar broker.
type pulsarReporter struct {
e chan error
client pulsar.Client
producer pulsar.Producer
logger *log.Logger
topic string
serializer reporter.SpanSerializer
}

// ReporterOption sets a parameter for the pulsarReporter
type ReporterOption func(c *pulsarReporter)

// Logger sets the logger used to report errors in the collection
// process.
func Logger(logger *log.Logger) ReporterOption {
return func(c *pulsarReporter) {
c.logger = logger
}
}

// Topic sets the pulsar topic to attach the reporter producer on.
func Topic(t string) ReporterOption {
return func(c *pulsarReporter) {
c.topic = t
}
}

// Serializer sets the serialization function to use for sending span data to
// Zipkin.
func Serializer(serializer reporter.SpanSerializer) ReporterOption {
return func(c *pulsarReporter) {
if serializer != nil {
c.serializer = serializer
}
}
}

// Client sets the Pulsar client to use for the reporter.
func Client(p pulsar.Client) ReporterOption {
return func(c *pulsarReporter) {
c.client = p
}
}

// Producer sets the Pulsar producer to use for the reporter.
func Producer(p pulsar.Producer) ReporterOption {
return func(c *pulsarReporter) {
c.producer = p
}
}

func (p *pulsarReporter) logErrors() {
for err := range p.e {
p.logger.Print("msg", err.Error())
}
}

func NewReporter(address string, options ...ReporterOption) (reporter.Reporter, error) {
p := &pulsarReporter{
logger: log.New(os.Stderr, "", log.LstdFlags),
topic: defaultPulsarTopic,
serializer: reporter.JSONSerializer{},
}

for _, option := range options {
option(p)
}

var err error
if p.client == nil {
p.client, err = pulsar.NewClient(pulsar.ClientOptions{
URL: address,
})
if err != nil {
return nil, err
}
}
if p.producer == nil {
p.producer, err = p.client.CreateProducer(pulsar.ProducerOptions{
Topic: p.topic,
})
if err != nil {
return nil, err
}
}

go p.logErrors()

return p, nil
}

func (p *pulsarReporter) Send(s model.SpanModel) {
// Zipkin expects the message to be wrapped in an array
ss := []*model.SpanModel{&s}
m, err := p.serializer.Serialize(ss)
if err != nil {
p.e <- fmt.Errorf("failed when marshalling the span: %s\n", err.Error())
return
}

message := &pulsar.ProducerMessage{
Payload: m,
}
p.producer.SendAsync(context.Background(), message, func(_ pulsar.MessageID, _ *pulsar.ProducerMessage, err error) {
if err != nil {
p.e <- fmt.Errorf("failed to produce msg: %s\n", err.Error())
}
})
}

func (p *pulsarReporter) Close() error {
p.producer.Close()
p.client.Close()
return nil
}
171 changes: 171 additions & 0 deletions reporter/pulsar/pulsar_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
// Copyright 2022 The OpenZipkin Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pulsar_test

import (
"encoding/json"
"os"
"testing"
"time"

"github.com/apache/pulsar-client-go/pulsar"

"github.com/openzipkin/zipkin-go/model"
zp3 "github.com/openzipkin/zipkin-go/proto/zipkin_proto3"
zipkinpulsar "github.com/openzipkin/zipkin-go/reporter/pulsar"
)

var spans = []*model.SpanModel{
makeNewSpan("avg", 123, 456, 0, true),
makeNewSpan("sum", 123, 789, 456, true),
makeNewSpan("div", 123, 101112, 456, true),
}

func TestPulsarProduce(t *testing.T) {
address := os.Getenv("PULSAR_ADDR")
if address == "" {
t.Skip("PULSAR_ADDR not set, skipping test...")
}
client, producer, closeFunc := setupPulsar(t, address)
defer closeFunc()

reporter, err := zipkinpulsar.NewReporter(address, zipkinpulsar.Producer(producer))
if err != nil {
t.Fatal(err)
}

consume := setupConsume(t, client)
defer consume.Close()

for _, s := range spans {
reporter.Send(*s)
}

for _, s := range spans {
msg := <-consume.Chan()
ds := deserializeSpan(t, msg.Payload())
testEqual(t, s, ds)
}
}

func TestPulsarProduceProto(t *testing.T) {
address := os.Getenv("PULSAR_ADDR")
if address == "" {
t.Skip("PULSAR_ADDR not set, skipping test...")
}
client, producer, closeFunc := setupPulsar(t, address)
defer closeFunc()

reporter, err := zipkinpulsar.NewReporter(
address,
zipkinpulsar.Client(client),
zipkinpulsar.Producer(producer),
zipkinpulsar.Serializer(zp3.SpanSerializer{}),
)
if err != nil {
t.Fatal(err)
}

consume := setupConsume(t, client)
defer consume.Close()

for _, s := range spans {
reporter.Send(*s)
}

for _, s := range spans {
msg := <-consume.Chan()
ds := deserializeSpan(t, msg.Payload())
testEqual(t, s, ds)
}
}

func setupConsume(t *testing.T, client pulsar.Client) pulsar.Consumer {
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Type: pulsar.Failover,
Topic: "zipkin_test",
SubscriptionName: "zipkin_test_sub",
})
failOnError(t, err, "Failed to subscribe to Pulsar")
return consumer
}

func setupPulsar(t *testing.T, address string) (pulsar.Client, pulsar.Producer, func()) {
var err error
client, err := pulsar.NewClient(pulsar.ClientOptions{
URL: address,
})
failOnError(t, err, "Failed to connect to Pulsar")

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: "zipkin_test",
})
failOnError(t, err, "Failed to create Pulsar producer")

return client, producer, func() {
producer.Close()
client.Close()
}
}

func testEqual(t *testing.T, want *model.SpanModel, have *model.SpanModel) {
if have.TraceID != want.TraceID {
t.Errorf("incorrect trace_id. have %d, want %d", have.TraceID, want.TraceID)
}
if have.ID != want.ID {
t.Errorf("incorrect id. have %d, want %d", have.ID, want.ID)
}
if have.ParentID == nil {
if want.ParentID != nil {
t.Errorf("incorrect parent_id. have %d, want %d", have.ParentID, want.ParentID)
}
} else if *have.ParentID != *want.ParentID {
t.Errorf("incorrect parent_id. have %d, want %d", have.ParentID, want.ParentID)
}
}

func deserializeSpan(t *testing.T, data []byte) *model.SpanModel {
var receivedSpans []model.SpanModel
err := json.Unmarshal(data, &receivedSpans)
if err != nil {
t.Fatal(err)
}
return &receivedSpans[0]
}

func failOnError(t *testing.T, err error, msg string) {
if err != nil {
t.Fatalf("%s: %s", msg, err)
}
}

func makeNewSpan(methodName string, traceID, spanID, parentSpanID uint64, debug bool) *model.SpanModel {
timestamp := time.Now()
parentID := new(model.ID)
if parentSpanID != 0 {
*parentID = model.ID(parentSpanID)
}

return &model.SpanModel{
SpanContext: model.SpanContext{
TraceID: model.TraceID{Low: traceID},
ID: model.ID(spanID),
ParentID: parentID,
Debug: debug,
},
Name: methodName,
Timestamp: timestamp,
}
}