Skip to content

Commit

Permalink
Merge pull request #648 from rshriram/master
Browse files Browse the repository at this point in the history
Add support for SASL plain text authentication
  • Loading branch information
eapache committed May 5, 2016
2 parents 388a9be + d0d6717 commit 89bd629
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 0 deletions.
66 changes: 66 additions & 0 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sarama

import (
"crypto/tls"
"encoding/binary"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -82,6 +83,22 @@ func (b *Broker) Open(conf *Config) error {
b.conn = newBufConn(b.conn)

b.conf = conf

if conf.Net.SASL.Enable {
b.connErr = b.sendAndReceiveSASLPlainAuth()
if b.connErr != nil {
err = b.conn.Close()
if err == nil {
Logger.Printf("Closed connection to broker %s\n", b.addr)
} else {
Logger.Printf("Error while closing connection to broker %s: %s\n", b.addr, err)
}
b.conn = nil
atomic.StoreInt32(&b.opened, 0)
return
}
}

b.done = make(chan bool)
b.responses = make(chan responsePromise, b.conf.Net.MaxOpenRequests-1)

Expand Down Expand Up @@ -454,3 +471,52 @@ func (b *Broker) responseReceiver() {
}
close(b.done)
}

// Kafka 0.10.0 plans to support SASL Plain and Kerberos as per PR #812 (KIP-43)/(JIRA KAFKA-3149)
// Some hosted kafka services such as IBM Message Hub already offer SASL/PLAIN auth with Kafka 0.9
//
// In SASL Plain, Kafka expects the auth header to be in the following format
// Message format (from https://tools.ietf.org/html/rfc4616):
//
// message = [authzid] UTF8NUL authcid UTF8NUL passwd
// authcid = 1*SAFE ; MUST accept up to 255 octets
// authzid = 1*SAFE ; MUST accept up to 255 octets
// passwd = 1*SAFE ; MUST accept up to 255 octets
// UTF8NUL = %x00 ; UTF-8 encoded NUL character
//
// SAFE = UTF1 / UTF2 / UTF3 / UTF4
// ;; any UTF-8 encoded Unicode character except NUL
//
// When credentials are valid, Kafka returns a 4 byte array of null characters.
// When credentials are invalid, Kafka closes the connection. This does not seem to be the ideal way
// of responding to bad credentials but thats how its being done today.
func (b *Broker) sendAndReceiveSASLPlainAuth() error {
length := 1 + len(b.conf.Net.SASL.User) + 1 + len(b.conf.Net.SASL.Password)
authBytes := make([]byte, length+4) //4 byte length header + auth data
binary.BigEndian.PutUint32(authBytes, uint32(length))
copy(authBytes[4:], []byte("\x00"+b.conf.Net.SASL.User+"\x00"+b.conf.Net.SASL.Password))

err := b.conn.SetWriteDeadline(time.Now().Add(b.conf.Net.WriteTimeout))
if err != nil {
Logger.Printf("Failed to set write deadline when doing SASL auth with broker %s: %s\n", b.addr, err.Error())
return err
}

_, err = b.conn.Write(authBytes)
if err != nil {
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}

header := make([]byte, 4)
n, err := io.ReadFull(b.conn, header)
// If the credentials are valid, we would get a 4 byte response filled with null characters.
// Otherwise, the broker closes the connection and we get an EOF
if err != nil {
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}

Logger.Printf("SASL authentication successful with broker %s:%v - %v\n", b.addr, n, header)
return nil
}
23 changes: 23 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,17 @@ type Config struct {
Config *tls.Config
}

// SASL based authentication with broker. While there are multiple SASL authentication methods
// the current implementation is limited to plaintext (SASL/PLAIN) authentication
SASL struct {
// Whether or not to use SASL authentication when connecting to the broker
// (defaults to false).
Enable bool
//username and password for SASL/PLAIN authentication
User string
Password string
}

// KeepAlive specifies the keep-alive period for an active network connection.
// If zero, keep-alives are disabled. (default is 0: disabled).
KeepAlive time.Duration
Expand Down Expand Up @@ -257,6 +268,14 @@ func (c *Config) Validate() error {
if c.Net.TLS.Enable == false && c.Net.TLS.Config != nil {
Logger.Println("Net.TLS is disabled but a non-nil configuration was provided.")
}
if c.Net.SASL.Enable == false {
if c.Net.SASL.User != "" {
Logger.Println("Net.SASL is disabled but a non-empty username was provided.")
}
if c.Net.SASL.Password != "" {
Logger.Println("Net.SASL is disabled but a non-empty password was provided.")
}
}
if c.Producer.RequiredAcks > 1 {
Logger.Println("Producer.RequiredAcks > 1 is deprecated and will raise an exception with kafka >= 0.8.2.0.")
}
Expand Down Expand Up @@ -294,6 +313,10 @@ func (c *Config) Validate() error {
return ConfigurationError("Net.WriteTimeout must be > 0")
case c.Net.KeepAlive < 0:
return ConfigurationError("Net.KeepAlive must be >= 0")
case c.Net.SASL.Enable == true && c.Net.SASL.User == "":
return ConfigurationError("Net.SASL.User must not be empty when SASL is enabled")
case c.Net.SASL.Enable == true && c.Net.SASL.Password == "":
return ConfigurationError("Net.SASL.Password must not be empty when SASL is enabled")
}

// validate the Metadata values
Expand Down

0 comments on commit 89bd629

Please sign in to comment.