From 2e4bbd0ca9fdd1fb574db84331e48b721ec1d889 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sun, 14 Jan 2024 08:38:52 +0900 Subject: [PATCH 1/3] fix: reset DefaultPinger --- paho/pinger.go | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/paho/pinger.go b/paho/pinger.go index d8ba4f0..f3418d7 100644 --- a/paho/pinger.go +++ b/paho/pinger.go @@ -96,7 +96,11 @@ func (p *DefaultPinger) Run(conn net.Conn, keepAlive uint16) error { p.timer = time.AfterFunc(0, p.sendPingreq) // Immediately send first pingreq p.mu.Unlock() - return <-p.errChan + err := <-p.errChan + + p.reset() + + return err } func (p *DefaultPinger) Stop() { @@ -172,6 +176,18 @@ func (p *DefaultPinger) sendPingreq() { } } +func (p *DefaultPinger) reset() { + p.mu.Lock() + defer p.mu.Unlock() + p.timer = nil + p.previousPingAcked = make(chan struct{}, 1) + p.previousPingAcked <- struct{}{} // initial value + p.done = make(chan struct{}) + p.errChan = make(chan error, 1) + p.ackReceived = make(chan struct{}, 1) + p.stopOnce = sync.Once{} +} + func (p *DefaultPinger) stop(err error) { p.mu.Lock() defer p.mu.Unlock() From e4dc5a661e715c841b6e68089e9c510b0c1700b5 Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sun, 14 Jan 2024 08:40:46 +0900 Subject: [PATCH 2/3] fix: prevent to stop Pinger when it caused by stopped Pinger --- paho/client.go | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/paho/client.go b/paho/client.go index 9573baf..600017a 100644 --- a/paho/client.go +++ b/paho/client.go @@ -48,6 +48,14 @@ var ( ErrInvalidArguments = errors.New("invalid argument") // If included (errors.Join) in an error, there is a problem with the arguments passed. Retrying on the same connection with the same arguments will not succeed. ) +type pingerError struct { + error +} + +func (pe *pingerError) Error() string { + return pe.error.Error() +} + type ( PublishReceived struct { Packet *Publish @@ -348,7 +356,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) { defer c.workers.Done() defer c.debug.Println("returning from ping handler worker") if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil { - go c.error(fmt.Errorf("ping handler error: %w", err)) + go c.error(&pingerError{fmt.Errorf("ping handler error: %w", err)}) } }() @@ -548,7 +556,7 @@ func (c *Client) incoming() { } } -func (c *Client) close() { +func (c *Client) close(e error) { c.mu.Lock() defer c.mu.Unlock() @@ -563,8 +571,10 @@ func (c *Client) close() { close(c.stop) c.debug.Println("client stopped") - c.config.PingHandler.Stop() - c.debug.Println("ping stopped") + if _, ok := e.(*pingerError); !ok { + c.config.PingHandler.Stop() + c.debug.Println("ping stopped") + } _ = c.config.Conn.Close() c.debug.Println("conn closed") c.acksTracker.reset() @@ -587,12 +597,12 @@ func (c *Client) close() { // It also closes the client network connection. func (c *Client) error(e error) { c.debug.Println("error called:", e) - c.close() + c.close(e) go c.config.OnClientError(e) } func (c *Client) serverDisconnect(d *Disconnect) { - c.close() + c.close(nil) c.debug.Println("calling OnServerDisconnect") go c.config.OnServerDisconnect(d) } @@ -973,7 +983,7 @@ func (c *Client) Disconnect(d *Disconnect) error { c.debug.Println("disconnecting", d) _, err := d.Packet().WriteTo(c.config.Conn) - c.close() + c.close(nil) return err } From e2223d3e9823c17b8c463474345c09748744fb1e Mon Sep 17 00:00:00 2001 From: Minhyuk Kim Date: Sun, 14 Jan 2024 08:49:30 +0900 Subject: [PATCH 3/3] fix: update client_test with changed c.close() --- paho/client_test.go | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/paho/client_test.go b/paho/client_test.go index ef9367b..0f59a9b 100644 --- a/paho/client_test.go +++ b/paho/client_test.go @@ -81,7 +81,7 @@ func TestClientConnect(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) cp := &Connect{ @@ -119,7 +119,7 @@ func TestClientSubscribe(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -164,7 +164,7 @@ func TestClientUnsubscribe(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -202,7 +202,7 @@ func TestClientPublishQoS0(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -244,7 +244,7 @@ func TestClientPublishQoS1(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -289,7 +289,7 @@ func TestClientPublishQoS2(t *testing.T) { Conn: ts.ClientConn(), }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -336,7 +336,7 @@ func TestClientReceiveQoS0(t *testing.T) { }}, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -383,7 +383,7 @@ func TestClientReceiveQoS1(t *testing.T) { }}, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -431,7 +431,7 @@ func TestClientReceiveQoS2(t *testing.T) { }}, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -493,7 +493,7 @@ func TestClientReceiveAndAckInOrder(t *testing.T) { }}, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) ctx := context.Background() @@ -577,7 +577,7 @@ func TestManualAcksInOrder(t *testing.T) { EnableManualAcknowledgment: true, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) ctx := context.Background() @@ -648,7 +648,7 @@ func TestReceiveServerDisconnect(t *testing.T) { }, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -689,7 +689,7 @@ func TestAuthenticate(t *testing.T) { AuthHandler: &fakeAuth{}, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) c.stop = make(chan struct{}) @@ -788,7 +788,7 @@ func TestAuthenticateOnConnect(t *testing.T) { AuthHandler: &auther, }) require.NotNil(t, c) - defer c.close() + defer c.close(nil) c.SetDebugLogger(clientLogger) cp := &Connect{ @@ -884,7 +884,7 @@ func TestDisconnect(t *testing.T) { }) require.NotNil(t, c) c.SetDebugLogger(clientLogger) - defer c.close() + defer c.close(nil) ctx := context.Background() ca, err := c.Connect(ctx, &Connect{ @@ -945,7 +945,7 @@ func TestCloseDeadlock(t *testing.T) { for i := 0; i < routines; i++ { go func() { defer wg.Done() - c.close() + c.close(nil) }() go func() { defer wg.Done() @@ -1009,7 +1009,7 @@ func TestSendOnClosedChannel(t *testing.T) { }() time.Sleep(10 * time.Millisecond) - c.close() + c.close(nil) } func isChannelClosed(ch chan struct{}) (closed bool) {