-
Notifications
You must be signed in to change notification settings - Fork 47
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Share CBS Session #88
Conversation
The function |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just one comment (x2) regarding the assignment statements :)
Otherwise looks good!
uamqp/client.py
Outdated
@@ -1097,7 +1102,7 @@ def receive_messages_iter(self, on_message_received=None): | |||
:type on_message_received: callable[~uamqp.message.Message] | |||
""" | |||
self._message_received_callback = on_message_received | |||
self._received_messages = compat.queue.Queue() | |||
self._received_messages = self._received_messages or compat.queue.Queue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this assignment statement still necessary? self._received_messages
should never be None
now right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you're right! it's no longer needed.
Removed 😀
uamqp/async_ops/client_async.py
Outdated
@@ -945,7 +950,7 @@ def receive_messages_iter_async(self, on_message_received=None): | |||
:rtype: Generator[~uamqp.message.Message] | |||
""" | |||
self._message_received_callback = on_message_received | |||
self._received_messages = queue.Queue() | |||
self._received_messages = self._received_messages or compat.queue.Queue() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this assignment statement still necessary? self._received_messages
should never be None
now right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed 😀
@@ -131,7 +131,6 @@ def on_message_received(message): | |||
|
|||
@pytest.mark.asyncio | |||
async def test_event_hubs_shared_connection_async(live_eventhub_config): | |||
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Awesome that these are now resolved :)
Once this is merged I think we have an issue for these that we can now close?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yup, we have #83 tracking the issue.
also I find the receive timeout = 1s (too short) in these two tests may also be the reason for the unstability.
I extend the timeout to be 3 seconds and now the tests are more stable(all green!!) now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hello Anna,
One thing I noticed when receiving messages using the receive_message_batch
function, is that if the operation is timeout, we would set self._shutdown = True
:
azure-uamqp-python/uamqp/client.py
Line 947 in 7f667c1
self._shutdown = True |
But we don't close or reset the state of the receiver client.
The result is that I can still call the receive_message_batch
but it would simply return nothing, as the do_work
function would return immediately after checking that self._shutdown
is True.
I think that behavior may seem to be wrong. Should we close the receiver here?
Is it by design or it's actually a bug.
* initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds
* Share CBS Session (#88) * initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds * Update for 1.2.3 (#91) * review update (#97)
* Runtime metric (#95) * Init commit for desired compatibility * runtime metric init commit * Small fix of runtime receiver metric * Fix pylint error * Update according to review * update link destroy * Remove offered capabilities for now * add sample/test code * Update test * Share CBS Session (#89) * Share CBS Session (#88) * initial _received_messages object * Use flag to control the streaming behavior * Update comments and code * release_async isn't a sync function * Add missing module * Wait too short cause the receive client to close so that the test would fail * Remove unnecessary queue assignment * remove unused import module * Update timeout as the unit is milliseconds * Update for 1.2.3 (#91) * review update (#97) * Service Bus message transfer fixes (#96) * Support message delivery tag * Added headers * Removed null init * Added memory cleanup * Fix build error * Moved delivery tag to message * Cython fixes * Binary type * Attempt to set message tag * Converted to AMQP_VALUE * Syntax fixes * Build error * Renamed value * Get tag type * Fixed name * Extract tag bytes * Some C cleanup * More logging * Updated test * pylint fix * More logging * More logging * More logging * Fixed print formatting * More logging * Syntax error * TLSIO logging * Log socket error * Added sleep * Fixed sleep * Reduced sleep * Another attempt * Ping CI * Attempt to move outgoing flow * Moved send flow frame * Removed debug logging * Update link status * Fix diff * pylint fixes * Py2.7 * Updated status description * Fixed executor * Some review feedback * Performance improvement (#98) * Remove deepcopy and increase buffer size * Move parse into cython * Small fix * lazy parse * small fix * Update name * Update message property * remove unused import * put deepcopy back as it influences the performance little * Add footer setter * Update setters of message * fix bug in batch message (#99) * Bug fix, properties of message can be None type (#100) * on_transfer_received should not fail due to lack of a delivery tag (#101) * on_transfer_received should not fail due to lack of a delivery tag * Test reversing if-statements * Try initialize value * Trying to make Windows happy * Fix proxy test (#104) * Update docs (#103) * Update docs * fix typo * More typo
Initialize the
_received_messages
when in the__init__
function of the AMQP client so that received messages won't be simply dropped during the authentication process of multiple clients.@annatisch
I'm wondering whether the
self._streaming_receive
should be thread-safe or not.The situation where thread-unsafe may happen to the
_streaming_receive
variable is that both functionsreceive_message_batch
andreceive_messages
are called simultaneously.But I don't think our SDK support such kind of operation, do I miss something?