diff --git a/broker.go b/broker.go index 9967711b6..7b32a03d3 100644 --- a/broker.go +++ b/broker.go @@ -4,7 +4,6 @@ import ( "crypto/tls" "encoding/binary" "fmt" - metrics "github.com/rcrowley/go-metrics" "io" "net" "sort" @@ -13,6 +12,8 @@ import ( "sync" "sync/atomic" "time" + + metrics "github.com/rcrowley/go-metrics" ) // Broker represents a single Kafka broker connection. All operations on this object are entirely concurrency-safe. @@ -944,19 +945,16 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int func (b *Broker) sendAndReceiveSASLPlainAuth() error { // default to V0 to allow for backward compatability when SASL is enabled // but not the handshake - saslHandshake := SASLHandshakeV0 if b.conf.Net.SASL.Handshake { - if b.conf.Version.IsAtLeast(V1_0_0_0) { - saslHandshake = SASLHandshakeV1 - } - handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, saslHandshake) + + handshakeErr := b.sendAndReceiveSASLHandshake(SASLTypePlaintext, b.conf.Net.SASL.Version) if handshakeErr != nil { Logger.Printf("Error while performing SASL handshake %s\n", b.addr) return handshakeErr } } - if saslHandshake == SASLHandshakeV1 { + if b.conf.Net.SASL.Version == SASLHandshakeV1 { return b.sendAndReceiveV1SASLPlainAuth() } return b.sendAndReceiveV0SASLPlainAuth() diff --git a/broker_test.go b/broker_test.go index dd73880fb..7bf020fcd 100644 --- a/broker_test.go +++ b/broker_test.go @@ -441,6 +441,7 @@ func TestSASLPlainAuth(t *testing.T) { conf.Net.SASL.Mechanism = SASLTypePlaintext conf.Net.SASL.User = "token" conf.Net.SASL.Password = "password" + conf.Net.SASL.Version = SASLHandshakeV1 broker.conf = conf broker.conf.Version = V1_0_0_0 diff --git a/config.go b/config.go index 874a013cd..e2e651315 100644 --- a/config.go +++ b/config.go @@ -58,6 +58,9 @@ type Config struct { // SASLMechanism is the name of the enabled SASL mechanism. // Possible values: OAUTHBEARER, PLAIN (defaults to PLAIN). Mechanism SASLMechanism + // Version is the SASL Protocol Version to use + // Kafka > 1.x should use V1, except on Azure EventHub which use V0 + Version int16 // Whether or not to send the Kafka SASL handshake first if enabled // (defaults to true). You should only set this to false if you're using // a non-Kafka SASL proxy. @@ -398,6 +401,7 @@ func NewConfig() *Config { c.Net.ReadTimeout = 30 * time.Second c.Net.WriteTimeout = 30 * time.Second c.Net.SASL.Handshake = true + c.Net.SASL.Version = SASLHandshakeV0 c.Metadata.Retry.Max = 3 c.Metadata.Retry.Backoff = 250 * time.Millisecond