diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go index 6cd5bff001..44cf1cbbd5 100644 --- a/internal/consuming/kafka.go +++ b/internal/consuming/kafka.go @@ -16,7 +16,9 @@ import ( "github.com/centrifugal/centrifuge" "github.com/twmb/franz-go/pkg/kgo" "github.com/twmb/franz-go/pkg/kmsg" + "github.com/twmb/franz-go/pkg/sasl/aws" "github.com/twmb/franz-go/pkg/sasl/plain" + "github.com/twmb/franz-go/pkg/sasl/scram" ) type KafkaConfig struct { @@ -157,13 +159,30 @@ func (c *KafkaConsumer) initClient() (*kgo.Client, error) { } if c.config.SASLMechanism != "" { - if c.config.SASLMechanism != "plain" { - return nil, fmt.Errorf("only plain SASL auth mechanism is supported") + switch c.config.SASLMechanism { + case "plain": + opts = append(opts, kgo.SASL(plain.Auth{ + User: c.config.SASLUser, + Pass: c.config.SASLPassword, + }.AsMechanism())) + case "scram-sha-256": + opts = append(opts, kgo.SASL(scram.Auth{ + User: c.config.SASLUser, + Pass: c.config.SASLPassword, + }.AsSha256Mechanism())) + case "scram-sha-512": + opts = append(opts, kgo.SASL(scram.Auth{ + User: c.config.SASLUser, + Pass: c.config.SASLPassword, + }.AsSha512Mechanism())) + case "aws-msk-iam": + opts = append(opts, kgo.SASL(aws.Auth{ + AccessKey: c.config.SASLUser, + SecretKey: c.config.SASLPassword, + }.AsManagedStreamingIAMMechanism())) + default: + return nil, fmt.Errorf("unsupported SASL mechanism: %s", c.config.SASLMechanism) } - opts = append(opts, kgo.SASL(plain.Auth{ - User: c.config.SASLUser, - Pass: c.config.SASLPassword, - }.AsMechanism())) } client, err := kgo.NewClient(opts...)