Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New pinger implementation #222

Merged
merged 4 commits into from
Jan 13, 2024
Merged

Conversation

vishnureddy17
Copy link
Contributor

close #137

@vishnureddy17
Copy link
Contributor Author

Looks like this change uncovered a potential deadlock in client_test.go. This may be an issue with paho itself or it might just be the way the test is written. I'll make a write-up explaining the issue soon.

@vishnureddy17
Copy link
Contributor Author

Ok, so the issue is really the way the test server is written. I'll work on including a fix to that in this PR as well.

The test server does packet reads and packet writes in the same thread/goroutine. Furthermore, the conn created by net.Pipe() is unbuffered. This can result in a deadlock when the test server is trying to write a packet and the incoming() goroutine in the client is also trying to write a packet. Both the server and the client are trying to write and neither side is reading, resulting in a deadlock, as the conn is unbuffered.

As an example, the logs below show that the test server is stuck on writing a PINGRESP and the incoming() goroutine is stuck on writing a PUBREL:

=== RUN   TestClientPublishQoS2
    test.go:38: 2024-01-09T08:18:02.521885195-05:00 ClientPublishQoS2: no session present - cleaning session state
    test.go:38: 2024-01-09T08:18:02.521955196-05:00 ClientPublishQoS2: State.clean() called
    test.go:48: 2024-01-09T08:18:02.521978196-05:00 ClientPublishQoS2:0 inflight transactions upon connection
    test.go:48: 2024-01-09T08:18:02.522003096-05:00 ClientPublishQoS2:retransmitting 0 messages
    test.go:48: 2024-01-09T08:18:02.522033496-05:00 ClientPublishQoS2:sending message to test/2
    test.go:38: 2024-01-09T08:18:02.522061596-05:00 ClientPublishQoS2: sending QoS12 message
    test.go:38: 2024-01-09T08:18:02.522107896-05:00 TestServer: test server received a control packet: PUBLISH
    test.go:38: 2024-01-09T08:18:02.522130696-05:00 TestServer: received PUBLISH: PacketID:1 QOS:2 Topic:test/2 Duplicate:false Retain:false Payload:
        test payload
        Properties
        
    test.go:38: 2024-01-09T08:18:02.522193896-05:00 TestServer: sending pubrec
    test.go:38: 2024-01-09T08:18:02.522250896-05:00 TestServer: sent pubrec
    test.go:38: 2024-01-09T08:18:02.522141096-05:00 ClientPublishQoS2: sendPingreq() sending PINGREQ packet
    test.go:38: 2024-01-09T08:18:02.522325796-05:00 TestServer: test server received a control packet: PINGREQ
    test.go:38: 2024-01-09T08:18:02.522334796-05:00 TestServer: test server sending pingresp
    test.go:38: 2024-01-09T08:18:02.522340496-05:00 ClientPublishQoS2: PacketSent() resetting timer
    test.go:38: 2024-01-09T08:18:02.522410996-05:00 ClientPublishQoS2: received PUBREC packet with id  1
    test.go:38: 2024-01-09T08:18:02.522443396-05:00 ClientPublishQoS2: sending PUBREL for 1
    test.go:38: 2024-01-09T08:18:02.522470896-05:00 ClientPublishQoS2: sendPingreq() sent PINGREQ packet, waiting for PINGRESP
    test.go:38: 2024-01-09T08:18:02.522512396-05:00 ClientPublishQoS2: PacketSent() resetting timer
panic: test timed out after 30s
running tests:
        TestClientPublishQoS2 (30s)

goroutine 50 [running]:
testing.(*M).startAlarm.func1()
        /usr/local/go/src/testing/testing.go:2259 +0x3b9
created by time.goFunc
        /usr/local/go/src/time/sleep.go:176 +0x2d

goroutine 1 [chan receive]:
testing.(*T).Run(0xc00008aea0, {0x75b57f?, 0x51797c?}, 0x788ea0)
        /usr/local/go/src/testing/testing.go:1649 +0x3c8
testing.runTests.func1(0x9df720?)
        /usr/local/go/src/testing/testing.go:2054 +0x3e
testing.tRunner(0xc00008aea0, 0xc00014dc48)
        /usr/local/go/src/testing/testing.go:1595 +0xff
testing.runTests(0xc0000aae60?, {0x9d9a40, 0x1a, 0x1a}, {0x41829f?, 0xc00014dd08?, 0x9dee40?})
        /usr/local/go/src/testing/testing.go:2052 +0x445
testing.(*M).Run(0xc0000aae60)
        /usr/local/go/src/testing/testing.go:1925 +0x636
main.main()
        _testmain.go:97 +0x19c

