Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partition consumer mode for Kafka channel #879

Merged
merged 4 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/kafka/cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
logger.Fatal("unable to create manager.", zap.Error(err))
}

kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, logger)
kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, provisionerConfig.ConsumerMode, logger)
if err != nil {
logger.Fatal("unable to create kafka dispatcher.", zap.Error(err))
}
Expand Down
5 changes: 4 additions & 1 deletion contrib/kafka/pkg/controller/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package controller

import "github.com/bsm/sarama-cluster"

type KafkaProvisionerConfig struct {
Brokers []string
Brokers []string
ConsumerMode cluster.ConsumerMode
}
20 changes: 16 additions & 4 deletions contrib/kafka/pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@ package controller

import (
"fmt"
"github.com/bsm/sarama-cluster"
"strings"

"github.com/knative/pkg/configmap"
)

const (
	// BrokerConfigMapKey is the ConfigMap key listing the Kafka
	// bootstrap servers (comma-separated host:port pairs).
	BrokerConfigMapKey = "bootstrap_servers"
	// ConsumerModeConfigMapKey is the ConfigMap key selecting the
	// consumer mode; "partitions" enables per-partition consumers,
	// anything else falls back to multiplex mode.
	ConsumerModeConfigMapKey = "consumer_mode"
	// KafkaChannelSeparator joins namespace and channel name when
	// building Kafka topic names.
	KafkaChannelSeparator = "."
)

// GetProvisionerConfig returns the details of the associated ClusterChannelProvisioner object
Expand All @@ -33,8 +35,18 @@ func GetProvisionerConfig(path string) (*KafkaProvisionerConfig, error) {
}
}
config.Brokers = bootstrapServers
return config, nil
} else {
return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
}

