Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support partition consumer mode for kafka channel #879

Merged
merged 4 commits into from
Mar 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion contrib/kafka/cmd/dispatcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {
logger.Fatal("unable to create manager.", zap.Error(err))
}

kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, logger)
kafkaDispatcher, err := dispatcher.NewDispatcher(provisionerConfig.Brokers, provisionerConfig.ConsumerMode, logger)
if err != nil {
logger.Fatal("unable to create kafka dispatcher.", zap.Error(err))
}
Expand Down
32 changes: 2 additions & 30 deletions contrib/kafka/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,11 @@ package main

import (
"flag"
"fmt"
"os"
"strings"

istiov1alpha3 "github.com/knative/pkg/apis/istio/v1alpha3"
"go.uber.org/zap"
"k8s.io/apimachinery/pkg/runtime"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"os"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Did you run gofmt on this file? I'd have expected it to put this import above next to flag.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm I did run gofmt -w main.go, nothing changes, then I purposely put next to flag and rerun gofmt it puts os back after _ k8s.io/client-go/plugin/pkg/client/auth/gcp, looks weird. I am using go version go1.11.5 darwin/amd64

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

weird! I don't know why it does that, but I guess it's correct by definition. So 👍

"sigs.k8s.io/controller-runtime/pkg/client/config"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -20,7 +17,6 @@ import (
"github.com/knative/eventing/contrib/kafka/pkg/controller/channel"
eventingv1alpha "github.com/knative/eventing/pkg/apis/eventing/v1alpha1"
"github.com/knative/eventing/pkg/provisioners"
"github.com/knative/pkg/configmap"
)

const (
Expand Down Expand Up @@ -64,7 +60,7 @@ func main() {
}

// TODO the underlying config map needs to be watched and the config should be reloaded if there is a change.
provisionerConfig, err := getProvisionerConfig()
provisionerConfig, err := provisionerController.GetProvisionerConfig("/etc/config-provisioner")

if err != nil {
logger.Error(err, "unable to run controller manager")
Expand All @@ -80,27 +76,3 @@ func main() {

mgr.Start(signals.SetupSignalHandler())
}

// getProvisionerConfig returns the details of the associated Provisioner/ClusterChannelProvisioner object.
// It loads the provisioner ConfigMap from /etc/config-provisioner and extracts the
// comma-separated Kafka bootstrap servers under BrokerConfigMapKey.
func getProvisionerConfig() (*provisionerController.KafkaProvisionerConfig, error) {
	configMap, err := configmap.Load("/etc/config-provisioner")
	if err != nil {
		return nil, fmt.Errorf("error loading provisioner configuration: %s", err)
	}

	if len(configMap) == 0 {
		return nil, fmt.Errorf("missing provisioner configuration")
	}

	if value, ok := configMap[BrokerConfigMapKey]; ok {
		// strings.Split never returns an empty slice (splitting "" yields [""]),
		// so a bare len != 0 check would accept a blank broker entry.
		// Trim and drop empty entries instead.
		var bootstrapServers []string
		for _, b := range strings.Split(value, ",") {
			if b = strings.TrimSpace(b); b != "" {
				bootstrapServers = append(bootstrapServers, b)
			}
		}
		if len(bootstrapServers) > 0 {
			return &provisionerController.KafkaProvisionerConfig{Brokers: bootstrapServers}, nil
		}
	}

	return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
}
5 changes: 4 additions & 1 deletion contrib/kafka/pkg/controller/types.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package controller

import "github.com/bsm/sarama-cluster"

type KafkaProvisionerConfig struct {
Brokers []string
Brokers []string
ConsumerMode cluster.ConsumerMode
}
18 changes: 14 additions & 4 deletions contrib/kafka/pkg/controller/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package controller

import (
"fmt"
"github.com/bsm/sarama-cluster"
"strings"

"github.com/knative/pkg/configmap"
)

// Keys and values understood in the provisioner ConfigMap, plus the separator
// used when composing Kafka topic names from channel references.
// NOTE: the diff residue duplicated BrokerConfigMapKey and KafkaChannelSeparator;
// constants may be declared only once per block.
const (
	// BrokerConfigMapKey is the ConfigMap key listing comma-separated bootstrap servers.
	BrokerConfigMapKey = "bootstrap_servers"
	// ConsumerModeConfigMapKey is the ConfigMap key selecting the consumer mode.
	ConsumerModeConfigMapKey = "consumer_mode"
	// ConsumerModePartitionConsumerValue selects per-partition consumers when set.
	ConsumerModePartitionConsumerValue = "partitions"
	// KafkaChannelSeparator joins namespace and name into a topic name.
	KafkaChannelSeparator = "."
)

// GetProvisionerConfig returns the details of the associated ClusterChannelProvisioner object
Expand All @@ -33,8 +36,15 @@ func GetProvisionerConfig(path string) (*KafkaProvisionerConfig, error) {
}
}
config.Brokers = bootstrapServers
return config, nil
} else {
return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
}

return nil, fmt.Errorf("missing key %s in provisioner configuration", BrokerConfigMapKey)
config.ConsumerMode = cluster.ConsumerModeMultiplex
if mode, ok := configMap[ConsumerModeConfigMapKey]; ok {
yuzisun marked this conversation as resolved.
Show resolved Hide resolved
if strings.ToLower(mode) == ConsumerModePartitionConsumerValue {
config.ConsumerMode = cluster.ConsumerModePartitions
}
}
return config, nil
}
17 changes: 17 additions & 0 deletions contrib/kafka/pkg/controller/util_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package controller

