-
Notifications
You must be signed in to change notification settings - Fork 721
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add optional timeout to subscribe() #631
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank for your contribution. I've asked for some modificaation to avoid breaking change.
Also you will need to sign ECA for your contribution to be accepted: https://api.eclipse.org/git/eca/status/gh/eclipse/paho.mqtt.python/631
|
||
timeout: the timeout value after which the client disconnects from the | ||
broker. If no timeout is given, the client disconnects only | ||
after "msg_count" messages have been received. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is no msg_count in the subscribe.callback()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, I got this from the simple() function. I will remove this.
else: | ||
lock.acquire() | ||
client.loop_start() | ||
lock.acquire(timeout=timeout) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lock.acquire(timeout=timeout) | |
event.wait(timeout=timeout) |
And using a threading.Event() rather than threading.Lock(). An Event seems a better fit than an Lock
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a one-on-one communication I believe that a Lock is actually modelling the relationship better. Do you have any additional use in mind e.g., an observer/debugger watching these events? Otherwise I would tend to see Lock as better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I feel that an Event is even better at modelling this: the finishing processing is an event and we wait for that event.
With the lock, we have two consecutive call to acquire within the same thread isn't usual way to work with a lock.
def _on_connect(client, userdata, flags, rc): | ||
"""Internal v5 callback""" | ||
_on_connect_v5(client, userdata, flags, rc, None) | ||
|
||
|
||
def _on_message_callback(client, userdata, message): | ||
"""Internal callback""" | ||
userdata['callback'](client, userdata['userdata'], message) | ||
userdata['callback'](client, userdata['userdata'], |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a breaking change. Existing user of subscribe.callback will be required to change the signature of their callback function to accept an additional parameter "lock".
The idea I see to avoid any breaking change and still support this feature (while simple() still just call callback()) would be to add support for a special exception "_StopSubscriber" that the user callback could raise to disconnect. The callback "_on_message_simple" could then raise that exception, this would continue to make simple() a normal user of callback().
So here we could do:
try:
userdata['callback'](client, userdata['userdata'], message)
except _StopSubscriber:
lock.release()
I suggest to name it "_StopSubscriber" because I would like to kept the existence of this special exception undocumented/unsupported for now, because I think the true solution would be to call "client.disconnect()" instead of this exception. But doing the disconnect will not work because currently the received message is acknowledged after the on_message callback so disconnecting from inside the callback will cause the acknowledge to be dropped.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, I thought that since it is an internal function I could change the signature.
I could very simply determine whether userdata['lock'] is set, and if not call with the original signature instead. This would keep it compatible and would simply add the new functionality.
Using an exception here is not something I'd like to do. An exception always models something that is not part of the normal execution path, and in addition, is normally quite a bit slower. I think the approach with checking the value of 'lock' would be better. Or is there another motivation for using the exception?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not only an internal function or I did I missunderstood the code ? It's the callback passed to subscribe.callback() which is user provided.
In subscribe.simple() I agree it's internal, but we added timeout & the lock also on subscribe.callback().
I just realize we recently added a user_data_get
on client. It's a bit broken in that use-case, since it will return the "internal" userdata instead of the user-userdata (userdata["userdata"]
). But that probably solve our issue. We can access to userdata["lock"]
by using client.user_data_get()["lock"]
. This avoid any change in callback signature.
Support an optional timeout for the function subscribe() by adding a lock to communicate between the functions callback() that acquires the lock with a timeout. It thus blocks, either until the function on_message_simple() releases the lock when the requested number of messages has been received, or until the timeout has been reached.
The functionality has been tested and is in use in the project https://github.com/jbaumann/system_watchdog