Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reset DefaultPinger to reconnect to server #228

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions paho/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ var (
ErrInvalidArguments = errors.New("invalid argument") // If included (errors.Join) in an error, there is a problem with the arguments passed. Retrying on the same connection with the same arguments will not succeed.
)

type pingerError struct {
error
}

func (pe *pingerError) Error() string {
return pe.error.Error()
}

type (
PublishReceived struct {
Packet *Publish
Expand Down Expand Up @@ -348,7 +356,7 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil {
go c.error(fmt.Errorf("ping handler error: %w", err))
go c.error(&pingerError{fmt.Errorf("ping handler error: %w", err)})
}
}()

Expand Down Expand Up @@ -548,7 +556,7 @@ func (c *Client) incoming() {
}
}

func (c *Client) close() {
func (c *Client) close(e error) {
c.mu.Lock()
defer c.mu.Unlock()

Expand All @@ -563,8 +571,10 @@ func (c *Client) close() {
close(c.stop)

c.debug.Println("client stopped")
c.config.PingHandler.Stop()
c.debug.Println("ping stopped")
if _, ok := e.(*pingerError); !ok {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider what could happen if the connection drops when a ping is scheduled:

  1. c.error called by something else
  2. pinger exits with "failed to send PINGREQ" (having called reset)
  3. c.error call from step 1 runs and stops the refreshed pinger (meaning that all future calls to Run will fail).

I realise that the above sequence of events is pretty unlikely but I believe it's possible (and it would be pretty hard to trace!). I think it's preferable for the reset to happen in Run meaning Run will work for a new DefaultPinger or following a clean shutdown (the user must always wait for Run to terminate before calling it again). This should be documented in the interface to make it clear that the pinger is reusable.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I think that scenario is possible too. I'll think about it more.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MattBrittan What about adding Reset() to Pinger interface like this

type Pinger interface {
	// Run() starts the pinger. It blocks until the pinger is stopped.
	// If the pinger stops due to an error, it returns the error.
	// If the keepAlive is 0, it returns nil immediately.
	// Run() must be called only once.
	Run(conn net.Conn, keepAlive uint16) error

	// Stop() gracefully stops the pinger.
	Stop()

	// Reset() resets the pinger to be reusable.
	Reset()

	// PacketSent() is called when a packet is sent to the server.
	PacketSent()

	// PingResp() is called when a PINGRESP is received from the server.
	PingResp()

	// SetDebug() sets the logger for debugging.
	// It is not thread-safe and must be called before Run() to avoid race conditions.
	SetDebug(log.Logger)
}

and call it right after closing client?

func (c *Client) close() {
	c.mu.Lock()
	defer c.mu.Unlock()

	defer c.config.PingHandler.Reset()

	select {
	case <-c.stop:
		// already shutting down, return when shutdown complete
		<-c.done
		return
	default:
	}

	close(c.stop)

	c.debug.Println("client stopped")
	c.config.PingHandler.Stop()
	c.debug.Println("ping stopped")
	_ = c.config.Conn.Close()
	c.debug.Println("conn closed")
	c.acksTracker.reset()
	c.debug.Println("acks tracker reset")
	c.config.Session.ConnectionLost(nil)
	if c.config.autoCloseSession {
		if err := c.config.Session.Close(); err != nil {
			c.errors.Println("error closing session", err)
		}
	}
	c.debug.Println("session updated, waiting on workers")
	c.workers.Wait()
	c.debug.Println("workers done")
	close(c.done)
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem I see with this is "what should the Pinger do if Reset() is called whilst the pinger is running". We may just say "this can't happen" (probably a mistake!) but then there is no real benefit to having a separate function (you can effectively call Reset from within Run). If we agree that Run could conceivably be called before Stop completes then the question becomes "what can you do about it" (only think I can come up with is to stop the old one and log a message).

As such I don't think there is much value in adding Reset and it's probably simplest/safest if Run:

  • Checks if another Run is active and, if so:
    • Stop it (Run should return an error so it will be logged etc). Run may need to wait for Run to terminate.
  • Reset things
  • Start the new process

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry - ran out of time today so will try to have a look tomorrow.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MattBrittan No problem.
Are we 100% sure that Run() will always be called before Stop()?
In this scenario,

  1. c.close() is called right after c.Connect()
  2. Stop() is called before calling Run() in gorutine

I think it can be possible because we don't wait for calling Run() in c.Connect(), although it will always never happen.
To be 100% sure, then I guess we need to separate Run() to things like Start() and Wait() like below, so that c.Connect() can wait for Start() of PingHandler.

// client.go
func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
	...
        pingerWait := make(chan struct{})
	c.debug.Println("received CONNACK, starting PingHandler")
	c.workers.Add(1)
	go func() {
		defer c.workers.Done()
		defer c.debug.Println("returning from ping handler worker")
		if err := c.config.PingHandler.Start(c.config.Conn, keepalive); err != nil {
			...
		}
		close(pingerWait)
		if err:= c.config.PingHandler.Wait(); err != nil {
			go c.error(fmt.Errorf("ping handler error: %w", err))
		}
	}()
	...

	<- pingerWait

	return ca, nil	
}

// pinger.go
func (p *DefaultPinger) Start(conn net.Conn, keepAlive uint16) error {
	...	
}
func (p *DefaultPinger) Wait() error {
	return <-p.errChan
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I had been wondering if it would make more sense to pass Run a Context and use that for termination (was going to mock this up today but ran out of time).

c.config.PingHandler.Stop()
c.debug.Println("ping stopped")
}
_ = c.config.Conn.Close()
c.debug.Println("conn closed")
c.acksTracker.reset()
Expand All @@ -587,12 +597,12 @@ func (c *Client) close() {
// It also closes the client network connection.
func (c *Client) error(e error) {
c.debug.Println("error called:", e)
c.close()
c.close(e)
go c.config.OnClientError(e)
}

func (c *Client) serverDisconnect(d *Disconnect) {
c.close()
c.close(nil)
c.debug.Println("calling OnServerDisconnect")
go c.config.OnServerDisconnect(d)
}
Expand Down Expand Up @@ -973,7 +983,7 @@ func (c *Client) Disconnect(d *Disconnect) error {
c.debug.Println("disconnecting", d)
_, err := d.Packet().WriteTo(c.config.Conn)

c.close()
c.close(nil)

return err
}
Expand Down
34 changes: 17 additions & 17 deletions paho/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func TestClientConnect(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

cp := &Connect{
Expand Down Expand Up @@ -119,7 +119,7 @@ func TestClientSubscribe(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestClientUnsubscribe(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -202,7 +202,7 @@ func TestClientPublishQoS0(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -244,7 +244,7 @@ func TestClientPublishQoS1(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestClientPublishQoS2(t *testing.T) {
Conn: ts.ClientConn(),
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -336,7 +336,7 @@ func TestClientReceiveQoS0(t *testing.T) {
}},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -383,7 +383,7 @@ func TestClientReceiveQoS1(t *testing.T) {
}},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -431,7 +431,7 @@ func TestClientReceiveQoS2(t *testing.T) {
}},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -493,7 +493,7 @@ func TestClientReceiveAndAckInOrder(t *testing.T) {
}},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

ctx := context.Background()
Expand Down Expand Up @@ -577,7 +577,7 @@ func TestManualAcksInOrder(t *testing.T) {
EnableManualAcknowledgment: true,
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

ctx := context.Background()
Expand Down Expand Up @@ -648,7 +648,7 @@ func TestReceiveServerDisconnect(t *testing.T) {
},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -689,7 +689,7 @@ func TestAuthenticate(t *testing.T) {
AuthHandler: &fakeAuth{},
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

c.stop = make(chan struct{})
Expand Down Expand Up @@ -788,7 +788,7 @@ func TestAuthenticateOnConnect(t *testing.T) {
AuthHandler: &auther,
})
require.NotNil(t, c)
defer c.close()
defer c.close(nil)
c.SetDebugLogger(clientLogger)

cp := &Connect{
Expand Down Expand Up @@ -884,7 +884,7 @@ func TestDisconnect(t *testing.T) {
})
require.NotNil(t, c)
c.SetDebugLogger(clientLogger)
defer c.close()
defer c.close(nil)

ctx := context.Background()
ca, err := c.Connect(ctx, &Connect{
Expand Down Expand Up @@ -945,7 +945,7 @@ func TestCloseDeadlock(t *testing.T) {
for i := 0; i < routines; i++ {
go func() {
defer wg.Done()
c.close()
c.close(nil)
}()
go func() {
defer wg.Done()
Expand Down Expand Up @@ -1009,7 +1009,7 @@ func TestSendOnClosedChannel(t *testing.T) {
}()

time.Sleep(10 * time.Millisecond)
c.close()
c.close(nil)
}

func isChannelClosed(ch chan struct{}) (closed bool) {
Expand Down
18 changes: 17 additions & 1 deletion paho/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,11 @@ func (p *DefaultPinger) Run(conn net.Conn, keepAlive uint16) error {
p.timer = time.AfterFunc(0, p.sendPingreq) // Immediately send first pingreq
p.mu.Unlock()

return <-p.errChan
err := <-p.errChan

p.reset()

return err
}

func (p *DefaultPinger) Stop() {
Expand Down Expand Up @@ -172,6 +176,18 @@ func (p *DefaultPinger) sendPingreq() {
}
}

func (p *DefaultPinger) reset() {
p.mu.Lock()
defer p.mu.Unlock()
p.timer = nil
p.previousPingAcked = make(chan struct{}, 1)
p.previousPingAcked <- struct{}{} // initial value
p.done = make(chan struct{})
p.errChan = make(chan error, 1)
p.ackReceived = make(chan struct{}, 1)
p.stopOnce = sync.Once{}
}

func (p *DefaultPinger) stop(err error) {
p.mu.Lock()
defer p.mu.Unlock()
Expand Down