Add kafka Produce and Consume Tracing Spans (#178)
* fix docker compose for host network

* add kafka produce and consume spans

* lint

* change otel to tracing package name and add constants
ukclivecox authored May 5, 2022
1 parent 7096bea commit 4f9b6a2
Showing 12 changed files with 168 additions and 80 deletions.
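At a glance, the commit threads a tracing.TracerProvider from each service's main into the Kafka managers and propagates OpenTelemetry context through Kafka message headers: the worker injects the current context when producing, and the gateway extracts it when consuming. The sketch below shows that propagation pattern in a minimal, self-contained form using the same splunkkafka carrier the diff relies on; the package name, function names, and the assumption that a global tracer provider and propagator have already been registered (in this commit, by tracing.NewTracer) are illustrative, not part of the commit.

package tracingsketch

import (
	"context"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// produceWithContext injects the caller's trace context into the outgoing
// message headers before producing, so a downstream consumer can join the
// same trace. Assumes a global TracerProvider and TextMapPropagator are set.
func produceWithContext(ctx context.Context, tracer trace.Tracer, producer *kafka.Producer, msg *kafka.Message) error {
	ctx, span := tracer.Start(ctx, "Produce")
	defer span.End()
	otel.GetTextMapPropagator().Inject(ctx, splunkkafka.NewMessageCarrier(msg))
	return producer.Produce(msg, nil)
}

// consumeWithContext extracts the upstream trace context from an incoming
// message and starts a consumer-side span that belongs to the same trace.
func consumeWithContext(tracer trace.Tracer, msg *kafka.Message) (context.Context, trace.Span) {
	ctx := otel.GetTextMapPropagator().Extract(context.Background(), splunkkafka.NewMessageCarrier(msg))
	return tracer.Start(ctx, "Consume")
}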
4 changes: 2 additions & 2 deletions scheduler/cmd/agent/main.go
@@ -9,7 +9,7 @@ import (
"path/filepath"
"syscall"

"github.com/seldonio/seldon-core/scheduler/pkg/otel"
"github.com/seldonio/seldon-core/scheduler/pkg/tracing"

"github.com/seldonio/seldon-core/scheduler/pkg/agent/metrics"

@@ -131,7 +131,7 @@ func main() {
}
}

tracer, err := otel.NewTracer("seldon-agent")
tracer, err := tracing.NewTracer("seldon-agent")
if err != nil {
logger.WithError(err).Error("Failed to configure otel tracer")
} else {
6 changes: 3 additions & 3 deletions scheduler/cmd/modelgateway/main.go
@@ -8,7 +8,7 @@ import (
"os/signal"
"syscall"

"github.com/seldonio/seldon-core/scheduler/pkg/otel"
"github.com/seldonio/seldon-core/scheduler/pkg/tracing"

"github.com/seldonio/seldon-core/scheduler/pkg/kafka/gateway"

@@ -106,7 +106,7 @@ func main() {
}
}

tracer, err := otel.NewTracer("seldon-modelgateway")
tracer, err := tracing.NewTracer("seldon-modelgateway")
if err != nil {
logger.WithError(err).Error("Failed to configure otel tracer")
} else {
@@ -127,7 +127,7 @@ func main() {
Host: envoyHost,
HttpPort: envoyPort,
GrpcPort: envoyPort,
}, namespace)
}, namespace, tracer)
defer func() { _ = kafkaManager.Stop() }()

kafkaManager.StartConfigListener(agentConfigHandler)
6 changes: 3 additions & 3 deletions scheduler/cmd/pipelinegateway/main.go
@@ -9,7 +9,7 @@ import (
"os/signal"
"syscall"

"github.com/seldonio/seldon-core/scheduler/pkg/otel"
"github.com/seldonio/seldon-core/scheduler/pkg/tracing"

"github.com/seldonio/seldon-core/scheduler/pkg/agent/config"
"github.com/seldonio/seldon-core/scheduler/pkg/agent/k8s"
@@ -96,7 +96,7 @@ func main() {
}
}

tracer, err := otel.NewTracer("seldon-pipelinegateway")
tracer, err := tracing.NewTracer("seldon-pipelinegateway")
if err != nil {
logger.WithError(err).Error("Failed to configure otel tracer")
} else {
@@ -113,7 +113,7 @@ func main() {
logger.Info("Closed config handler")
}()

km := pipeline.NewKafkaManager(logger, namespace)
km := pipeline.NewKafkaManager(logger, namespace, tracer)
km.StartConfigListener(agentConfigHandler)

httpServer := pipeline.NewGatewayHttpServer(httpPort, logger, nil, km)
59 changes: 39 additions & 20 deletions scheduler/pkg/kafka/gateway/consumer.go
@@ -1,34 +1,45 @@
package gateway

import (
"context"

"github.com/confluentinc/confluent-kafka-go/kafka"
seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"
"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

const (
pollTimeoutMillisecs = 100
pollTimeoutMillisecs = 10000
)

type InferKafkaGateway struct {
logger log.FieldLogger
nworkers int
workers []*InferWorker
broker string
modelConfig *KafkaModelConfig
serverConfig *KafkaServerConfig
consumer *kafka.Consumer
producer *kafka.Producer
done chan bool
logger log.FieldLogger
nworkers int
workers []*InferWorker
broker string
modelConfig *KafkaModelConfig
serverConfig *KafkaServerConfig
consumer *kafka.Consumer
producer *kafka.Producer
done chan bool
tracerProvider *seldontracer.TracerProvider
tracer trace.Tracer
}

func NewInferKafkaGateway(logger log.FieldLogger, nworkers int, broker string, modelConfig *KafkaModelConfig, serverConfig *KafkaServerConfig) (*InferKafkaGateway, error) {
func NewInferKafkaGateway(logger log.FieldLogger, nworkers int, broker string, modelConfig *KafkaModelConfig, serverConfig *KafkaServerConfig, traceProvider *seldontracer.TracerProvider) (*InferKafkaGateway, error) {
ic := &InferKafkaGateway{
logger: logger.WithField("source", "InferConsumer"),
nworkers: nworkers,
broker: broker,
modelConfig: modelConfig,
serverConfig: serverConfig,
done: make(chan bool),
logger: logger.WithField("source", "InferConsumer"),
nworkers: nworkers,
broker: broker,
modelConfig: modelConfig,
serverConfig: serverConfig,
done: make(chan bool),
tracerProvider: traceProvider,
tracer: traceProvider.TraceProvider.Tracer("Worker"),
}
return ic, ic.setup()
}
@@ -40,8 +51,8 @@ func (ig *InferKafkaGateway) setup() error {
// Create producer
var producerConfigMap = kafka.ConfigMap{
"bootstrap.servers": ig.broker,
"go.delivery.reports": false, // Need this othewise will get memory leak
"linger.ms": 0, // To help with low latency - should be configurable in future
"go.delivery.reports": true, // ensure we read delivery reports otherwise memory leak
"linger.ms": 0, // To help with low latency - should be configurable in future
}
logger.Infof("Creating producer with broker %s", ig.broker)
ig.producer, err = kafka.NewProducer(&producerConfigMap)
@@ -75,7 +86,7 @@ func (ig *InferKafkaGateway) setup() error {
}

for i := 0; i < ig.nworkers; i++ {
worker, err := NewInferWorker(ig, ig.logger)
worker, err := NewInferWorker(ig, ig.logger, ig.tracerProvider)
if err != nil {
return err
}
@@ -121,6 +132,13 @@ func (ig *InferKafkaGateway) Serve() {
switch e := ev.(type) {
case *kafka.Message:

// Add tracing span
ctx := context.Background()
carrierIn := splunkkafka.NewMessageCarrier(e)
ctx = otel.GetTextMapPropagator().Extract(ctx, carrierIn)
_, span := ig.tracer.Start(ctx, "Consume")
span.SetAttributes(attribute.String(seldontracer.SELDON_REQUEST_ID, string(e.Key)))

headers := collectHeaders(e.Headers)

job := InferWork{
@@ -129,6 +147,7 @@
}
// enqueue a job
jobChan <- &job
span.End()

case kafka.Error:
ig.logger.Error(e, "Received stream error")
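For context on the pollTimeoutMillisecs change (100 → 10000) and the new span in Serve above: with confluent-kafka-go, Poll blocks for at most the given timeout but returns as soon as an event arrives, so a larger timeout mainly reduces idle spinning rather than adding latency. The full Serve loop is only partially visible in the hunk, so the sketch below is an assumed, simplified shape of it (the job type and channel are simplified to a raw message).

package tracingsketch

import (
	"context"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
	log "github.com/sirupsen/logrus"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

const pollTimeoutMillisecs = 10000

// serveLoop polls the consumer, wraps each received message in a short
// "Consume" span extracted from the message headers, and hands the message
// to a worker channel - an assumed, simplified shape of Serve above.
func serveLoop(consumer *kafka.Consumer, tracer trace.Tracer, jobChan chan<- *kafka.Message, done <-chan bool) {
	for {
		select {
		case <-done:
			return
		default:
			ev := consumer.Poll(pollTimeoutMillisecs) // returns nil on timeout
			if ev == nil {
				continue
			}
			switch e := ev.(type) {
			case *kafka.Message:
				ctx := otel.GetTextMapPropagator().Extract(context.Background(), splunkkafka.NewMessageCarrier(e))
				_, span := tracer.Start(ctx, "Consume")
				jobChan <- e // the real code enqueues an InferWork with headers and message
				span.End()   // the span covers receipt-to-enqueue, not the inference itself
			case kafka.Error:
				log.WithField("source", "serveLoop").Error(e)
			}
		}
	}
}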
36 changes: 20 additions & 16 deletions scheduler/pkg/kafka/gateway/manager.go
@@ -3,6 +3,8 @@ package gateway
import (
"sync"

seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"

"github.com/seldonio/seldon-core/scheduler/pkg/kafka"

"github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"
@@ -14,28 +16,30 @@ import (
const DEFAULT_NWORKERS = 4

type KafkaManager struct {
active bool
logger log.FieldLogger
mu sync.Mutex
gateways map[string]*InferKafkaGateway //internal model name to infer consumer
broker string
serverConfig *KafkaServerConfig
configChan chan config.AgentConfiguration
topicNamer *kafka.TopicNamer
active bool
logger log.FieldLogger
mu sync.Mutex
gateways map[string]*InferKafkaGateway //internal model name to infer consumer
broker string
serverConfig *KafkaServerConfig
configChan chan config.AgentConfiguration
topicNamer *kafka.TopicNamer
tracerProvider *seldontracer.TracerProvider
}

func (km *KafkaManager) Name() string {
panic("implement me")
}

func NewKafkaManager(logger log.FieldLogger, serverConfig *KafkaServerConfig, namespace string) *KafkaManager {
func NewKafkaManager(logger log.FieldLogger, serverConfig *KafkaServerConfig, namespace string, traceProvider *seldontracer.TracerProvider) *KafkaManager {
return &KafkaManager{
active: false,
logger: logger.WithField("source", "KafkaManager"),
gateways: make(map[string]*InferKafkaGateway),
serverConfig: serverConfig,
configChan: make(chan config.AgentConfiguration),
topicNamer: kafka.NewTopicNamer(namespace),
active: false,
logger: logger.WithField("source", "KafkaManager"),
gateways: make(map[string]*InferKafkaGateway),
serverConfig: serverConfig,
configChan: make(chan config.AgentConfiguration),
topicNamer: kafka.NewTopicNamer(namespace),
tracerProvider: traceProvider,
}
}

@@ -100,7 +104,7 @@ func (km *KafkaManager) AddModel(modelName string, streamSpec *scheduler.StreamS
}
}
km.logger.Infof("Adding consumer to broker %s for model %s, input topic %s output topic %s", km.broker, modelName, modelConfig.InputTopic, modelConfig.OutputTopic)
inferGateway, err := NewInferKafkaGateway(km.logger, DEFAULT_NWORKERS, km.broker, modelConfig, km.serverConfig)
inferGateway, err := NewInferKafkaGateway(km.logger, DEFAULT_NWORKERS, km.broker, modelConfig, km.serverConfig, km.tracerProvider)
km.gateways[modelName] = inferGateway
if err != nil {
return err
16 changes: 11 additions & 5 deletions scheduler/pkg/kafka/gateway/manager_test.go
@@ -3,6 +3,8 @@ package gateway
import (
"testing"

seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"

"github.com/seldonio/seldon-core/scheduler/apis/mlops/scheduler"

. "github.com/onsi/gomega"
@@ -44,9 +46,11 @@ func TestManagerAddModel(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logger := log.New()
km := NewKafkaManager(logger, &KafkaServerConfig{}, "default")
tp, err := seldontracer.NewTracer("test")
g.Expect(err).To(BeNil())
km := NewKafkaManager(logger, &KafkaServerConfig{}, "default", tp)
km.active = true
err := km.AddModel(test.modelName, test.streamSpec)
err = km.AddModel(test.modelName, test.streamSpec)
g.Expect(err).To(BeNil())
g.Expect(km.gateways[test.modelName].modelConfig.ModelName).To(Equal(test.modelName))
g.Expect(km.gateways[test.modelName].modelConfig.InputTopic).To(Equal(test.expectedInputTopic))
@@ -68,11 +72,13 @@ func TestManagerRemoveModel(t *testing.T) {
modelName string
expectedGateways int
}
gw1, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo", InputTopic: "topic1", OutputTopic: "topic2"}, &KafkaServerConfig{})
tp, err := seldontracer.NewTracer("test")
g.Expect(err).To(BeNil())
gw1, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo", InputTopic: "topic1", OutputTopic: "topic2"}, &KafkaServerConfig{}, tp)
g.Expect(err).To(BeNil())
gw2, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo2", InputTopic: "topic2", OutputTopic: "topic3"}, &KafkaServerConfig{})
gw2, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo2", InputTopic: "topic2", OutputTopic: "topic3"}, &KafkaServerConfig{}, tp)
g.Expect(err).To(BeNil())
gw3, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo2", InputTopic: "topic2", OutputTopic: "topic3"}, &KafkaServerConfig{})
gw3, err := NewInferKafkaGateway(log.New(), 0, "", &KafkaModelConfig{ModelName: "foo2", InputTopic: "topic2", OutputTopic: "topic3"}, &KafkaServerConfig{}, tp)
g.Expect(err).To(BeNil())
tests := []test{
{
21 changes: 18 additions & 3 deletions scheduler/pkg/kafka/gateway/worker.go
@@ -11,6 +11,10 @@ import (
"strconv"
"time"

seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"
"go.opentelemetry.io/otel/attribute"

"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/credentials/insecure"

grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@@ -38,6 +42,7 @@ type InferWorker struct {
httpClient *http.Client
restUrl *url.URL
consumer *InferKafkaGateway
tracer trace.Tracer
}

type InferWork struct {
@@ -49,7 +54,7 @@ type V2Error struct {
Error string `json:"error"`
}

func NewInferWorker(consumer *InferKafkaGateway, logger log.FieldLogger) (*InferWorker, error) {
func NewInferWorker(consumer *InferKafkaGateway, logger log.FieldLogger, traceProvider *seldontracer.TracerProvider) (*InferWorker, error) {
grpcClient, err := getGrpcClient(consumer.serverConfig.Host, consumer.serverConfig.GrpcPort)
if err != nil {
return nil, err
Expand All @@ -61,6 +66,7 @@ func NewInferWorker(consumer *InferKafkaGateway, logger log.FieldLogger) (*Infer
restUrl: restUrl,
httpClient: &http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)},
consumer: consumer,
tracer: traceProvider.TraceProvider.Tracer("Worker"),
}, nil
}

@@ -180,20 +186,28 @@ func (iw *InferWorker) produce(ctx context.Context, job *InferWork, topic string
Headers: kafkaHeaders,
}

ctx, span := iw.tracer.Start(ctx, "Produce")
span.SetAttributes(attribute.String(seldontracer.SELDON_REQUEST_ID, string(job.msg.Key)))
carrierOut := splunkkafka.NewMessageCarrier(msg)
otel.GetTextMapPropagator().Inject(ctx, carrierOut)

err := iw.consumer.producer.Produce(msg, nil)
deliveryChan := make(chan kafka.Event)
err := iw.consumer.producer.Produce(msg, deliveryChan)
if err != nil {
iw.logger.WithError(err).Errorf("Failed to produce response for model %s", topic)
return err
}
go func() {
<-deliveryChan
span.End()
}()

return nil
}

func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeConvert bool) error {
logger := iw.logger.WithField("func", "restRequest")

logger.Debugf("REST request to %s for %s", iw.restUrl.String(), iw.consumer.modelConfig.ModelName)
data := job.msg.Value
if maybeConvert {
data = maybeChainRest(job.msg.Value)
@@ -226,6 +240,7 @@ func (iw *InferWorker) restRequest(ctx context.Context, job *InferWork, maybeCon

func (iw *InferWorker) grpcRequest(ctx context.Context, job *InferWork, req *v2.ModelInferRequest) error {
logger := iw.logger.WithField("func", "grpcRequest")
logger.Debugf("gRPC request for %s", iw.consumer.modelConfig.ModelName)
//Update req with correct modelName
req.ModelName = iw.consumer.modelConfig.ModelName
req.ModelVersion = fmt.Sprintf("%d", util.GetPinnedModelVersion())
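One interaction in the worker changes above is easy to miss: go.delivery.reports is switched to true and Produce is now given a per-message delivery channel, so the Produce span is ended only after the broker's delivery report arrives and is read (which is what the old and new config comments about memory leaks are getting at). Below is a compressed sketch of that pattern; the function name and the error handling on the delivery report are assumptions, not taken verbatim from the commit.

package tracingsketch

import (
	"context"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/signalfx/splunk-otel-go/instrumentation/github.com/confluentinc/confluent-kafka-go/kafka/splunkkafka"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/trace"
)

// produceAndAwaitDelivery injects the trace context, produces the message,
// and ends the Produce span only when the delivery report is received.
// The setup() change above enables go.delivery.reports so these reports
// are emitted and read rather than accumulating.
func produceAndAwaitDelivery(ctx context.Context, tracer trace.Tracer, producer *kafka.Producer, msg *kafka.Message) error {
	ctx, span := tracer.Start(ctx, "Produce")
	otel.GetTextMapPropagator().Inject(ctx, splunkkafka.NewMessageCarrier(msg))

	deliveryChan := make(chan kafka.Event, 1)
	if err := producer.Produce(msg, deliveryChan); err != nil {
		span.End()
		return err
	}
	go func() {
		ev := <-deliveryChan // the delivery report for this one message
		if m, ok := ev.(*kafka.Message); ok && m.TopicPartition.Error != nil {
			span.RecordError(m.TopicPartition.Error) // assumption: record failed deliveries on the span
		}
		span.End()
	}()
	return nil
}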