goroutine 34 [select]:
github.com/eclipse/paho.golang/paho.(*Client).publishQoS12(0xc000160ea0, {0x7de7b8, 0xa0d560}, 0xc0000b6e80, {0x1?})
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client.go:871 +0x345
github.com/eclipse/paho.golang/paho.(*Client).PublishWithOptions(0xc000160ea0, {0x7de7b8, 0xa0d560}, 0xc0000b6e40, {0xc0000a0d60?})
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client.go:840 +0x22e
github.com/eclipse/paho.golang/paho.(*Client).Publish(...)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client.go:784
github.com/eclipse/paho.golang/paho.TestClientPublishQoS2(0xc000160d00)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:308 +0x609
testing.tRunner(0xc000160d00, 0x788ea0)
        /usr/local/go/src/testing/testing.go:1595 +0xff
created by testing.(*T).Run in goroutine 1
        /usr/local/go/src/testing/testing.go:1648 +0x3ad

goroutine 35 [select]:
net.(*pipe).write(0xc0000e0480, {0xc0000da940, 0x2, 0x40})
        /usr/local/go/src/net/pipe.go:194 +0x305
net.(*pipe).Write(0x7f8dc0399d90?, {0xc0000da940?, 0xc0000e0480?, 0x9df720?})
        /usr/local/go/src/net/pipe.go:174 +0x1c
net.(*Buffers).WriteTo(0xc0000a46c0, {0x7f8dc03b4358?, 0xc0000e0480?})
        /usr/local/go/src/net/net.go:726 +0xb9
github.com/eclipse/paho.golang/packets.(*ControlPacket).WriteTo(0xc0000d2b00, {0x7f8dc03b4358, 0xc0000e0480})
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/packets/packets.go:345 +0x346
github.com/eclipse/paho.golang/internal/basictestserver.(*TestServer).Run(0xc0000e0400)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/internal/basictestserver/server.go:166 +0x145d
created by github.com/eclipse/paho.golang/paho.TestClientPublishQoS2 in goroutine 34
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:278 +0x2a5

goroutine 36 [select]:
net.(*pipe).write(0xc0000e0500, {0xc0000daa80, 0x2, 0x40})
        /usr/local/go/src/net/pipe.go:194 +0x305
net.(*pipe).Write(0x6eb420?, {0xc0000daa80?, 0xc0000e0500?, 0x9df720?})
        /usr/local/go/src/net/pipe.go:174 +0x1c
net.(*Buffers).WriteTo(0xc0000a4738, {0x7f8dc03b4358?, 0xc0000e0500?})
        /usr/local/go/src/net/net.go:726 +0xb9
github.com/eclipse/paho.golang/packets.(*ControlPacket).WriteTo(0xc000194cd0, {0x7f8dc03b4358, 0xc0000e0500})
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/packets/packets.go:345 +0x346
github.com/eclipse/paho.golang/packets.(*Pubrel).WriteTo(0xc0000a4510?, {0x7f8dc03b4358?, 0xc0000e0500?})
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/packets/pubrel.go:76 +0x56
github.com/eclipse/paho.golang/paho/session/state.(*State).PacketReceived(0xc0000c62c0, 0xc0000d2900, 0x0?)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/session/state/state.go:537 +0x62c
github.com/eclipse/paho.golang/paho.(*Client).incoming(0xc000160ea0)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client.go:507 +0x2e8
github.com/eclipse/paho.golang/paho.TestClientPublishQoS2.func1()
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:294 +0x52
created by github.com/eclipse/paho.golang/paho.TestClientPublishQoS2 in goroutine 34
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:292 +0x48d

goroutine 37 [chan receive]:
github.com/eclipse/paho.golang/paho.(*DefaultPinger).Run(0xc000144690, {0x7dffd8?, 0xc0000e0500?}, 0x0?)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/pinger.go:78 +0x196
github.com/eclipse/paho.golang/paho.TestClientPublishQoS2.func2()
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:298 +0x6c
created by github.com/eclipse/paho.golang/paho.TestClientPublishQoS2 in goroutine 34
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/client_test.go:296 +0x4cf

goroutine 5 [select]:
github.com/eclipse/paho.golang/paho.(*DefaultPinger).sendPingreq(0xc000144690)
        /home/vishnureddy/projects/paho-pinger-fix/paho.golang/paho/pinger.go:136 +0x2c5
created by time.goFunc
        /usr/local/go/src/time/sleep.go:176 +0x2d
exit status 2
FAIL    github.com/eclipse/paho.golang/paho     30.006s

Open question: should the paho client avoid doing writes in the incoming() goroutine? This would potentially be more performant (in the case where the connection is full-duplex) and safer, but the current way the client is implemented is unlikely to cause major issues.

cc @MattBrittan

@MattBrittan
Copy link
Contributor

