-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Share CBS Session #88
Changes from 5 commits
f2dd0f6
5c31769
e19b267
3dacd9c
9d67746
aaa350c
c7fd471
d3e4291
436b989
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,7 +13,7 @@ | |
import queue | ||
import uuid | ||
|
||
from uamqp import address, authentication, client, constants, errors | ||
from uamqp import address, authentication, client, constants, errors, compat | ||
from uamqp.utils import get_running_loop | ||
from uamqp.async_ops.connection_async import ConnectionAsync | ||
from uamqp.async_ops.receiver_async import MessageReceiverAsync | ||
|
@@ -222,6 +222,7 @@ async def open_async(self, connection=None): | |
_logger.info("Using existing connection.") | ||
self._auth = connection.auth | ||
self._ext_connection = True | ||
await connection.lock_async() | ||
self._connection = connection or self.connection_type( | ||
self._hostname, | ||
self._auth, | ||
|
@@ -256,6 +257,8 @@ async def open_async(self, connection=None): | |
loop=self.loop) | ||
if self._keep_alive_interval: | ||
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop) | ||
if self._ext_connection: | ||
connection.release_async() | ||
|
||
async def close_async(self): | ||
"""Close the client asynchronously. This includes closing the Session | ||
|
@@ -857,6 +860,7 @@ async def receive_messages_async(self, on_message_received): | |
service. It takes a single argument, a ~uamqp.message.Message object. | ||
:type on_message_received: callable[~uamqp.message.Message] | ||
""" | ||
self._streaming_receive = True | ||
await self.open_async() | ||
self._message_received_callback = on_message_received | ||
receiving = True | ||
|
@@ -867,6 +871,7 @@ async def receive_messages_async(self, on_message_received): | |
receiving = False | ||
raise | ||
finally: | ||
self._streaming_receive = False | ||
if not receiving: | ||
await self.close_async() | ||
|
||
|
@@ -945,7 +950,7 @@ def receive_messages_iter_async(self, on_message_received=None): | |
:rtype: Generator[~uamqp.message.Message] | ||
""" | ||
self._message_received_callback = on_message_received | ||
self._received_messages = queue.Queue() | ||
self._received_messages = self._received_messages or compat.queue.Queue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this assignment statement still necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Removed 😀 |
||
return AsyncMessageIter(self, auto_complete=self.auto_complete) | ||
|
||
async def redirect_async(self, redirect, auth): | ||
|
@@ -967,6 +972,7 @@ async def redirect_async(self, redirect, auth): | |
self._shutdown = False | ||
self._last_activity_timestamp = None | ||
self._was_message_received = False | ||
self._received_messages = compat.queue.Queue() | ||
|
||
self._remote_address = address.Source(redirect.address) | ||
await self._redirect_async(redirect, auth) | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -236,6 +236,7 @@ def open(self, connection=None): | |
_logger.debug("Using existing connection.") | ||
self._auth = connection.auth | ||
self._ext_connection = True | ||
connection.lock() | ||
self._connection = connection or self.connection_type( | ||
self._hostname, | ||
self._auth, | ||
|
@@ -269,6 +270,8 @@ def open(self, connection=None): | |
if self._keep_alive_interval: | ||
self._keep_alive_thread = threading.Thread(target=self._keep_alive) | ||
self._keep_alive_thread.start() | ||
if self._ext_connection: | ||
connection.release() | ||
|
||
def close(self): | ||
"""Close the client. This includes closing the Session | ||
|
@@ -871,7 +874,8 @@ def __init__( | |
self._last_activity_timestamp = None | ||
self._was_message_received = False | ||
self._message_received_callback = None | ||
self._received_messages = None | ||
self._streaming_receive = False | ||
self._received_messages = compat.queue.Queue() | ||
|
||
# Receiver and Link settings | ||
self._max_message_size = kwargs.pop('max_message_size', None) or constants.MAX_MESSAGE_LENGTH_BYTES | ||
|
@@ -993,7 +997,7 @@ def _message_received(self, message): | |
self._message_received_callback(message) | ||
self._complete_message(message, self.auto_complete) | ||
|
||
if self._received_messages: | ||
if not self._streaming_receive: | ||
self._received_messages.put(message) | ||
elif not message.settled: | ||
# Message was received with callback processing and wasn't settled. | ||
|
@@ -1073,8 +1077,8 @@ def receive_messages(self, on_message_received): | |
service. It takes a single argument, a ~uamqp.message.Message object. | ||
:type on_message_received: callable[~uamqp.message.Message] | ||
""" | ||
self._streaming_receive = True | ||
self.open() | ||
self._received_messages = None | ||
self._message_received_callback = on_message_received | ||
receiving = True | ||
try: | ||
|
@@ -1084,6 +1088,7 @@ def receive_messages(self, on_message_received): | |
receiving = False | ||
raise | ||
finally: | ||
self._streaming_receive = False | ||
if not receiving: | ||
self.close() | ||
|
||
|
@@ -1097,7 +1102,7 @@ def receive_messages_iter(self, on_message_received=None): | |
:type on_message_received: callable[~uamqp.message.Message] | ||
""" | ||
self._message_received_callback = on_message_received | ||
self._received_messages = compat.queue.Queue() | ||
self._received_messages = self._received_messages or compat.queue.Queue() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this assignment statement still necessary? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you're right! it's no longer needed. |
||
return self._message_generator() | ||
|
||
def redirect(self, redirect, auth): | ||
|
@@ -1119,7 +1124,7 @@ def redirect(self, redirect, auth): | |
self._shutdown = False | ||
self._last_activity_timestamp = None | ||
self._was_message_received = False | ||
self._received_messages = None | ||
self._received_messages = compat.queue.Queue() | ||
|
||
self._remote_address = address.Source(redirect.address) | ||
self._redirect(redirect, auth) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome that these are now resolved :)
Once this is merged I think we have an issue for these that we can now close?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, we have #83 tracking the issue.
also I find the receive timeout = 1s (too short) in these two tests may also be the reason for the unstability.
I extend the timeout to be 3 seconds and now the tests are more stable(all green!!) now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@annatisch
Hello Anna,
One thing I noticed when receiving messages using the
receive_message_batch
function, is that if the operation is timeout, we would setself._shutdown = True
:azure-uamqp-python/uamqp/client.py
Line 947 in 7f667c1
But we don't close or reset the state of the receiver client.
The result is that I can still call the
receive_message_batch
but it would simply return nothing, as thedo_work
function would return immediately after checking thatself._shutdown
is True.I think that behavior may seem to be wrong. Should we close the receiver here?
Is it by design or it's actually a bug.