return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
if mode, ok := configMap[ConsumerModeConfigMapKey]; ok {
yuzisun marked this conversation as resolved.
Show resolved Hide resolved
if strings.ToLower(mode) == "partitions" {
config.ConsumerMode = cluster.ConsumerModePartitions
} else {
config.ConsumerMode = cluster.ConsumerModeMultiplex
}
} else {
config.ConsumerMode = cluster.ConsumerModeMultiplex
}
return config, nil
}
74 changes: 55 additions & 19 deletions contrib/kafka/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,34 @@ type KafkaDispatcher struct {

// KafkaConsumer abstracts the subset of the sarama-cluster consumer used by
// the dispatcher, so tests can substitute a fake implementation.
type KafkaConsumer interface {
	// Messages returns the multiplexed message channel (multiplex mode).
	Messages() <-chan *sarama.ConsumerMessage
	// Partitions returns a channel of per-partition consumers
	// (used only when the cluster is in ConsumerModePartitions).
	Partitions() <-chan cluster.PartitionConsumer
	// MarkOffset records msg as processed for offset-commit purposes.
	MarkOffset(msg *sarama.ConsumerMessage, metadata string)
	// Close shuts the consumer down and releases its resources.
	Close() (err error)
}

// KafkaCluster abstracts consumer creation against a Kafka cluster,
// allowing the dispatcher to be tested without real brokers.
type KafkaCluster interface {
	// NewConsumer creates a consumer-group member for the given topics.
	NewConsumer(groupID string, topics []string) (KafkaConsumer, error)

	// GetConsumerMode reports which consumer mode (partitions vs.
	// multiplex) consumers created by this cluster will use.
	GetConsumerMode() cluster.ConsumerMode
}

// saramaCluster is the production KafkaCluster implementation backed by
// github.com/bsm/sarama-cluster.
type saramaCluster struct {
	// kafkaBrokers is the bootstrap-server list passed to new consumers.
	kafkaBrokers []string

	// consumerMode is applied to every consumer's Group.Mode config.
	consumerMode cluster.ConsumerMode
}

// NewConsumer creates a sarama-cluster consumer for the given consumer
// group and topics, configured with the cluster's consumer mode and a
// Kafka protocol version of 1.1.0.
func (c *saramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) {
	cfg := cluster.NewConfig()
	cfg.Group.Mode = c.consumerMode
	cfg.Version = sarama.V1_1_0_0
	return cluster.NewConsumer(c.kafkaBrokers, groupID, topics, cfg)
}

// GetConsumerMode returns the consumer mode this cluster configures on
// new consumers. (The Get prefix is required by the KafkaCluster interface.)
func (c *saramaCluster) GetConsumerMode() cluster.ConsumerMode {
	return c.consumerMode
}

type subscription struct {
Namespace string
Name string
Expand Down Expand Up @@ -166,6 +176,7 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su

group := fmt.Sprintf("%s.%s.%s", controller.Name, sub.Namespace, sub.Name)
consumer, err := d.kafkaCluster.NewConsumer(group, []string{topicName})

if err != nil {
// we can not create a consumer - logging that, with reason
d.logger.Info("Could not create proper consumer", zap.Error(err))
Expand All @@ -179,28 +190,53 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su
}
channelMap[sub] = consumer

go func() {
for {
msg, more := <-consumer.Messages()
if more {
d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
message := fromKafkaMessage(msg)
err := d.dispatchMessage(message, sub)
if err != nil {
d.logger.Warn("Got error trying to dispatch message", zap.Error(err))
if cluster.ConsumerModePartitions == d.kafkaCluster.GetConsumerMode() {
yuzisun marked this conversation as resolved.
Show resolved Hide resolved
go func() {
d.logger.Info("Partition Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
for {
pc, more := <-consumer.Partitions()
if !more {
break
}
// TODO: handle errors with pluggable strategy
consumer.MarkOffset(msg, "") // Mark message as processed
} else {
break
go func(pc cluster.PartitionConsumer) {
for msg := range pc.Messages() {
d.dispatch(channelRef, sub, consumer, msg)
}
}(pc)
}
}
d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}()

d.logger.Info("Partition Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}()
} else {
go func() {
d.logger.Info("Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
for {
msg, more := <-consumer.Messages()
if more {
d.dispatch(channelRef, sub, consumer, msg)
} else {
break
}
}
d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}()
}
return nil
}

// dispatch forwards a single Kafka message to the given subscription and
// marks its offset as processed. A dispatch failure is logged and returned,
// but the offset is marked regardless (at-most-once delivery for now; see
// TODO below for pluggable error strategies).
func (d *KafkaDispatcher) dispatch(channelRef provisioners.ChannelReference, sub subscription, consumer KafkaConsumer,
	msg *sarama.ConsumerMessage) error {
	d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef),
		zap.Any("subscription", sub), zap.Any("partition", msg.Partition), zap.Any("offset", msg.Offset))
	dispatchErr := d.dispatchMessage(fromKafkaMessage(msg), sub)
	if dispatchErr != nil {
		d.logger.Warn("Got error trying to dispatch message", zap.Error(dispatchErr))
	}
	// TODO: handle errors with pluggable strategy
	consumer.MarkOffset(msg, "") // Mark message as processed
	return dispatchErr
}

func (d *KafkaDispatcher) unsubscribe(channel provisioners.ChannelReference, sub subscription) error {
d.logger.Info("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub))
if consumer, ok := d.kafkaConsumers[channel][sub]; ok {
Expand All @@ -224,7 +260,7 @@ func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) {
d.config.Store(config)
}

func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, error) {
func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger *zap.Logger) (*KafkaDispatcher, error) {

conf := sarama.NewConfig()
conf.Version = sarama.V1_1_0_0
Expand All @@ -242,7 +278,7 @@ func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, erro
dispatcher := &KafkaDispatcher{
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),

kafkaCluster: &saramaCluster{kafkaBrokers: brokers},
kafkaCluster: &saramaCluster{kafkaBrokers: brokers, consumerMode: consumerMode},
kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer),
kafkaAsyncProducer: producer,

Expand Down
Loading