From e31f69e95ec1ba6576eb66cd156f5d7e2eaeb1bd Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Wed, 3 Jan 2024 20:48:01 +0900 Subject: [PATCH 1/5] feat: add Authenticate to autopaho --- autopaho/auto.go | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/autopaho/auto.go b/autopaho/auto.go index e4bdacd..5da7a8d 100644 --- a/autopaho/auto.go +++ b/autopaho/auto.go @@ -368,6 +368,22 @@ func (c *ConnectionManager) AwaitConnection(ctx context.Context) error { } } +// Authenticate is used to initiate a reauthentication of credentials with the +// server. This function sends the initial Auth packet to start the reauthentication +// then relies on the client AuthHandler managing any further requests from the +// server until either a successful Auth packet is passed back, or a Disconnect +// is received. +func (c *ConnectionManager) Authenticate(ctx context.Context, a *paho.Auth) (*paho.AuthResponse, error) { + c.mu.Lock() + cli := c.cli + c.mu.Unlock() + + if cli == nil { + return nil, ConnectionDownError + } + return cli.Authenticate(ctx, a) +} + // Subscribe is used to send a Subscription request to the MQTT server. // It is passed a pre-prepared Subscribe packet and blocks waiting for // a response Suback, or for the timeout to fire. Any response Suback From 03567cc6882b8363d157ed8a47e4cef23f4615d6 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sat, 6 Jan 2024 13:32:57 +0900 Subject: [PATCH 2/5] feat: connect to testserver for TestAuthenticate --- autopaho/auto_test.go | 93 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/autopaho/auto_test.go b/autopaho/auto_test.go index 3549510..63e5acb 100644 --- a/autopaho/auto_test.go +++ b/autopaho/auto_test.go @@ -396,6 +396,99 @@ func TestBasicPubSub(t *testing.T) { } } +func TestAuthenticate(t *testing.T) { + t.Parallel() + server, _ := url.Parse(dummyURL) + serverLogger := paholog.NewTestLogger(t, "testServer:") + logger := paholog.NewTestLogger(t, "test:") + + ts := testserver.New(serverLogger) + + type tsConnUpMsg struct { + cancelFn func() // Function to cancel test server context + done chan struct{} // Will be closed when the test server has disconnected (and shutdown) + } + tsConnUpChan := make(chan tsConnUpMsg) // Message will be sent when test server connection is up + pahoConnUpChan := make(chan struct{}) // When autopaho reports connection is up write to channel will occur + + atCount := 0 + + config := ClientConfig{ + ServerUrls: []*url.URL{server}, + KeepAlive: 60, + ConnectRetryDelay: time.Millisecond, // Retry connection very quickly! + ConnectTimeout: shortDelay, // Connection should come up very quickly + AttemptConnection: func(ctx context.Context, _ ClientConfig, _ *url.URL) (net.Conn, error) { + atCount += 1 + if atCount == 2 { // fail on the initial reconnection attempt to exercise retry functionality + return nil, errors.New("connection attempt failed") + } + ctx, cancel := context.WithCancel(ctx) + conn, done, err := ts.Connect(ctx) + if err == nil { // The above may fail if attempted too quickly (before disconnect processed) + tsConnUpChan <- tsConnUpMsg{cancelFn: cancel, done: done} + } else { + cancel() + } + return conn, err + }, + OnConnectionUp: func(*ConnectionManager, *paho.Connack) { pahoConnUpChan <- struct{}{} }, + Debug: logger, + PahoDebug: logger, + PahoErrors: logger, + ClientConfig: paho.ClientConfig{ + ClientID: "test", + AuthHandler: &fakeAuth{}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + cm, err := NewConnection(ctx, config) + if err != nil { + t.Fatalf("expected NewConnection success: %s", err) + } + + var initialConnUpMsg tsConnUpMsg + select { + case initialConnUpMsg = <-tsConnUpChan: + case <-time.After(shortDelay): + t.Fatal("timeout awaiting initial connection request") + } + select { + case <-pahoConnUpChan: + case <-time.After(shortDelay): + t.Fatal("timeout awaiting connection up") + } + + cancel() // Cancelling outer context will cascade + select { // Wait for the local client to terminate + case <-cm.Done(): + case <-time.After(shortDelay): + t.Fatal("timeout awaiting connection manager shutdown") + } + + select { // Wait for test server to terminate + case <-initialConnUpMsg.done: + case <-time.After(shortDelay): + t.Fatal("test server did not shut down in a timely manner") + } +} + +// fakeAuth implements the Auther interface to test auto.AuthHandler +type fakeAuth struct{} + +func (f *fakeAuth) Authenticate(a *paho.Auth) *paho.Auth { + return &paho.Auth{ + Properties: &paho.AuthProperties{ + AuthMethod: "TEST", + AuthData: []byte("secret data"), + }, + } +} + +func (f *fakeAuth) Authenticated() {} + // TestClientConfig_buildConnectPacket exercises buildConnectPacket checking that options and callbacks are applied func TestClientConfig_buildConnectPacket(t *testing.T) { server, _ := url.Parse(dummyURL) From 1a182e13f9048f41c8171e5625898c45cbc6b224 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sat, 6 Jan 2024 14:17:30 +0900 Subject: [PATCH 3/5] feat: change testserver to send AuthSuccess when it gets AUTH packet --- internal/testserver/testserver.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/internal/testserver/testserver.go b/internal/testserver/testserver.go index 9c26220..5ea1ce6 100644 --- a/internal/testserver/testserver.go +++ b/internal/testserver/testserver.go @@ -486,9 +486,12 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet case packets.DISCONNECT: p := cp.Content.(*packets.Disconnect) return fmt.Errorf("disconnect received with resaon %d", p.ReasonCode) - // case packets.AUTH: not currently supported - // cp.Flags = 1 - // cp.Content = &Auth{Properties: &Properties{}} + case packets.AUTH: + response := packets.NewControlPacket(packets.AUTH) + r := response.Content.(*packets.Auth) + r.ReasonCode = packets.AuthSuccess + out <- response + return nil default: return fmt.Errorf("unsupported packet type %d received", cp.Type) } From 6ab7eb7081f1150529ac781d0e171a2ed84d3545 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sat, 6 Jan 2024 21:02:32 +0900 Subject: [PATCH 4/5] feat: add test code to TestAuthenticate --- autopaho/auto_test.go | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/autopaho/auto_test.go b/autopaho/auto_test.go index 63e5acb..412f4d4 100644 --- a/autopaho/auto_test.go +++ b/autopaho/auto_test.go @@ -14,6 +14,7 @@ import ( "time" "github.com/eclipse/paho.golang/internal/testserver" + "github.com/eclipse/paho.golang/packets" paholog "github.com/eclipse/paho.golang/paho/log" "go.uber.org/goleak" @@ -461,6 +462,22 @@ func TestAuthenticate(t *testing.T) { t.Fatal("timeout awaiting connection up") } + ctx, cf := context.WithTimeout(context.Background(), 5*time.Second) + defer cf() + ar, err := cm.Authenticate(ctx, &paho.Auth{ + ReasonCode: packets.AuthReauthenticate, + Properties: &paho.AuthProperties{ + AuthMethod: "TEST", + AuthData: []byte("secret data"), + }, + }) + if err != nil { + t.Fatalf("authenticate failed: %s", err) + } + if !ar.Success { + t.Fatal("authenticate failed") + } + cancel() // Cancelling outer context will cascade select { // Wait for the local client to terminate case <-cm.Done(): From 9b48c5a0d2c7aa4cd82c6c4c0832fac96ba78cc3 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sat, 6 Jan 2024 21:05:01 +0900 Subject: [PATCH 5/5] feat: change testserver to check auth method and data when it gets AUTH packet --- internal/testserver/testserver.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/internal/testserver/testserver.go b/internal/testserver/testserver.go index 5ea1ce6..390ccf1 100644 --- a/internal/testserver/testserver.go +++ b/internal/testserver/testserver.go @@ -487,6 +487,15 @@ func (i *Instance) processIncoming(cp *packets.ControlPacket, out chan<- *packet p := cp.Content.(*packets.Disconnect) return fmt.Errorf("disconnect received with resaon %d", p.ReasonCode) case packets.AUTH: + authProp := cp.Content.(*packets.Auth).Properties + switch authProp.AuthMethod { + case "TEST": + if !bytes.Equal(authProp.AuthData, []byte("secret data")) { + return fmt.Errorf("invalid authentication data received: %s", authProp.AuthData) + } + default: + return fmt.Errorf("invalid authentication method received: %s", authProp.AuthMethod) + } response := packets.NewControlPacket(packets.AUTH) r := response.Content.(*packets.Auth) r.ReasonCode = packets.AuthSuccess