Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Share CBS Session #88

Merged
merged 9 commits into from
Aug 30, 2019
Merged

Conversation

yunhaoling
Copy link
Contributor

Initialize the _received_messages when in the __init__ function of the AMQP client so that received messages won't be simply dropped during the authentication process of multiple clients.


@annatisch
I'm wondering whether the self._streaming_receive should be thread-safe or not.

The situation where thread-unsafe may happen to the _streaming_receive variable is that both functions receive_message_batch and receive_messages are called simultaneously.

But I don't think our SDK support such kind of operation, do I miss something?

@yunhaoling yunhaoling requested a review from annatisch August 19, 2019 03:10
@yunhaoling
Copy link
Contributor Author

The function release_async in connection_async.py isn't a co-routine function.
The name is a bit misleading as we may think release-async is an async function and call it by await release_async().

@yunhaoling yunhaoling requested a review from YijunXieMS August 22, 2019 23:26
Copy link
Member

@annatisch annatisch left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just one comment (x2) regarding the assignment statements :)
Otherwise looks good!

uamqp/client.py Outdated
@@ -1097,7 +1102,7 @@ def receive_messages_iter(self, on_message_received=None):
:type on_message_received: callable[~uamqp.message.Message]
"""
self._message_received_callback = on_message_received
self._received_messages = compat.queue.Queue()
self._received_messages = self._received_messages or compat.queue.Queue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment statement still necessary? self._received_messages should never be None now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you're right! it's no longer needed.
Removed 😀

@@ -945,7 +950,7 @@ def receive_messages_iter_async(self, on_message_received=None):
:rtype: Generator[~uamqp.message.Message]
"""
self._message_received_callback = on_message_received
self._received_messages = queue.Queue()
self._received_messages = self._received_messages or compat.queue.Queue()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this assignment statement still necessary? self._received_messages should never be None now right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed 😀

@@ -131,7 +131,6 @@ def on_message_received(message):

@pytest.mark.asyncio
async def test_event_hubs_shared_connection_async(live_eventhub_config):
pytest.skip("Unstable on OSX and Linux - need to fix") # TODO
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome that these are now resolved :)
Once this is merged I think we have an issue for these that we can now close?

Copy link
Contributor Author

@yunhaoling yunhaoling Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yup, we have #83 tracking the issue.
also I find the receive timeout = 1s (too short) in these two tests may also be the reason for the unstability.

I extend the timeout to be 3 seconds and now the tests are more stable(all green!!) now.

Copy link
Contributor Author

@yunhaoling yunhaoling Aug 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@annatisch

Hello Anna,
One thing I noticed when receiving messages using the receive_message_batch function, is that if the operation is timeout, we would set self._shutdown = True:

self._shutdown = True

But we don't close or reset the state of the receiver client.
The result is that I can still call the receive_message_batch but it would simply return nothing, as the do_work function would return immediately after checking that self._shutdown is True.

I think that behavior may seem to be wrong. Should we close the receiver here?
Is it by design or it's actually a bug.

@yunhaoling yunhaoling requested a review from annatisch August 29, 2019 03:58
@yunhaoling yunhaoling merged commit 419fe90 into Azure:share-cbs-session Aug 30, 2019
yunhaoling added a commit to yunhaoling/azure-uamqp-python that referenced this pull request Sep 25, 2019
* initial _received_messages object

* Use flag to control the streaming behavior

* Update comments and code

* release_async isn't a sync function

* Add missing module

* Wait too short cause the receive client to close so that the test would fail

* Remove unnecessary queue assignment

* remove unused import module

* Update timeout as the unit is milliseconds
annatisch pushed a commit that referenced this pull request Sep 30, 2019
* Share CBS Session (#88)

* initial _received_messages object

* Use flag to control the streaming behavior

* Update comments and code

* release_async isn't a sync function

* Add missing module

* Wait too short cause the receive client to close so that the test would fail

* Remove unnecessary queue assignment

* remove unused import module

* Update timeout as the unit is milliseconds

* Update for 1.2.3 (#91)

* review update (#97)
yunhaoling added a commit that referenced this pull request Oct 5, 2019
* Runtime metric (#95)

* Init commit for desired compatibility

* runtime metric init commit

* Small fix of runtime receiver metric

* Fix pylint error

* Update according to review

* update link destroy

* Remove offered capabilities for now

* add sample/test code

* Update test

* Share CBS Session (#89)

* Share CBS Session (#88)

* initial _received_messages object

* Use flag to control the streaming behavior

* Update comments and code

* release_async isn't a sync function

* Add missing module

* Wait too short cause the receive client to close so that the test would fail

* Remove unnecessary queue assignment

* remove unused import module

* Update timeout as the unit is milliseconds

* Update for 1.2.3 (#91)

* review update (#97)

* Service Bus message transfer fixes (#96)

* Support message delivery tag

* Added headers

* Removed null init

* Added memory cleanup

* Fix build error

* Moved delivery tag to message

* Cython fixes

* Binary type

* Attempt to set message tag

* Converted to AMQP_VALUE

* Syntax fixes

* Build error

* Renamed value

* Get tag type

* Fixed name

* Extract tag bytes

* Some C cleanup

* More logging

* Updated test

* pylint fix

* More logging

* More logging

* More logging

* Fixed print formatting

* More logging

* Syntax error

* TLSIO logging

* Log socket error

* Added sleep

* Fixed sleep

* Reduced sleep

* Another attempt

* Ping CI

* Attempt to move outgoing flow

* Moved send flow frame

* Removed debug logging

* Update link status

* Fix diff

* pylint fixes

* Py2.7

* Updated status description

* Fixed executor

* Some review feedback

* Performance improvement (#98)

* Remove deepcopy and increase buffer size

* Move parse into cython

* Small fix

* lazy parse

* small fix

* Update name

* Update message property

* remove unused import

* put deepcopy back as it influences the performance little

* Add footer setter

* Update setters of message

* fix bug in batch message (#99)

* Bug fix, properties of message can be None type (#100)

* on_transfer_received should not fail due to lack of a delivery tag (#101)

* on_transfer_received should not fail due to lack of a delivery tag

* Test reversing if-statements

* Try initialize value

* Trying to make Windows happy

* Fix proxy test (#104)

* Update docs (#103)

* Update docs

* fix typo

* More typo
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants