Skip to content

Commit

Permalink
Added partition key param for querying change feed (#13857)
Browse files Browse the repository at this point in the history
* initia; changes for partitionkey for query changefeed

* Added test

* updated changelog

* moved partition_key to kwargs
  • Loading branch information
Srinath Narayanan authored Sep 22, 2020
1 parent 695bd8f commit 12ff1b2
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 14 deletions.
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
**Bug fixes**
- Fixed bug where continuation token is not honored when query_iterable is used to get results by page. Issue #13265.

**New features**
- Added support for passing partitionKey while querying changefeed. Issue #11689.

## 4.1.0 (2020-08-10)

Expand Down
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ def query_items_change_feed(
:param partition_key_range_id: ChangeFeed requests can be executed against specific partition key ranges.
This is used to process the change feed in parallel across multiple consumers.
:param partition_key: partition key at which ChangeFeed requests are targetted.
:param is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default it's start from current (false).
:param continuation: e_tag value to be used as continuation for reading change feed.
Expand All @@ -261,6 +262,9 @@ def query_items_change_feed(
response_hook = kwargs.pop('response_hook', None)
if partition_key_range_id is not None:
feed_options["partitionKeyRangeId"] = partition_key_range_id
partition_key = kwargs.pop("partitionKey", None)
if partition_key is not None:
feed_options["partitionKey"] = partition_key
if is_start_from_beginning is not None:
feed_options["isStartFromBeginning"] = is_start_from_beginning
if max_item_count is not None:
Expand Down
36 changes: 22 additions & 14 deletions sdk/cosmos/azure-cosmos/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,27 +47,35 @@ def test_first_and_last_slashes_trimmed_for_query_string (self):
iter_list = list(query_iterable)
self.assertEqual(iter_list[0]['id'], 'myId')

def test_query_change_feed(self):
def test_query_change_feed_with_pk(self):
self.query_change_feed(True)

def test_query_change_feed_with_pk_range_id(self):
self.query_change_feed(False)

def query_change_feed(self, use_partition_key):
created_collection = self.config.create_multi_partition_collection_with_custom_pk_if_not_exist(self.client)
# The test targets partition #3
pkRangeId = "2"
partition_key = "pk"
partition_key_range_id = 2
partitionParam = {"partition_key": partition_key} if use_partition_key else {"partition_key_range_id": partition_key_range_id}

# Read change feed without passing any options
query_iterable = created_collection.query_items_change_feed()
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)

# Read change feed from current should return an empty list
query_iterable = created_collection.query_items_change_feed(partition_key_range_id=pkRangeId)
query_iterable = created_collection.query_items_change_feed(**partitionParam)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
self.assertTrue('etag' in created_collection.client_connection.last_response_headers)
self.assertNotEqual(created_collection.client_connection.last_response_headers['etag'], '')

# Read change feed from beginning should return an empty list
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
Expand All @@ -79,8 +87,8 @@ def test_query_change_feed(self):
document_definition = {'pk': 'pk', 'id':'doc1'}
created_collection.create_item(body=document_definition)
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 1)
Expand All @@ -100,9 +108,9 @@ def test_query_change_feed(self):
for pageSize in [1, 100]:
# verify iterator
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation2,
max_item_count=pageSize
max_item_count=pageSize,
**partitionParam
)
it = query_iterable.__iter__()
expected_ids = 'doc2.doc3.'
Expand All @@ -114,9 +122,9 @@ def test_query_change_feed(self):
# verify by_page
# the options is not copied, therefore it need to be restored
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation2,
max_item_count=pageSize
max_item_count=pageSize,
**partitionParam
)
count = 0
expected_count = 2
Expand All @@ -134,8 +142,8 @@ def test_query_change_feed(self):

# verify reading change feed from the beginning
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
expected_ids = ['doc1', 'doc2', 'doc3']
it = query_iterable.__iter__()
Expand All @@ -147,9 +155,9 @@ def test_query_change_feed(self):

# verify reading empty change feed
query_iterable = created_collection.query_items_change_feed(
partition_key_range_id=pkRangeId,
continuation=continuation3,
is_start_from_beginning=True
is_start_from_beginning=True,
**partitionParam
)
iter_list = list(query_iterable)
self.assertEqual(len(iter_list), 0)
Expand Down

0 comments on commit 12ff1b2

Please sign in to comment.