Enable pipelinegateway multi topics consumer (#372)
* refactor func to util pkg

* add skeleton of multi topic consumer

* add remove topic

* add extra helper functions

* add consumer manager

* wire up consumer manager in kafka manager

* wire up env vars in pipeline cmd

* move request to the multi topic consumer class

* wire up tracing

* fix lint

* fix env var names

* remove cb for now

* adding stop
sakoush authored Jul 29, 2022
1 parent 5f458b9 commit 4bdcc6d
Showing 9 changed files with 345 additions and 124 deletions.
22 changes: 21 additions & 1 deletion scheduler/cmd/pipelinegateway/main.go
@@ -7,6 +7,7 @@ import (
"net/http"
"os"
"os/signal"
"strconv"
"syscall"

"github.com/seldonio/seldon-core/scheduler/pkg/kafka/config"
@@ -69,6 +70,7 @@ func makeSignalHandler(logger *log.Logger, done chan<- bool) {
close(done)
}

// TODO: move to a common util
func updateNamespace() {
nsBytes, err := ioutil.ReadFile(kubernetesNamespacePath)
if err != nil {
@@ -80,6 +82,21 @@ func updateNamespace() {
}
}

//TODO: move to a common util
func getEnVar(logger *log.Logger, key string, defaultValue int) int {
valStr := os.Getenv(key)
if valStr != "" {
val, err := strconv.ParseInt(valStr, 10, 64)
if err != nil {
logger.WithError(err).Fatalf("Failed to parse %s", key)
}
logger.Infof("Got %s = %d", key, val)
return int(val)
}
logger.Infof("Returning default %s = %d", key, defaultValue)
return defaultValue
}

func runningInsideK8s() bool {
return namespace != ""
}
@@ -112,7 +129,10 @@ func main() {
logger.WithError(err).Fatal("Failed to load Kafka config")
}

km, err := pipeline.NewKafkaManager(logger, namespace, kafkaConfigMap, tracer)
maxNumTopicsPerConsumer := getEnVar(logger, pipeline.EnvMaxNumTopicPerConsumer, pipeline.DefaultMaxNumTopicsPerConsumer)
maxNumConsumers := getEnVar(logger, pipeline.EnvMaxNumConsumers, pipeline.DefaultMaxNumConsumers)
km, err := pipeline.NewKafkaManager(
logger, namespace, kafkaConfigMap, tracer, maxNumConsumers, maxNumTopicsPerConsumer)
if err != nil {
logger.WithError(err).Fatal("Failed to create kafka manager")
}
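For reference, a minimal, self-contained sketch (not part of the commit) of how the two new PIPELINEGATEWAY_* settings resolve at startup: an environment variable wins when set, otherwise the package default applies (200 consumers, 100 topics per consumer). The envInt helper below is illustrative only and mirrors getEnVar above.

package main

import (
	"os"
	"strconv"

	log "github.com/sirupsen/logrus"
)

// envInt mirrors the getEnVar helper above: use the env var when set,
// otherwise fall back to the supplied default. Illustrative only.
func envInt(logger *log.Logger, key string, def int) int {
	if v := os.Getenv(key); v != "" {
		i, err := strconv.Atoi(v)
		if err != nil {
			logger.WithError(err).Fatalf("Failed to parse %s", key)
		}
		return i
	}
	return def
}

func main() {
	logger := log.New()
	// Override one setting; the other falls back to its default.
	os.Setenv("PIPELINEGATEWAY_MAX_NUM_CONSUMERS", "50")
	maxConsumers := envInt(logger, "PIPELINEGATEWAY_MAX_NUM_CONSUMERS", 200)
	maxTopics := envInt(logger, "PIPELINEGATEWAY_MAX_NUM_TOPICS_PER_CONSUMER", 100)
	logger.Infof("consumers=%d topicsPerConsumer=%d", maxConsumers, maxTopics)
}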
9 changes: 2 additions & 7 deletions scheduler/pkg/kafka/gateway/manager.go
@@ -1,11 +1,11 @@
package gateway

import (
"fmt"
"sync"

"github.com/seldonio/seldon-core/scheduler/pkg/kafka/config"
seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"
"github.com/seldonio/seldon-core/scheduler/pkg/util"
log "github.com/sirupsen/logrus"
)

@@ -41,13 +41,8 @@ func NewConsumerManager(logger log.FieldLogger, consumerConfig *ConsumerConfig,
}
}

func (cm *ConsumerManager) getKafkaConsumerName(modelName string) string {
idx := modelIdToConsumerBucket(modelName, cm.maxNumConsumers)
return fmt.Sprintf("%s-%d", modelGatewayConsumerNamePrefix, idx)
}

func (cm *ConsumerManager) getInferKafkaConsumer(modelName string, create bool) (*InferKafkaConsumer, error) {
consumerBucketId := cm.getKafkaConsumerName(modelName)
consumerBucketId := util.GetKafkaConsumerName(modelName, modelGatewayConsumerNamePrefix, cm.maxNumConsumers)
ic, ok := cm.consumers[consumerBucketId]

if !ok && !create {
14 changes: 0 additions & 14 deletions scheduler/pkg/kafka/gateway/utils.go
@@ -6,7 +6,6 @@ import (

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/seldonio/seldon-core/scheduler/pkg/envoy/resources"
"github.com/seldonio/seldon-core/scheduler/pkg/util"
"google.golang.org/grpc/metadata"
)

@@ -44,16 +43,3 @@ func filterKafkaInputHeaders(headers []kafka.Header) []kafka.Header {
}
return filteredHeaders
}

// Map a model name / id to a consumer bucket consistently.
// This requires that number of buckets does not change between calls.
// If it changes there is a potential redundant work that is being done as kafka
// will restart from earliest.
func modelIdToConsumerBucket(modelId string, numBuckets int) uint32 {
hash, err := util.Hash(modelId)
if err != nil {
// is this ok to revert to bucket 0?
return 0
}
return hash % uint32(numBuckets)
}
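The bucket-hashing helper removed above was, per the commit message, moved into the shared util package; its new call site in gateway/manager.go is util.GetKafkaConsumerName(modelName, modelGatewayConsumerNamePrefix, cm.maxNumConsumers). Below is a minimal sketch of what the relocated helper plausibly looks like, reconstructed from the removed code; the actual body in pkg/util may differ, and it assumes Hash(string) (uint32, error) already exists in that package as the removed code suggests.

package util

import "fmt"

// GetKafkaConsumerName maps a model name to one of maxConsumers consumer
// names consistently, so the same model always lands on the same consumer.
// Sketch reconstructed from the removed modelIdToConsumerBucket.
func GetKafkaConsumerName(modelName, prefix string, maxConsumers int) string {
	hash, err := Hash(modelName)
	if err != nil {
		// Fall back to bucket 0, as the removed helper did.
		return fmt.Sprintf("%s-%d", prefix, 0)
	}
	return fmt.Sprintf("%s-%d", prefix, hash%uint32(maxConsumers))
}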
99 changes: 99 additions & 0 deletions scheduler/pkg/kafka/pipeline/consumer_manager.go
@@ -0,0 +1,99 @@
package pipeline

import (
"fmt"
"sync"

"github.com/google/uuid"
"github.com/seldonio/seldon-core/scheduler/pkg/kafka/config"
log "github.com/sirupsen/logrus"
"go.opentelemetry.io/otel/trace"
)

const (
EnvMaxNumConsumers = "PIPELINEGATEWAY_MAX_NUM_CONSUMERS"
EnvMaxNumTopicPerConsumer = "PIPELINEGATEWAY_MAX_NUM_TOPICS_PER_CONSUMER"
DefaultMaxNumTopicsPerConsumer = 100
DefaultMaxNumConsumers = 200
)

type ConsumerManager struct {
logger log.FieldLogger
mu sync.Mutex
// all consumers we have
consumers []*MultiTopicsKafkaConsumer
consumerConfig *config.KafkaConfig
maxNumConsumers int
maxNumTopicsPerConsumer int
tracer trace.Tracer
}

func NewConsumerManager(logger log.FieldLogger, consumerConfig *config.KafkaConfig, maxNumTopicsPerConsumer, maxNumConsumers int, tracer trace.Tracer) *ConsumerManager {
logger.Infof("Setting consumer manager with max num consumers: %d, max topics per consumers: %d", maxNumConsumers, maxNumTopicsPerConsumer)
return &ConsumerManager{
logger: logger.WithField("source", "ConsumerManager"),
consumerConfig: consumerConfig,
maxNumTopicsPerConsumer: maxNumTopicsPerConsumer,
maxNumConsumers: maxNumConsumers,
tracer: tracer,
}
}

func (cm *ConsumerManager) createConsumer() error {
if len(cm.consumers) == cm.maxNumTopicsPerConsumer {
return fmt.Errorf("Max number of consumers reached")
}

c, err := NewMultiTopicsKafkaConsumer(cm.logger, cm.consumerConfig, uuid.New().String(), cm.tracer)
if err != nil {
return err
}
cm.consumers = append(cm.consumers, c)
return nil
}

func (cm *ConsumerManager) getKafkaConsumer() (*MultiTopicsKafkaConsumer, error) {
// TODO: callers can get the same consumer and can AddTopics that can get this consumer beyond maxNumTopicsPerConsumer
// this is fine for now
cm.mu.Lock()
defer cm.mu.Unlock()

if len(cm.consumers) == 0 {
if err := cm.createConsumer(); err != nil {
return nil, err
}
}
c := cm.consumers[len(cm.consumers)-1]

if c.GetNumTopics() < cm.maxNumTopicsPerConsumer {
return c, nil
} else {
err := cm.createConsumer()
if err != nil {
return nil, err
} else {
return cm.consumers[len(cm.consumers)-1], nil
}
}
}

func (cm *ConsumerManager) GetNumModels() int {
cm.mu.Lock()
defer cm.mu.Unlock()
tot := 0
for _, c := range cm.consumers {
tot += c.GetNumTopics()
}
return tot
}

func (cm *ConsumerManager) Stop() {
cm.mu.Lock()
defer cm.mu.Unlock()
for _, c := range cm.consumers {
err := c.Close()
if err != nil {
cm.logger.Warnf("Consumer %s failed to close", c.id)
}
}
}
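A small standalone sketch (not part of the commit) that models the sharing policy getKafkaConsumer implements: new topics fill the most recently created consumer until it holds maxNumTopicsPerConsumer topics, then another consumer is created, up to the configured maximum. It assumes topics are only ever added, never removed.

package main

import "fmt"

// consumerIndexFor models ConsumerManager.getKafkaConsumer: the n-th topic
// (0-based) is served by consumer n / maxTopicsPerConsumer.
func consumerIndexFor(topicOrdinal, maxTopicsPerConsumer int) int {
	return topicOrdinal / maxTopicsPerConsumer
}

func main() {
	const maxTopicsPerConsumer = 100 // DefaultMaxNumTopicsPerConsumer
	const maxConsumers = 200         // DefaultMaxNumConsumers
	for _, t := range []int{0, 99, 100, 250, 19999} {
		fmt.Printf("topic #%d -> consumer #%d\n", t, consumerIndexFor(t, maxTopicsPerConsumer))
	}
	// With the defaults the gateway can host up to
	// maxConsumers * maxTopicsPerConsumer topics before
	// "Max number of consumers reached" is returned.
	fmt.Println("capacity:", maxConsumers*maxTopicsPerConsumer)
}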
128 changes: 28 additions & 100 deletions scheduler/pkg/kafka/pipeline/kafkamanager.go
@@ -16,7 +16,6 @@ import (
"github.com/seldonio/seldon-core/scheduler/pkg/envoy/resources"

"github.com/confluentinc/confluent-kafka-go/kafka"
cmap "github.com/orcaman/concurrent-map"
"github.com/rs/xid"
kafka2 "github.com/seldonio/seldon-core/scheduler/pkg/kafka"
seldontracer "github.com/seldonio/seldon-core/scheduler/pkg/tracing"
@@ -33,24 +32,22 @@ type PipelineInferer interface {
}

type KafkaManager struct {
kafkaConfig *config.KafkaConfig
producer *kafka.Producer
pipelines sync.Map
logger logrus.FieldLogger
mu sync.RWMutex
topicNamer *kafka2.TopicNamer
tracer trace.Tracer
kafkaConfig *config.KafkaConfig
producer *kafka.Producer
pipelines sync.Map
logger logrus.FieldLogger
mu sync.RWMutex
topicNamer *kafka2.TopicNamer
tracer trace.Tracer
consumerManager *ConsumerManager
}

type Pipeline struct {
resourceName string
consumer *kafka.Consumer
// map of kafka id to request
requests cmap.ConcurrentMap
done chan bool
isModel bool
wg *sync.WaitGroup
hasStarted bool
consumer *MultiTopicsKafkaConsumer
isModel bool
wg *sync.WaitGroup
hasStarted bool
}

type Request struct {
@@ -63,12 +60,15 @@ type Request struct {
isError bool
}

func NewKafkaManager(logger logrus.FieldLogger, namespace string, kafkaConfig *config.KafkaConfig, traceProvider *seldontracer.TracerProvider) (*KafkaManager, error) {
func NewKafkaManager(logger logrus.FieldLogger, namespace string, kafkaConfig *config.KafkaConfig, traceProvider *seldontracer.TracerProvider, maxNumConsumers, maxNumTopicsPerConsumer int) (*KafkaManager, error) {
tracer := traceProvider.GetTraceProvider().Tracer("KafkaManager")
km := &KafkaManager{
kafkaConfig: kafkaConfig,
logger: logger.WithField("source", "KafkaManager"),
topicNamer: kafka2.NewTopicNamer(namespace),
tracer: traceProvider.GetTraceProvider().Tracer("KafkaManager"),
kafkaConfig: kafkaConfig,
logger: logger.WithField("source", "KafkaManager"),
topicNamer: kafka2.NewTopicNamer(namespace),
tracer: tracer,
consumerManager: NewConsumerManager(logger, kafkaConfig, maxNumTopicsPerConsumer, maxNumConsumers, tracer),
mu: sync.RWMutex{},
}
err := km.createProducer()
if err != nil {
@@ -83,11 +83,7 @@ func (km *KafkaManager) Stop() {
km.mu.Lock()
defer km.mu.Unlock()
km.producer.Close()
km.pipelines.Range(func(key interface{}, value interface{}) bool {
pipeline := value.(*Pipeline)
close(pipeline.done)
return true
})
km.consumerManager.Stop()
logger.Info("Stopped all pipelines")
}

@@ -105,19 +101,13 @@ func (km *KafkaManager) createPipeline(resource string, isModel bool) (*Pipeline, error) {
}

func (km *KafkaManager) createPipeline(resource string, isModel bool) (*Pipeline, error) {
consumerConfig := config.CloneKafkaConfigMap(km.kafkaConfig.Consumer)
consumerConfig["group.id"] = resource
km.logger.Infof("Creating consumer with config %v", consumerConfig)
consumer, err := kafka.NewConsumer(&consumerConfig)
consumer, err := km.consumerManager.getKafkaConsumer()
if err != nil {
return nil, err
}
km.logger.Infof("Created consumer %s", consumer.String())
return &Pipeline{
resourceName: resource,
consumer: consumer,
requests: cmap.New(),
done: make(chan bool),
isModel: isModel,
wg: new(sync.WaitGroup),
hasStarted: false,
@@ -174,8 +164,8 @@ func (km *KafkaManager) Infer(ctx context.Context, resourceName string, isModel
wg: new(sync.WaitGroup),
key: key,
}
pipeline.requests.Set(key, request)
defer pipeline.requests.Remove(key)
pipeline.consumer.requests.Set(key, request)
defer pipeline.consumer.requests.Remove(key)
request.wg.Add(1)

outputTopic := km.topicNamer.GetPipelineTopicInputs(resourceName)
@@ -234,73 +224,11 @@ func (km *KafkaManager) consume(pipeline *Pipeline) error {
if pipeline.isModel {
topicName = km.topicNamer.GetModelTopicOutputs(pipeline.resourceName)
}

err := pipeline.consumer.SubscribeTopics([]string{topicName},
func(consumer *kafka.Consumer, event kafka.Event) error {
switch event.(type) {
case kafka.AssignedPartitions:
if !pipeline.hasStarted {
pipeline.wg.Done() // Mark consumer as ready so we can send our requests
pipeline.hasStarted = true
}
}
return nil
})
err := pipeline.consumer.AddTopic(topicName, nil)
pipeline.wg.Done()
logger.Infof("Topic %s added in consumer id %s", topicName, pipeline.consumer.id)
if err != nil {
return err
}
if !km.kafkaConfig.HasKafkaBootstrapServer() { // Should only happen in testing
pipeline.wg.Done() // Mark consumer as ready so we can send our requests
}
logger.Infof("Started consumer for topic (pipeline:%v) %s", !pipeline.isModel, topicName)
run := true
for run {
select {
case <-pipeline.done:
run = false
default:
ev := pipeline.consumer.Poll(pollTimeoutMillisecs)
if ev == nil {
continue
}

switch e := ev.(type) {
case *kafka.Message:
logger.Debugf("Received message from %s with key %s", topicName, string(e.Key))
if val, ok := pipeline.requests.Get(string(e.Key)); ok {

// Add tracing span
ctx := context.Background()
carrierIn := splunkkafka.NewMessageCarrier(e)
ctx = otel.GetTextMapPropagator().Extract(ctx, carrierIn)
_, span := km.tracer.Start(ctx, "Consume")
span.SetAttributes(attribute.String(RequestIdHeader, string(e.Key)))

request := val.(*Request)
request.mu.Lock()
if request.active {
logger.Debugf("Process response for key %s", string(e.Key))
request.isError = hasErrorHeader(e.Headers)
request.response = e.Value
request.headers = e.Headers
request.wg.Done()
request.active = false
} else {
logger.Warnf("Got duplicate request with key %s", string(e.Key))
}
request.mu.Unlock()
span.End()
}

case kafka.Error:
km.logger.Error(e, "Received stream error")
if e.Code() == kafka.ErrAllBrokersDown {
run = false
}
default:
km.logger.Debug("Ignored %s", e.String())
}
}
}
return pipeline.consumer.Close()
return nil
}
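The poll loop removed from consume() above presumably now lives inside MultiTopicsKafkaConsumer, whose file is among the changes not expanded in this view. Below is a hypothetical reconstruction of its per-message step, pieced together from the removed code and from the pipeline.consumer.requests usage in Infer(); field names such as c.requests, c.tracer and c.logger are assumptions, and the imports and helpers (splunkkafka, otel, attribute, hasErrorHeader) are the ones already used in kafkamanager.go.

// Sketch only: how the multi-topic consumer likely correlates a Kafka message
// back to the waiting Infer() request by key, mirroring the removed loop.
func (c *MultiTopicsKafkaConsumer) handleMessage(msg *kafka.Message) {
	if val, ok := c.requests.Get(string(msg.Key)); ok {
		// Propagate the tracing context carried in the Kafka headers.
		ctx := otel.GetTextMapPropagator().Extract(context.Background(), splunkkafka.NewMessageCarrier(msg))
		_, span := c.tracer.Start(ctx, "Consume")
		span.SetAttributes(attribute.String(RequestIdHeader, string(msg.Key)))
		defer span.End()

		request := val.(*Request)
		request.mu.Lock()
		defer request.mu.Unlock()
		if request.active {
			request.isError = hasErrorHeader(msg.Headers)
			request.response = msg.Value
			request.headers = msg.Headers
			request.wg.Done() // unblock the waiting Infer() call
			request.active = false
		} else {
			c.logger.Warnf("Got duplicate request with key %s", string(msg.Key))
		}
	}
}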
(The remaining changed files are not shown in this view.)
