diff --git a/contrib/kafka/cmd/dispatcher/main.go b/contrib/kafka/cmd/dispatcher/main.go
index 861d5665c68..9ef18689623 100644
--- a/contrib/kafka/cmd/dispatcher/main.go
+++ b/contrib/kafka/cmd/dispatcher/main.go
@@ -60,7 +60,7 @@ func main() {
 		logger.Fatal("unable to create manager.", zap.Error(err))
 	}
 
-	kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, logger)
+	kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, provisionerConfig.ConsumerMode, logger)
 	if err != nil {
 		logger.Fatal("unable to create kafka dispatcher.", zap.Error(err))
 	}
diff --git a/contrib/kafka/main.go b/contrib/kafka/main.go
index a70ab510f99..4f748ab266f 100644
--- a/contrib/kafka/main.go
+++ b/contrib/kafka/main.go
@@ -2,14 +2,11 @@ package main
 
 import (
 	"flag"
-	"fmt"
-	"os"
-	"strings"
-
 	istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
 	"go.uber.org/zap"
 	"k8s.io/apimachinery/pkg/runtime"
 	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
+	"os"
 	"sigs.k8s.io/controller-runtime/pkg/client/config"
 	"sigs.k8s.io/controller-runtime/pkg/controller"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
@@ -20,7 +17,6 @@ import (
 	"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
 	eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
 	"github.com/knative/eventing/pkg/provisioners"
-	"github.com/knative/pkg/configmap"
 )
 
 const (
@@ -64,7 +60,7 @@ func main() {
 	}
 
 	// TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
-	provisionerConfig, err := getProvisionerConfig()
+	provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")
 
 	if err != nil {
 		logger.Error(err, "unable to run controller manager")
@@ -80,27 +76,3 @@ func main() {
 
 	mgr.Start(signals.SetupSignalHandler())
 }
-
-// getProvisionerConfig returns the details of the associated Provisioner/ClusterChannelProvisioner object
-func getProvisionerConfig() (*provisionerController.KafkaProvisionerConfig, error) {
-	configMap, err := configmap.Load("/etc/config-provisioner")
-	if err != nil {
-		return nil, fmt.Errorf("error loading provisioner configuration: %s", err)
-	}
-
-	if len(configMap) == 0 {
-		return nil, fmt.Errorf("missing provisioner configuration")
-	}
-
-	config := &provisionerController.KafkaProvisionerConfig{}
-
-	if value, ok := configMap[BrokerConfigMapKey]; ok {
-		bootstrapServers := strings.Split(value, ",")
-		if len(bootstrapServers) != 0 {
-			config.Brokers = bootstrapServers
-			return config, nil
-		}
-	}
-
-	return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
-}
diff --git a/contrib/kafka/pkg/controller/types.go b/contrib/kafka/pkg/controller/types.go
index 46a85cea400..a3160c48c32 100644
--- a/contrib/kafka/pkg/controller/types.go
+++ b/contrib/kafka/pkg/controller/types.go
@@ -1,5 +1,8 @@
 package controller
 
+import "github.com/bsm/sarama-cluster"
+
 type KafkaProvisionerConfig struct {
-	Brokers []string
+	Brokers      []string
+	ConsumerMode cluster.ConsumerMode
 }
diff --git a/contrib/kafka/pkg/controller/util.go b/contrib/kafka/pkg/controller/util.go
index 8f0d744e662..d856e406931 100644
--- a/contrib/kafka/pkg/controller/util.go
+++ b/contrib/kafka/pkg/controller/util.go
@@ -2,14 +2,17 @@ package controller
 
 import (
 	"fmt"
+	"github.com/bsm/sarama-cluster"
 	"strings"
 
 	"github.com/knative/pkg/configmap"
 )
 
 const (
-	BrokerConfigMapKey    = "bootstrap_servers"
-	KafkaChannelSeparator = "."
+	BrokerConfigMapKey                 = "bootstrap_servers"
+	ConsumerModeConfigMapKey           = "consumer_mode"
+	ConsumerModePartitionConsumerValue = "partitions"
+	KafkaChannelSeparator              = "."
 )
 
 // GetProvisionerConfig returns the details of the associated ClusterChannelProvisioner object
@@ -33,8 +36,15 @@ func GetProvisionerConfig(path string) (*KafkaProvisionerConfig, error) {
 			}
 		}
 		config.Brokers = bootstrapServers
-		return config, nil
+	} else {
+		return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
 	}
 
-	return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
+	config.ConsumerMode = cluster.ConsumerModeMultiplex
+	if mode, ok := configMap[ConsumerModeConfigMapKey]; ok {
+		if strings.ToLower(mode) == ConsumerModePartitionConsumerValue {
+			config.ConsumerMode = cluster.ConsumerModePartitions
+		}
+	}
+	return config, nil
 }
diff --git a/contrib/kafka/pkg/controller/util_test.go b/contrib/kafka/pkg/controller/util_test.go
index 66867498d3a..267def2e9d6 100644
--- a/contrib/kafka/pkg/controller/util_test.go
+++ b/contrib/kafka/pkg/controller/util_test.go
@@ -1,6 +1,7 @@
 package controller
 
 import (
+	"github.com/bsm/sarama-cluster"
 	"io/ioutil"
 	"os"
 	"path/filepath"
@@ -53,6 +54,22 @@ func TestGetProvisionerConfigBrokers(t *testing.T) {
 				Brokers: []string{"kafkabroker1.kafka:9092", "kafkabroker2.kafka:9092"},
 			},
 		},
+		{
+			name: "partition consumer",
+			data: map[string]string{"bootstrap_servers": "kafkabroker.kafka:9092", "consumer_mode": "partitions"},
+			expected: &KafkaProvisionerConfig{
+				Brokers:      []string{"kafkabroker.kafka:9092"},
+				ConsumerMode: cluster.ConsumerModePartitions,
+			},
+		},
+		{
+			name: "default multiplex",
+			data: map[string]string{"bootstrap_servers": "kafkabroker.kafka:9092", "consumer_mode": "multiplex"},
+			expected: &KafkaProvisionerConfig{
+				Brokers:      []string{"kafkabroker.kafka:9092"},
+				ConsumerMode: cluster.ConsumerModeMultiplex,
+			},
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/contrib/kafka/pkg/dispatcher/dispatcher.go b/contrib/kafka/pkg/dispatcher/dispatcher.go
index de794d67992..52f9feffb4b 100644
--- a/contrib/kafka/pkg/dispatcher/dispatcher.go
+++ b/contrib/kafka/pkg/dispatcher/dispatcher.go
@@ -49,24 +49,34 @@ type KafkaDispatcher struct {
 
 type KafkaConsumer interface {
 	Messages() <-chan *sarama.ConsumerMessage
+	Partitions() <-chan cluster.PartitionConsumer
 	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
 	Close() (err error)
 }
 
 type KafkaCluster interface {
 	NewConsumer(groupID string, topics []string) (KafkaConsumer, error)
+
+	GetConsumerMode() cluster.ConsumerMode
 }
 
 type saramaCluster struct {
 	kafkaBrokers []string
+
+	consumerMode cluster.ConsumerMode
 }
 
 func (c *saramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) {
 	consumerConfig := cluster.NewConfig()
 	consumerConfig.Version = sarama.V1_1_0_0
+	consumerConfig.Group.Mode = c.consumerMode
 	return cluster.NewConsumer(c.kafkaBrokers, groupID, topics, consumerConfig)
 }
 
+func (c *saramaCluster) GetConsumerMode() cluster.ConsumerMode {
+	return c.consumerMode
+}
+
 type subscription struct {
 	Namespace string
 	Name      string
@@ -166,6 +176,7 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su
 
 	group := fmt.Sprintf("%s.%s.%s", controller.Name, sub.Namespace, sub.Name)
 	consumer, err := d.kafkaCluster.NewConsumer(group, []string{topicName})
+
 	if err != nil {
 		// we can not create a consumer - logging that, with reason
 		d.logger.Info("Could not create proper consumer", zap.Error(err))
@@ -179,26 +190,55 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su
 	}
 	channelMap[sub] = consumer
 
-	go func() {
-		for {
-			msg, more := <-consumer.Messages()
-			if more {
-				d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
-				message := fromKafkaMessage(msg)
-				err := d.dispatchMessage(message, sub)
-				if err != nil {
-					d.logger.Warn("Got error trying to dispatch message", zap.Error(err))
-				}
-				// TODO: handle errors with pluggable strategy
-				consumer.MarkOffset(msg, "") // Mark message as processed
-			} else {
-				break
+	if cluster.ConsumerModePartitions == d.kafkaCluster.GetConsumerMode() {
+		go d.partitionConsumerLoop(consumer, channelRef, sub)
+	} else {
+		go d.multiplexConsumerLoop(consumer, channelRef, sub)
+	}
+	return nil
+}
+
+func (d *KafkaDispatcher) partitionConsumerLoop(consumer KafkaConsumer, channelRef provisioners.ChannelReference, sub subscription) {
+	d.logger.Info("Partition Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
+	for {
+		pc, more := <-consumer.Partitions()
+		if !more {
+			break
+		}
+		go func(pc cluster.PartitionConsumer) {
+			for msg := range pc.Messages() {
+				d.dispatch(channelRef, sub, consumer, msg)
 			}
+		}(pc)
+	}
+	d.logger.Info("Partition Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
+}
+
+func (d *KafkaDispatcher) multiplexConsumerLoop(consumer KafkaConsumer, channelRef provisioners.ChannelReference, sub subscription) {
+	d.logger.Info("Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
+	for {
+		msg, more := <-consumer.Messages()
+		if more {
+			d.dispatch(channelRef, sub, consumer, msg)
+		} else {
+			break
 		}
-		d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
-	}()
+	}
+	d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
+}
 
-	return nil
+func (d *KafkaDispatcher) dispatch(channelRef provisioners.ChannelReference, sub subscription, consumer KafkaConsumer,
+	msg *sarama.ConsumerMessage) error {
+	d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef),
+		zap.Any("subscription", sub), zap.Any("partition", msg.Partition), zap.Any("offset", msg.Offset))
+	message := fromKafkaMessage(msg)
+	err := d.dispatchMessage(message, sub)
+	if err != nil {
+		d.logger.Warn("Got error trying to dispatch message", zap.Error(err))
+	}
+	// TODO: handle errors with pluggable strategy
+	consumer.MarkOffset(msg, "") // Mark message as processed
+	return err
 }
 
 func (d *KafkaDispatcher) unsubscribe(channel provisioners.ChannelReference, sub subscription) error {
@@ -224,7 +264,7 @@ func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) {
 	d.config.Store(config)
 }
 
-func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, error) {
+func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger *zap.Logger) (*KafkaDispatcher, error) {
 	conf := sarama.NewConfig()
 	conf.Version = sarama.V1_1_0_0
 
@@ -242,7 +282,7 @@ func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, erro
 
 	dispatcher := &KafkaDispatcher{
 		dispatcher:         provisioners.NewMessageDispatcher(logger.Sugar()),
-		kafkaCluster:       &saramaCluster{kafkaBrokers: brokers},
+		kafkaCluster:       &saramaCluster{kafkaBrokers: brokers, consumerMode: consumerMode},
 		kafkaConsumers:     make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer),
 		kafkaAsyncProducer: producer,
diff --git a/contrib/kafka/pkg/dispatcher/dispatcher_test.go b/contrib/kafka/pkg/dispatcher/dispatcher_test.go
index 4d41c9be486..13b7805b62c 100644
--- a/contrib/kafka/pkg/dispatcher/dispatcher_test.go
+++ b/contrib/kafka/pkg/dispatcher/dispatcher_test.go
@@ -2,9 +2,13 @@ package dispatcher
 
 import (
 	"errors"
+	"fmt"
+	"github.com/bsm/sarama-cluster"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
+	"sync"
+	"sync/atomic"
 	"testing"
 
 	"github.com/Shopify/sarama"
@@ -22,13 +26,18 @@ import (
 )
 
 type mockConsumer struct {
-	message chan *sarama.ConsumerMessage
+	message    chan *sarama.ConsumerMessage
+	partitions chan cluster.PartitionConsumer
 }
 
 func (c *mockConsumer) Messages() <-chan *sarama.ConsumerMessage {
 	return c.message
 }
 
+func (c *mockConsumer) Partitions() <-chan cluster.PartitionConsumer {
+	return c.partitions
+}
+
 func (c *mockConsumer) Close() error {
 	return nil
 }
@@ -37,29 +46,140 @@ func (c *mockConsumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string)
 	return
 }
 
+type mockPartitionConsumer struct {
+	highWaterMarkOffset int64
+	l                   sync.Mutex
+	topic               string
+	partition           int32
+	offset              int64
+	messages            chan *sarama.ConsumerMessage
+	errors              chan *sarama.ConsumerError
+	singleClose         sync.Once
+	consumed            bool
+}
+
+// AsyncClose implements the AsyncClose method from the sarama.PartitionConsumer interface.
+func (pc *mockPartitionConsumer) AsyncClose() {
+	pc.singleClose.Do(func() {
+		close(pc.messages)
+		close(pc.errors)
+	})
+}
+
+// Close implements the Close method from the sarama.PartitionConsumer interface. It will
+// verify whether the partition consumer was actually started.
+func (pc *mockPartitionConsumer) Close() error {
+	if !pc.consumed {
+		return fmt.Errorf("Expectations set on %s/%d, but no partition consumer was started.", pc.topic, pc.partition)
+	}
+	pc.AsyncClose()
+	var (
+		closeErr error
+		wg       sync.WaitGroup
+	)
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		var errs = make(sarama.ConsumerErrors, 0)
+		for err := range pc.errors {
+			errs = append(errs, err)
+		}
+		if len(errs) > 0 {
+			closeErr = errs
+		}
+	}()
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		for range pc.messages {
+			// drain
+		}
+	}()
+	wg.Wait()
+	return closeErr
+}
+
+// Errors implements the Errors method from the sarama.PartitionConsumer interface.
+func (pc *mockPartitionConsumer) Errors() <-chan *sarama.ConsumerError {
+	return pc.errors
+}
+
+// Messages implements the Messages method from the sarama.PartitionConsumer interface.
+func (pc *mockPartitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
+	return pc.messages
+}
+
+func (pc *mockPartitionConsumer) HighWaterMarkOffset() int64 {
+	return atomic.LoadInt64(&pc.highWaterMarkOffset) + 1
+}
+
+func (pc *mockPartitionConsumer) Topic() string {
+	return pc.topic
+}
+
+// Partition returns the consumed partition
+func (pc *mockPartitionConsumer) Partition() int32 {
+	return pc.partition
+}
+
+// InitialOffset returns the offset used for creating the PartitionConsumer instance.
+// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest
+func (pc *mockPartitionConsumer) InitialOffset() int64 {
+	return 0
+}
+
+// MarkOffset marks the offset of a message as processed.
+func (pc *mockPartitionConsumer) MarkOffset(offset int64, metadata string) {
+}
+
+// ResetOffset resets the offset to a previously processed message.
+func (pc *mockPartitionConsumer) ResetOffset(offset int64, metadata string) {
+	pc.offset = 0
+}
+
 type mockSaramaCluster struct {
 	// closed closes the message channel so that it doesn't block during the test
 	closed bool
 	// Handle to the latest created consumer, useful to access underlying message chan
 	consumerChannel chan *sarama.ConsumerMessage
+	// Handle to the latest created partition consumer, useful to access underlying message chan
+	partitionConsumerChannel chan cluster.PartitionConsumer
 	// createErr will return an error when creating a consumer
 	createErr bool
+	// consumer mode
+	consumerMode cluster.ConsumerMode
 }
 
 func (c *mockSaramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) {
 	if c.createErr {
 		return nil, errors.New("error creating consumer")
 	}
-	consumer := &mockConsumer{
-		message: make(chan *sarama.ConsumerMessage),
-	}
-	if c.closed {
-		close(consumer.message)
+
+	var consumer *mockConsumer
+	if c.consumerMode != cluster.ConsumerModePartitions {
+		consumer = &mockConsumer{
+			message: make(chan *sarama.ConsumerMessage),
+		}
+		if c.closed {
+			close(consumer.message)
+		}
+		c.consumerChannel = consumer.message
+	} else {
+		consumer = &mockConsumer{
+			partitions: make(chan cluster.PartitionConsumer),
+		}
+		if c.closed {
+			close(consumer.partitions)
+		}
+		c.partitionConsumerChannel = consumer.partitions
 	}
-	c.consumerChannel = consumer.message
 	return consumer, nil
 }
 
+func (c *mockSaramaCluster) GetConsumerMode() cluster.ConsumerMode {
+	return c.consumerMode
+}
+
 func TestDispatcher_UpdateConfig(t *testing.T) {
 	testCases := []struct {
 		name string
@@ -407,6 +527,55 @@ func TestSubscribe(t *testing.T) {
 
 }
 
+func TestPartitionConsumer(t *testing.T) {
+	sc := &mockSaramaCluster{consumerMode: cluster.ConsumerModePartitions}
+	data := []byte("data")
+	d := &KafkaDispatcher{
+		kafkaCluster:   sc,
+		kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer),
+		dispatcher:     provisioners.NewMessageDispatcher(zap.NewNop().Sugar()),
+		logger:         zap.NewNop(),
+	}
+	testHandler := &dispatchTestHandler{
+		t:       t,
+		payload: data,
+		done:    make(chan bool)}
+	server := httptest.NewServer(testHandler)
+	defer server.Close()
+	channelRef := provisioners.ChannelReference{
+		Name:      "test-channel",
+		Namespace: "test-ns",
+	}
+	subRef := subscription{
+		Name:          "test-sub",
+		Namespace:     "test-ns",
+		SubscriberURI: server.URL[7:],
+	}
+	err := d.subscribe(channelRef, subRef)
+	if err != nil {
+		t.Errorf("unexpected error %s", err)
+	}
+	defer close(sc.partitionConsumerChannel)
+	pc := &mockPartitionConsumer{
+		topic:     channelRef.Name,
+		partition: 1,
+		messages:  make(chan *sarama.ConsumerMessage, 1),
+	}
+	pc.messages <- &sarama.ConsumerMessage{
+		Topic:     channelRef.Name,
+		Partition: 1,
+		Headers: []*sarama.RecordHeader{
+			{
+				Key:   []byte("k1"),
+				Value: []byte("v1"),
+			},
+		},
+		Value: data,
+	}
+	sc.partitionConsumerChannel <- pc
+	<-testHandler.done
+}
+
 func TestSubscribeError(t *testing.T) {
 	sc := &mockSaramaCluster{
 		createErr: true,
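
Illustration (not part of the diff): a minimal sketch of how the updated provisioner configuration handling is expected to be consumed, assuming the controller package import path used elsewhere in this change and a ConfigMap mounted at /etc/config-provisioner that carries the new consumer_mode key.

// Hypothetical usage sketch, not part of this change.
package main

import (
	"log"

	cluster "github.com/bsm/sarama-cluster"
	provisionerController "github.com/knative/eventing/contrib/kafka/pkg/controller"
)

func main() {
	// The mounted ConfigMap is assumed to contain, for example:
	//   bootstrap_servers: kafkabroker.kafka:9092
	//   consumer_mode: partitions
	cfg, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")
	if err != nil {
		log.Fatalf("reading provisioner config: %v", err)
	}
	// A consumer_mode of "partitions" maps to cluster.ConsumerModePartitions;
	// any other value, or a missing key, falls back to cluster.ConsumerModeMultiplex.
	if cfg.ConsumerMode == cluster.ConsumerModePartitions {
		log.Printf("brokers=%v, using partition consumers", cfg.Brokers)
	} else {
		log.Printf("brokers=%v, using multiplexed consumer", cfg.Brokers)
	}
}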