Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share CBS Session #88

Merged
merged 9 commits into from
Aug 30, 2019
2 changes: 0 additions & 2 deletions samples/asynctests/test_azure_event_hubs_receive_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,6 @@ async def test_event_hubs_batch_receive_async(live_eventhub_config):

@pytest.mark.asyncio
async def test_event_hubs_shared_connection_async(live_eventhub_config):
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome that these are now resolved :)
Once this is merged I think we have an issue for these that we can now close?

Copy link
Contributor Author

@yunhaoling yunhaoling Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, we have #83 tracking the issue.
also I find the receive timeout = 1s (too short) in these two tests may also be the reason for the unstability.

I extend the timeout to be 3 seconds and now the tests are more stable(all green!!) now.

Copy link
Contributor Author

@yunhaoling yunhaoling Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@annatisch

Hello Anna,
One thing I noticed when receiving messages using the receive_message_batch function, is that if the operation is timeout, we would set self._shutdown = True:

self._shutdown = True

But we don't close or reset the state of the receiver client.
The result is that I can still call the receive_message_batch but it would simply return nothing, as the do_work function would return immediately after checking that self._shutdown is True.

I think that behavior may seem to be wrong. Should we close the receiver here?
Is it by design or it's actually a bug.

uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
Expand Down Expand Up @@ -172,7 +171,6 @@ async def receive_ten(partition, receiver):

@pytest.mark.asyncio
async def test_event_hubs_multiple_receiver_async(live_eventhub_config):
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO
uri = "sb://{}/{}".format(live_eventhub_config['hostname'], live_eventhub_config['event_hub'])
sas_auth_a = authentication.SASTokenAsync.from_shared_access_key(
uri, live_eventhub_config['key_name'], live_eventhub_config['access_key'])
Expand Down
10 changes: 8 additions & 2 deletions uamqp/async_ops/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import queue
import uuid

from uamqp import address, authentication, client, constants, errors
from uamqp import address, authentication, client, constants, errors, compat
from uamqp.utils import get_running_loop
from uamqp.async_ops.connection_async import ConnectionAsync
from uamqp.async_ops.receiver_async import MessageReceiverAsync
Expand Down Expand Up @@ -222,6 +222,7 @@ async def open_async(self, connection=None):
_logger.info("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
await connection.lock_async()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
Expand Down Expand Up @@ -256,6 +257,8 @@ async def open_async(self, connection=None):
loop=self.loop)
if self._keep_alive_interval:
self._keep_alive_thread = asyncio.ensure_future(self._keep_alive_async(), loop=self.loop)
if self._ext_connection:
connection.release_async()

async def close_async(self):
"""Close the client asynchronously. This includes closing the Session
Expand Down Expand Up @@ -857,6 +860,7 @@ async def receive_messages_async(self, on_message_received):
service. It takes a single argument, a ~uamqp.message.Message object.
:type on_message_received: callable[~uamqp.message.Message]
"""
self._streaming_receive = True
await self.open_async()
self._message_received_callback = on_message_received
receiving = True
Expand All @@ -867,6 +871,7 @@ async def receive_messages_async(self, on_message_received):
receiving = False
raise
finally:
self._streaming_receive = False
if not receiving:
await self.close_async()

Expand Down Expand Up @@ -945,7 +950,7 @@ def receive_messages_iter_async(self, on_message_received=None):
:rtype: Generator[~uamqp.message.Message]
"""
self._message_received_callback = on_message_received
self._received_messages = queue.Queue()
self._received_messages = self._received_messages or compat.queue.Queue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment statement still necessary? self._received_messages should never be None now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed 😀

return AsyncMessageIter(self, auto_complete=self.auto_complete)

async def redirect_async(self, redirect, auth):
Expand All @@ -967,6 +972,7 @@ async def redirect_async(self, redirect, auth):
self._shutdown = False
self._last_activity_timestamp = None
self._was_message_received = False
self._received_messages = compat.queue.Queue()

self._remote_address = address.Source(redirect.address)
await self._redirect_async(redirect, auth)
Expand Down
15 changes: 10 additions & 5 deletions uamqp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,7 @@ def open(self, connection=None):
_logger.debug("Using existing connection.")
self._auth = connection.auth
self._ext_connection = True
connection.lock()
self._connection = connection or self.connection_type(
self._hostname,
self._auth,
Expand Down Expand Up @@ -269,6 +270,8 @@ def open(self, connection=None):
if self._keep_alive_interval:
self._keep_alive_thread = threading.Thread(target=self._keep_alive)
self._keep_alive_thread.start()
if self._ext_connection:
connection.release()

def close(self):
"""Close the client. This includes closing the Session
Expand Down Expand Up @@ -871,7 +874,8 @@ def __init__(
self._last_activity_timestamp = None
self._was_message_received = False
self._message_received_callback = None
self._received_messages = None
self._streaming_receive = False
self._received_messages = compat.queue.Queue()

# Receiver and Link settings
self._max_message_size = kwargs.pop('max_message_size', None) or constants.MAX_MESSAGE_LENGTH_BYTES
Expand Down Expand Up @@ -993,7 +997,7 @@ def _message_received(self, message):
self._message_received_callback(message)
self._complete_message(message, self.auto_complete)

if self._received_messages:
if not self._streaming_receive:
self._received_messages.put(message)
elif not message.settled:
# Message was received with callback processing and wasn't settled.
Expand Down Expand Up @@ -1073,8 +1077,8 @@ def receive_messages(self, on_message_received):
service. It takes a single argument, a ~uamqp.message.Message object.
:type on_message_received: callable[~uamqp.message.Message]
"""
self._streaming_receive = True
self.open()
self._received_messages = None
self._message_received_callback = on_message_received
receiving = True
try:
Expand All @@ -1084,6 +1088,7 @@ def receive_messages(self, on_message_received):
receiving = False
raise
finally:
self._streaming_receive = False
if not receiving:
self.close()

Expand All @@ -1097,7 +1102,7 @@ def receive_messages_iter(self, on_message_received=None):
:type on_message_received: callable[~uamqp.message.Message]
"""
self._message_received_callback = on_message_received
self._received_messages = compat.queue.Queue()
self._received_messages = self._received_messages or compat.queue.Queue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment statement still necessary? self._received_messages should never be None now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right! it's no longer needed.
Removed 😀

return self._message_generator()

def redirect(self, redirect, auth):
Expand All @@ -1119,7 +1124,7 @@ def redirect(self, redirect, auth):
self._shutdown = False
self._last_activity_timestamp = None
self._was_message_received = False
self._received_messages = None
self._received_messages = compat.queue.Queue()

self._remote_address = address.Source(redirect.address)
self._redirect(redirect, auth)