import (
"github.com/bsm/sarama-cluster"
"io/ioutil"
"os"
"path/filepath"
Expand Down Expand Up @@ -53,6 +54,22 @@ func TestGetProvisionerConfigBrokers(t *testing.T) {
Brokers: []string{"kafkabroker1.kafka:9092", "kafkabroker2.kafka:9092"},
},
},
{
name: "partition consumer",
data: map[string]string{"bootstrap_servers": "kafkabroker.kafka:9092", "consumer_mode": "partitions"},
expected: &KafkaProvisionerConfig{
Brokers: []string{"kafkabroker.kafka:9092"},
ConsumerMode: cluster.ConsumerModePartitions,
},
},
{
name: "default multiplex",
data: map[string]string{"bootstrap_servers": "kafkabroker.kafka:9092", "consumer_mode": "multiplex"},
expected: &KafkaProvisionerConfig{
Brokers: []string{"kafkabroker.kafka:9092"},
ConsumerMode: cluster.ConsumerModeMultiplex,
},
},
}

for _, tc := range testCases {
Expand Down
78 changes: 59 additions & 19 deletions contrib/kafka/pkg/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,24 +49,34 @@ type KafkaDispatcher struct {

// KafkaConsumer abstracts the subset of the bsm/sarama-cluster consumer API
// that the dispatcher uses, so it can be faked in tests.
type KafkaConsumer interface {
// Messages returns the merged message channel (used in multiplex mode).
Messages() <-chan *sarama.ConsumerMessage
// Partitions returns the per-partition consumer channel (used in partition mode).
Partitions() <-chan cluster.PartitionConsumer
// MarkOffset records a message as processed; metadata may be empty.
MarkOffset(msg *sarama.ConsumerMessage, metadata string)
// Close shuts the consumer down and releases its group membership.
Close() (err error)
}

// KafkaCluster abstracts consumer creation against a Kafka cluster, allowing
// the dispatcher to be exercised without a live broker.
type KafkaCluster interface {
// NewConsumer joins the given consumer group for the given topics.
NewConsumer(groupID string, topics []string) (KafkaConsumer, error)

// GetConsumerMode reports the configured mode (multiplex vs. partitions)
// so the dispatcher can pick the matching consume loop.
GetConsumerMode() cluster.ConsumerMode
}

// saramaCluster is the production KafkaCluster backed by bsm/sarama-cluster.
type saramaCluster struct {
// kafkaBrokers is the list of bootstrap broker addresses.
kafkaBrokers []string

// consumerMode selects multiplex or per-partition message delivery
// for every consumer this cluster creates.
consumerMode cluster.ConsumerMode
}

// NewConsumer joins the given consumer group for the given topics, applying
// the cluster's configured Kafka protocol version and consumer mode.
func (c *saramaCluster) NewConsumer(groupID string, topics []string) (KafkaConsumer, error) {
	cfg := cluster.NewConfig()
	cfg.Version = sarama.V1_1_0_0
	cfg.Group.Mode = c.consumerMode
	return cluster.NewConsumer(c.kafkaBrokers, groupID, topics, cfg)
}

// GetConsumerMode returns the consumer mode this cluster was configured with.
func (c *saramaCluster) GetConsumerMode() cluster.ConsumerMode {
return c.consumerMode
}

type subscription struct {
Namespace string
Name string
Expand Down Expand Up @@ -166,6 +176,7 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su

group := fmt.Sprintf("%s.%s.%s", controller.Name, sub.Namespace, sub.Name)
consumer, err := d.kafkaCluster.NewConsumer(group, []string{topicName})

if err != nil {
// we can not create a consumer - logging that, with reason
d.logger.Info("Could not create proper consumer", zap.Error(err))
Expand All @@ -179,26 +190,55 @@ func (d *KafkaDispatcher) subscribe(channelRef provisioners.ChannelReference, su
}
channelMap[sub] = consumer

go func() {
for {
msg, more := <-consumer.Messages()
if more {
d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
message := fromKafkaMessage(msg)
err := d.dispatchMessage(message, sub)
if err != nil {
d.logger.Warn("Got error trying to dispatch message", zap.Error(err))
}
// TODO: handle errors with pluggable strategy
consumer.MarkOffset(msg, "") // Mark message as processed
} else {
break
if cluster.ConsumerModePartitions == d.kafkaCluster.GetConsumerMode() {
yuzisun marked this conversation as resolved.
Show resolved Hide resolved
go d.partitionConsumerLoop(consumer, channelRef, sub)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks a lot, this looks easier to test now.

} else {
go d.multiplexConsumerLoop(consumer, channelRef, sub)
}
return nil
}

// partitionConsumerLoop consumes in partition mode: it receives per-partition
// consumers from the group consumer and dispatches each partition's messages
// on its own goroutine, preserving per-partition ordering. It returns when the
// Partitions channel is closed (consumer shutdown).
func (d *KafkaDispatcher) partitionConsumerLoop(consumer KafkaConsumer, channelRef provisioners.ChannelReference, sub subscription) {
	d.logger.Info("Partition Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
	for pc := range consumer.Partitions() {
		// One goroutine per claimed partition; each exits when its
		// partition's message channel closes.
		go func(pc cluster.PartitionConsumer) {
			for msg := range pc.Messages() {
				d.dispatch(channelRef, sub, consumer, msg)
			}
		}(pc)
	}
	d.logger.Info("Partition Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}

func (d *KafkaDispatcher) multiplexConsumerLoop(consumer KafkaConsumer, channelRef provisioners.ChannelReference, sub subscription) {
d.logger.Info("Consumer for subscription started", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
for {
msg, more := <-consumer.Messages()
if more {
d.dispatch(channelRef, sub, consumer, msg)
} else {
break
}
d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}()
}
d.logger.Info("Consumer for subscription stopped", zap.Any("channelRef", channelRef), zap.Any("subscription", sub))
}

return nil
// dispatch converts one Kafka message and forwards it to the subscription's
// subscriber, then marks the offset as processed regardless of the dispatch
// outcome, returning any dispatch error to the caller.
func (d *KafkaDispatcher) dispatch(channelRef provisioners.ChannelReference, sub subscription, consumer KafkaConsumer,
	msg *sarama.ConsumerMessage) error {
	d.logger.Info("Dispatching a message for subscription", zap.Any("channelRef", channelRef),
		zap.Any("subscription", sub), zap.Any("partition", msg.Partition), zap.Any("offset", msg.Offset))
	err := d.dispatchMessage(fromKafkaMessage(msg), sub)
	if err != nil {
		d.logger.Warn("Got error trying to dispatch message", zap.Error(err))
	}
	// TODO: handle errors with pluggable strategy
	consumer.MarkOffset(msg, "") // Mark message as processed
	return err
}

func (d *KafkaDispatcher) unsubscribe(channel provisioners.ChannelReference, sub subscription) error {
Expand All @@ -224,7 +264,7 @@ func (d *KafkaDispatcher) setConfig(config *multichannelfanout.Config) {
d.config.Store(config)
}

func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, error) {
func NewDispatcher(brokers []string, consumerMode cluster.ConsumerMode, logger *zap.Logger) (*KafkaDispatcher, error) {

conf := sarama.NewConfig()
conf.Version = sarama.V1_1_0_0
Expand All @@ -242,7 +282,7 @@ func NewDispatcher(brokers []string, logger *zap.Logger) (*KafkaDispatcher, erro
dispatcher := &KafkaDispatcher{
dispatcher: provisioners.NewMessageDispatcher(logger.Sugar()),

kafkaCluster: &saramaCluster{kafkaBrokers: brokers},
kafkaCluster: &saramaCluster{kafkaBrokers: brokers, consumerMode: consumerMode},
kafkaConsumers: make(map[provisioners.ChannelReference]map[subscription]KafkaConsumer),
kafkaAsyncProducer: producer,

Expand Down
Loading