Note that, while basictestserver uses net.Pipe, testserver uses a different technique to avoid similar issues to the one you found (spent ages tracing an issue...). Might be worth moving basictestserver over to the alternate approach.

Will try to have a look at the rest of your comments today (but realistically it may be later in the week).

@MattBrittan
Copy link
Contributor

MattBrittan commented Jan 10, 2024

Open question: should the paho client avoid doing writes in the incoming() goroutine? This would potentially be more performant (in the case where the connection is full-duplex) and safer, but the current way the client is implemented is unlikely to cause major issues.

I've not really seen any issues with this (other than those caused by net.Pipe :-) ). My feeling is that the simplicity of the current approach is an important consideration; separating writes would complicate things a bit so I think we would need to establish that it does improve performance/safety (for standard net.Conn I believe the current approach is safe but may well have missed something). I'll do some more on this project tomorrow (have been doing some tidying on the python project) as I'd like to have a release candidate by the end of the week (so I can run it myself for a few weeks before releasing it). I think we are just about there...

One other thing - I'll commit PR #221 tomorrow; this will lead to the need for some changes in your PR (alternatively I can commit your PR first if it's ready). #221 comes out of a few of the discussions with you; having the config public makes it far too easy to introduce race conditions...

@vishnureddy17 vishnureddy17 marked this pull request as ready for review January 11, 2024 16:26
@vishnureddy17 vishnureddy17 marked this pull request as draft January 11, 2024 16:31
@vishnureddy17
Copy link
Contributor Author

Converted to draft because I need to deal with the merge conflict.

@vishnureddy17
Copy link
Contributor Author

Sorry for the delay, this is now ready for review @MattBrittan

@@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable
}

if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..."
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the keepalive check as the Pinger Run() method is supposed to return immediately if given a keepalive of 0, so this check is unnecessary.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - the check was put there because the previous pinger panicked on a 0 keepalive.

pingOutstanding int32
debug log.Logger
// DefaultPinger is the default implementation of Pinger.
type DefaultPinger struct {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Renamed PingHandler to DefaultPinger, I think this is a more descriptive name.

// If the pinger stops due to an error, it returns the error.
// If the keepAlive is 0, it returns nil immediately.
// Run() must be called only once.
Run(conn net.Conn, keepAlive uint16) error
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm taking in a uint16 for a keepalive value rather than a time.Duration. I think this makes more sense as it is closer to the MQTT spec, and I feel that conversion to time.Duration should be an internal implementation detail of something that implements the Pinger interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth naming the parameter keepAliveSeconds to ensure this is clear.

@vishnureddy17
Copy link
Contributor Author

An important thing to note is that this PR doesn't modify the SessionManager to be able to call PacketSent() when it does a write.

Copy link
Contributor

@MattBrittan MattBrittan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great - thanks very much!

I've made a few comments (opinionated nit picking :-) ) but am going to go ahead and commit this now (will then start running the code in a non-critical production role)

PingHandler Pinger
AuthHandler Auther
PingHandler Pinger
defaultPinger bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this needed? (it's set but never read; I suspect it was used in a previous iteration?)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops. This was supposed to be used to determine whether the client's SetDebugLogger() should call SetDebug() on the pinger. So right now, the Pingers logger will won't be set if the user is relying on the default one. This would be a very small fix, so could be tacked on to some other PR.

@@ -340,15 +340,15 @@ func (c *Client) Connect(ctx context.Context, cp *Connect) (*Connack, error) {
c.serverProps.SharedSubAvailable = ca.Properties.SharedSubAvailable
}

if keepalive > 0 { // "Keep Alive value of 0 has the effect of turning off..."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense - the check was put there because the previous pinger panicked on a 0 keepalive.

go func() {
defer c.workers.Done()
defer c.debug.Println("returning from ping handler worker")
if err := c.config.PingHandler.Run(c.config.Conn, keepalive); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like this approach - really easy to see what is happening.

case <-p.ackReceived:
p.previousPingAcked <- struct{}{}
p.debug.Println("sendPingreq() returning after receiving PINGRESP")
case <-pingrespTimeout.C:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would generally use time.After in this case. NewTimer is preferable if you are calling .Stop (to release the timer resources; important if you are doing this in a loop). However in this case there will only be one pingrespTimeout in use at a time so I don't think it's really an issue.

// If the pinger stops due to an error, it returns the error.
// If the keepAlive is 0, it returns nil immediately.
// Run() must be called only once.
Run(conn net.Conn, keepAlive uint16) error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

might be worth naming the parameter keepAliveSeconds to ensure this is clear.

@MattBrittan MattBrittan merged commit e379c50 into eclipse-paho:master Jan 13, 2024
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

PingHandler will not time time out
2 participants