Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added partition key param for querying change feed #13857

Merged
merged 4 commits into from
Sep 22, 2020
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
**Bug fixes**
- Fixed bug where continuation token is not honored when query_iterable is used to get results by page. Issue #13265.

**New features**
- Added support for passing partitionKey while querying changefeed. Issue #11689.

## 4.1.0 (2020-08-10)

Expand Down
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,7 @@ def read_all_items(
def query_items_change_feed(
self,
partition_key_range_id=None, # type: Optional[str]
partition_key=None, # type: Optional[str]
srinathnarayanan marked this conversation as resolved.
Show resolved Hide resolved
is_start_from_beginning=False, # type: bool
continuation=None, # type: Optional[str]
max_item_count=None, # type: Optional[int]
Expand All @@ -249,6 +250,7 @@ def query_items_change_feed(

:param partition_key_range_id: ChangeFeed requests can be executed against specific partition key ranges.
This is used to process the change feed in parallel across multiple consumers.
:param partition_key: partition key at which ChangeFeed requests are targetted.
:param is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default it's start from current (false).
:param continuation: e_tag value to be used as continuation for reading change feed.
Expand All @@ -261,6 +263,8 @@ def query_items_change_feed(
response_hook = kwargs.pop('response_hook', None)
if partition_key_range_id is not None:
feed_options["partitionKeyRangeId"] = partition_key_range_id
if partition_key is not None:
feed_options["partitionKey"] = partition_key
if is_start_from_beginning is not None:
feed_options["isStartFromBeginning"] = is_start_from_beginning
if max_item_count is not None:
Expand Down
36 changes: 22 additions & 14 deletions sdk/cosmos/azure-cosmos/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,35 @@ def test_first_and_last_slashes_trimmed_for_query_string (self):
iter_list = list(query_iterable)
self.assertEqual(iter_list[0]['id'], 'myId')

def test_query_change_feed(self):
def test_query_change_feed_with_pk(self):
self.query_change_feed(True)

def test_query_change_feed_with_pk_range_id(self):
self.query_change_feed(False)

def query_change_feed(self, use_partition_key):
created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
# The test targets partition #3
pkRangeId = "2"
partition_key = "pk"
partition_key_range_id = 2
partitionParam = {"partition_key": partition_key} if use_partition_key else {"partition_key_range_id": partition_key_range_id}

# Read change feed without passing any options
query_iterable = created_collection.query_items_change_feed()
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)

# Read change feed from current should return an empty list
query_iterable = created_collection.query_items_change_feed(partition_key_range_id=pkRangeId)
query_iterable = created_collection.query_items_change_feed(**partitionParam)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
self.assertTrue('etag' in created_collection.client_connection.last_response_headers)
self.assertNotEqual(created_collection.client_connection.last_response_headers['etag'], '')

# Read change feed from beginning should return an empty list
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
Expand All @@ -79,8 +87,8 @@ def test_query_change_feed(self):
document_definition = {'pk': 'pk', 'id':'doc1'}
created_collection.create_item(body=document_definition)
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 1)
Expand All @@ -100,9 +108,9 @@ def test_query_change_feed(self):
for pageSize in [1, 100]:
# verify iterator
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation2,
max_item_count=pageSize
max_item_count=pageSize,
**partitionParam
)
it = query_iterable.__iter__()
expected_ids = 'doc2.doc3.'
Expand All @@ -114,9 +122,9 @@ def test_query_change_feed(self):
# verify by_page
# the options is not copied, therefore it need to be restored
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation2,
max_item_count=pageSize
max_item_count=pageSize,
**partitionParam
)
count = 0
expected_count = 2
Expand All @@ -134,8 +142,8 @@ def test_query_change_feed(self):

# verify reading change feed from the beginning
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
expected_ids = ['doc1', 'doc2', 'doc3']
it = query_iterable.__iter__()
Expand All @@ -147,9 +155,9 @@ def test_query_change_feed(self):

# verify reading empty change feed
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation3,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
Expand Down