Skip to content

Commit

Permalink
Reauthentication with MQTT5 Enhanced Authentication (AUTH packet exch…
Browse files Browse the repository at this point in the history
…ange) in autopaho

Adds support for Reauthentication to autopaho.
  • Loading branch information
MattBrittan authored Jan 7, 2024
2 parents 70316d9 + 9b48c5a commit e452008
Show file tree
Hide file tree
Showing 3 changed files with 141 additions and 3 deletions.
16 changes: 16 additions & 0 deletions autopaho/auto.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,22 @@ func (c *ConnectionManager) AwaitConnection(ctx context.Context) error {
}
}

// Authenticate is used to initiate a reauthentication of credentials with the
// server. This function sends the initial Auth packet to start the reauthentication
// then relies on the client AuthHandler managing any further requests from the
// server until either a successful Auth packet is passed back, or a Disconnect
// is received.
func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error) {
c.mu.Lock()
cli := c.cli
c.mu.Unlock()

if cli == nil {
return nil, ConnectionDownError
}
return cli.Authenticate(ctx, a)
}

// Subscribe is used to send a Subscription request to the MQTT server.
// It is passed a pre-prepared Subscribe packet and blocks waiting for
// a response Suback, or for the timeout to fire. Any response Suback
Expand Down
110 changes: 110 additions & 0 deletions autopaho/auto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"time"

"github.com/eclipse/paho.golang/internal/testserver"
"github.com/eclipse/paho.golang/packets"
paholog "github.com/eclipse/paho.golang/paho/log"
"go.uber.org/goleak"

Expand Down Expand Up @@ -398,6 +399,115 @@ func TestBasicPubSub(t *testing.T) {
}
}

func TestAuthenticate(t *testing.T) {
t.Parallel()
server, _ := url.Parse(dummyURL)
serverLogger := paholog.NewTestLogger(t, "testServer:")
logger := paholog.NewTestLogger(t, "test:")

ts := testserver.New(serverLogger)

type tsConnUpMsg struct {
cancelFn func() // Function to cancel test server context
done chan struct{} // Will be closed when the test server has disconnected (and shutdown)
}
tsConnUpChan := make(chan tsConnUpMsg) // Message will be sent when test server connection is up
pahoConnUpChan := make(chan struct{}) // When autopaho reports connection is up write to channel will occur

atCount := 0

config := ClientConfig{
ServerUrls: []*url.URL{server},
KeepAlive: 60,
ConnectRetryDelay: time.Millisecond, // Retry connection very quickly!
ConnectTimeout: shortDelay, // Connection should come up very quickly
AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) {
atCount += 1
if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality
return nil, errors.New("connection attempt failed")
}
ctx, cancel := context.WithCancel(ctx)
conn, done, err := ts.Connect(ctx)
if err == nil { // The above may fail if attempted too quickly (before disconnect processed)
tsConnUpChan <- tsConnUpMsg{cancelFn: cancel, done: done}
} else {
cancel()
}
return conn, err
},
OnConnectionUp: func(*ConnectionManager, *paho.Connack) { pahoConnUpChan <- struct{}{} },
Debug: logger,
PahoDebug: logger,
PahoErrors: logger,
ClientConfig: paho.ClientConfig{
ClientID: "test",
AuthHandler: &fakeAuth{},
},
}

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cm, err := NewConnection(ctx, config)
if err != nil {
t.Fatalf("expected NewConnection success: %s", err)
}

var initialConnUpMsg tsConnUpMsg
select {
case initialConnUpMsg = <-tsConnUpChan:
case <-time.After(shortDelay):
t.Fatal("timeout awaiting initial connection request")
}
select {
case <-pahoConnUpChan:
case <-time.After(shortDelay):
t.Fatal("timeout awaiting connection up")
}

ctx, cf := context.WithTimeout(context.Background(), 5*time.Second)
defer cf()
ar, err := cm.Authenticate(ctx, &paho.Auth{
ReasonCode: packets.AuthReauthenticate,
Properties: &paho.AuthProperties{
AuthMethod: "TEST",
AuthData: []byte("secret data"),
},
})
if err != nil {
t.Fatalf("authenticate failed: %s", err)
}
if !ar.Success {
t.Fatal("authenticate failed")
}

cancel() // Cancelling outer context will cascade
select { // Wait for the local client to terminate
case <-cm.Done():
case <-time.After(shortDelay):
t.Fatal("timeout awaiting connection manager shutdown")
}

select { // Wait for test server to terminate
case <-initialConnUpMsg.done:
case <-time.After(shortDelay):
t.Fatal("test server did not shut down in a timely manner")
}
}

// fakeAuth implements the Auther interface to test auto.AuthHandler
type fakeAuth struct{}

func (f *fakeAuth) Authenticate(a *paho.Auth) *paho.Auth {
return &paho.Auth{
Properties: &paho.AuthProperties{
AuthMethod: "TEST",
AuthData: []byte("secret data"),
},
}
}

func (f *fakeAuth) Authenticated() {}

// TestClientConfig_buildConnectPacket exercises buildConnectPacket checking that options and callbacks are applied
func TestClientConfig_buildConnectPacket(t *testing.T) {
server, _ := url.Parse(dummyURL)
Expand Down
18 changes: 15 additions & 3 deletions internal/testserver/testserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,21 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet
case packets.DISCONNECT:
p := cp.Content.(*packets.Disconnect)
return fmt.Errorf("disconnect received with resaon %d", p.ReasonCode)
// case packets.AUTH: not currently supported
// cp.Flags = 1
// cp.Content = &Auth{Properties: &Properties{}}
case packets.AUTH:
authProp := cp.Content.(*packets.Auth).Properties
switch authProp.AuthMethod {
case "TEST":
if !bytes.Equal(authProp.AuthData, []byte("secret data")) {
return fmt.Errorf("invalid authentication data received: %s", authProp.AuthData)
}
default:
return fmt.Errorf("invalid authentication method received: %s", authProp.AuthMethod)
}
response := packets.NewControlPacket(packets.AUTH)
r := response.Content.(*packets.Auth)
r.ReasonCode = packets.AuthSuccess
out <- response
return nil
default:
return fmt.Errorf("unsupported packet type %d received", cp.Type)
}
Expand Down

0 comments on commit e452008

Please sign in to comment.