New pinger implementation #222
Looks like this change uncovered a potential deadlock in |
Ok, so the issue is really the way the test server is written. I'll work on including a fix to that in this PR as well. The test server does packet reads and packet writes in the same thread/goroutine. Furthermore, the conn created by As an example, the logs below show that the test server is stuck on writing a PINGRESP and the
Open question: should the paho client avoid doing writes in the cc @MattBrittan |
Note that, while basictestserver uses Will try to have a look at the rest of your comments today (but realistically it may be later in the week). |
I've not really seen any issues with this (other than those caused by One other thing - I'll commit PR #221 tomorrow; this will lead to the need for some changes in your PR (alternatively I can commit your PR first if it's ready). #221 comes out of a few of the discussions with you; having the config public makes it far too easy to introduce race conditions... |
Converted to draft because I need to deal with the merge conflict.
d63e9bf to a6d2894
Sorry for the delay, this is now ready for review @MattBrittan
@@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable
}

if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..."
I removed the keepalive check as the Pinger Run() method is supposed to return immediately if given a keepalive of 0, so this check is unnecessary.
Makes sense - the check was put there because the previous pinger panicked on a 0 keepalive.
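For readers following along, here is a minimal sketch of the contract discussed above: a Run() that returns nil straight away when the keepalive is 0, so the caller needs no guard. The type sketchPinger and its stop channel are hypothetical names made up for illustration; this is not the DefaultPinger code from the PR.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// sketchPinger is a hypothetical stand-in, not the DefaultPinger from this PR.
type sketchPinger struct {
	stop chan struct{} // closed to ask Run to return
}

// Run follows the documented contract: a keepAlive of 0 means keepalive is
// disabled, so it returns nil immediately without touching conn.
func (p *sketchPinger) Run(conn net.Conn, keepAlive uint16) error {
	if keepAlive == 0 {
		return nil
	}
	ticker := time.NewTicker(time.Duration(keepAlive) * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-p.stop:
			return nil
		case <-ticker.C:
			// MQTT PINGREQ: fixed header 0xC0 with a remaining length of 0.
			if _, err := conn.Write([]byte{0xC0, 0x00}); err != nil {
				return err
			}
		}
	}
}

func main() {
	p := &sketchPinger{stop: make(chan struct{})}
	// With keepAlive 0 the conn is never used, so passing nil is safe here.
	fmt.Println(p.Run(nil, 0)) // prints <nil>
}
```

With the zero check inside Run(), the caller can start the pinger unconditionally, which is what removing the `keepalive > 0` guard relies on.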
pingOutstanding int32
debug log.Logger
// DefaultPinger is the default implementation of Pinger.
type DefaultPinger struct {
Renamed PingHandler to DefaultPinger; I think this is a more descriptive name.
// If the pinger stops due to an error, it returns the error.
// If the keepAlive is 0, it returns nil immediately.
// Run() must be called only once.
Run(conn net.Conn, keepAlive uint16) error
I'm taking in a uint16 for a keepalive value rather than a time.Duration. I think this makes more sense as it is closer to the MQTT spec, and I feel that conversion to time.Duration should be an internal implementation detail of something that implements the Pinger interface.
might be worth naming the parameter keepAliveSeconds to ensure this is clear.
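As a rough illustration of the conversion being discussed, the sketch below keeps the seconds-based uint16 at the interface boundary and converts to time.Duration internally. The helper name keepAliveInterval is hypothetical and does not come from the PR.

```go
package main

import (
	"fmt"
	"time"
)

// keepAliveInterval is a hypothetical helper: it converts the MQTT keepalive,
// a two-byte count of seconds, into a time.Duration for internal timers.
func keepAliveInterval(keepAliveSeconds uint16) time.Duration {
	return time.Duration(keepAliveSeconds) * time.Second
}

func main() {
	fmt.Println(keepAliveInterval(30))    // prints 30s
	fmt.Println(keepAliveInterval(65535)) // prints 18h12m15s, the largest keepalive a uint16 can hold
}
```

Naming the parameter keepAliveSeconds, as suggested, makes the unit visible at the call site without changing the type.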
An important thing to note is that this PR doesn't modify the |
Looks great - thanks very much!
I've made a few comments (opinionated nit picking :-) ) but am going to go ahead and commit this now (will then start running the code in a non-critical production role)
PingHandler Pinger
AuthHandler Auther
PingHandler Pinger
defaultPinger bool
Is this needed? (it's set but never read; I suspect it was used in a previous iteration?)
Oops. This was supposed to be used to determine whether the client's SetDebugLogger() should call SetDebug() on the pinger. So right now, the Pinger's logger won't be set if the user is relying on the default one. This would be a very small fix, so could be tacked on to some other PR.
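A sketch of what that small fix might look like, under assumptions: the Logger and Pinger interfaces, the client struct, and stubPinger below are simplified, hypothetical stand-ins rather than the real paho types. Only the idea is taken from the comment above: forward the logger to the pinger when the client created the default one.

```go
package main

import "fmt"

// Logger is a hypothetical minimal logging interface for this sketch.
type Logger interface {
	Println(v ...interface{})
}

// Pinger is a hypothetical subset of a pinger interface; only SetDebug matters here.
type Pinger interface {
	SetDebug(l Logger)
}

// stubPinger records the logger it was given so the effect is observable.
type stubPinger struct{ debug Logger }

func (p *stubPinger) SetDebug(l Logger) { p.debug = l }

// stdoutLogger is a trivial Logger that writes to standard output.
type stdoutLogger struct{}

func (stdoutLogger) Println(v ...interface{}) { fmt.Println(v...) }

// client is a simplified stand-in for the real client type.
type client struct {
	pinger        Pinger
	defaultPinger bool // true when the client created the pinger itself
	debug         Logger
}

// SetDebugLogger sets the client's logger and, only when the pinger is the
// default one, forwards the same logger to it. A user-supplied pinger is
// left alone because the user may have configured its logging separately.
func (c *client) SetDebugLogger(l Logger) {
	c.debug = l
	if c.defaultPinger {
		c.pinger.SetDebug(l)
	}
}

func main() {
	p := &stubPinger{}
	c := &client{pinger: p, defaultPinger: true}
	c.SetDebugLogger(stdoutLogger{})
	fmt.Println(p.debug != nil) // prints true
}
```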
go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil {
I like this approach - really easy to see what is happening.
case <-p.ackReceived:
p.previousPingAcked <- struct{}{}
p.debug.Println("sendPingreq() returning after receiving PINGRESP")
case <-pingrespTimeout.C:
Would generally use time.After in this case. NewTimer is preferable if you are calling .Stop (to release the timer resources; important if you are doing this in a loop). However in this case there will only be one pingrespTimeout in use at a time so I don't think it's really an issue.
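To make the trade-off concrete, here is a small sketch of both styles. The function names and the errPingrespTimeout value are hypothetical and are not taken from the PR; the ack channel stands in for whatever signals that a PINGRESP arrived.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPingrespTimeout is a hypothetical error value for this sketch.
var errPingrespTimeout = errors.New("PINGRESP timed out")

// waitWithAfter uses time.After: concise, and fine when only one timeout is
// pending at a time, since there is no handle on which to call Stop.
func waitWithAfter(ack <-chan struct{}, timeout time.Duration) error {
	select {
	case <-ack:
		return nil
	case <-time.After(timeout):
		return errPingrespTimeout
	}
}

// waitWithTimer uses time.NewTimer so the timer can be stopped as soon as the
// ack arrives, which matters more when this runs repeatedly in a loop.
func waitWithTimer(ack <-chan struct{}, timeout time.Duration) error {
	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case <-ack:
		return nil
	case <-t.C:
		return errPingrespTimeout
	}
}

func main() {
	ack := make(chan struct{}, 1)
	ack <- struct{}{}
	fmt.Println(waitWithAfter(ack, time.Second)) // ack is already queued, prints <nil>

	// Nothing is ever sent on this channel, so the timer fires.
	fmt.Println(waitWithTimer(make(chan struct{}), 10*time.Millisecond)) // prints PINGRESP timed out
}
```

Both variants behave the same from the caller's point of view; the difference is only in how promptly the timer's resources are released.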
close #137