diff --git a/sdk/cosmos/azure-cosmos/CHANGELOG.md b/sdk/cosmos/azure-cosmos/CHANGELOG.md index 4017eee05c8a..825e2d909a9b 100644 --- a/sdk/cosmos/azure-cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure-cosmos/CHANGELOG.md @@ -3,10 +3,12 @@ ### 4.6.1 (Unreleased) #### Features Added +* Added support for using the start time option for change feed pulls. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694) #### Breaking Changes #### Bugs Fixed +* Fixed bug where change feed pulls in Async clients weren't returning all pages. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694) #### Other Changes diff --git a/sdk/cosmos/azure-cosmos/README.md b/sdk/cosmos/azure-cosmos/README.md index 4bd8666c682b..f34d2d3df0b2 100644 --- a/sdk/cosmos/azure-cosmos/README.md +++ b/sdk/cosmos/azure-cosmos/README.md @@ -158,7 +158,6 @@ Currently, the features below are **not supported**. For alternatives options, c Streamable queries like `SELECT * FROM WHERE` *do* support continuation tokens. * Change Feed: Processor * Change Feed: Read multiple partitions key values -* Change Feed: Read specific time * Cross-partition ORDER BY for mixed types * Enabling diagnostics for async query-type methods diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py index f8ab0ca15961..d6b64a758648 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_base.py @@ -23,6 +23,7 @@ """ import base64 +from datetime import timezone, datetime from email.utils import formatdate import json import uuid @@ -291,8 +292,17 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches if_none_match_value = options["continuation"] elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]: if_none_match_value = "*" + elif options.get("startTime"): + dt = options.get("startTime") + # If datetime has specified timezone, and it's not utc, convert to utc. + # Otherwise, assume it is in UTC. + if isinstance(dt, datetime): + if dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None: + dt = dt.astimezone(timezone.utc) + headers[http_constants.HttpHeaders.IfModified_since] = dt.strftime('%a, %d %b %Y %H:%M:%S GMT') if if_none_match_value: headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value + headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue else: if options.get("continuation"): diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py index 3f52e342d641..c8e13940ed5d 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/base_execution_context.py @@ -119,11 +119,15 @@ async def _fetch_items_helper_no_retries(self, fetch_function): fetched_items = [] # Continues pages till finds a non-empty page or all results are exhausted while self._continuation or not self._has_started: + # Check if this is first fetch for read from specific time change feed. + # For read specific time the first fetch will return empty even if we have more pages. + is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started if not self._has_started: self._has_started = True new_options = copy.deepcopy(self._options) new_options["continuation"] = self._continuation + response_headers = {} (fetched_items, response_headers) = await fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation # Use Etag as continuation token for change feed queries. @@ -131,7 +135,8 @@ async def _fetch_items_helper_no_retries(self, fetch_function): continuation_key = http_constants.HttpHeaders.ETag # In change feed queries, the continuation token is always populated. The hasNext() test is whether # there is any items in the response or not. - if not self._is_change_feed or fetched_items: + # For start time however we get no initial results, so we need to pass continuation token + if not self._is_change_feed or fetched_items or is_s_time_first_fetch: self._continuation = response_headers.get(continuation_key) else: self._continuation = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py index f0769f6caf25..8eddb72e3f80 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/base_execution_context.py @@ -117,11 +117,15 @@ def _fetch_items_helper_no_retries(self, fetch_function): fetched_items = [] # Continues pages till finds a non-empty page or all results are exhausted while self._continuation or not self._has_started: + # Check if this is first fetch for read from specific time change feed. + # For read specific time the first fetch will return empty even if we have more pages. + is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started if not self._has_started: self._has_started = True new_options = copy.deepcopy(self._options) new_options["continuation"] = self._continuation + response_headers = {} (fetched_items, response_headers) = fetch_function(new_options) continuation_key = http_constants.HttpHeaders.Continuation # Use Etag as continuation token for change feed queries. @@ -129,7 +133,8 @@ def _fetch_items_helper_no_retries(self, fetch_function): continuation_key = http_constants.HttpHeaders.ETag # In change feed queries, the continuation token is always populated. The hasNext() test is whether # there is any items in the response or not. - if not self._is_change_feed or fetched_items: + # For start time however we get no initial results, so we need to pass continuation token + if not self._is_change_feed or fetched_items or is_s_time_first_fetch: self._continuation = response_headers.get(continuation_key) else: self._continuation = None diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py index 7dea1f2abec4..5663b8aa9faa 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py @@ -21,7 +21,7 @@ """Synchronized request in the Azure Cosmos database service. """ - +import copy import json import time @@ -139,7 +139,7 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin ) response = response.http_response - headers = dict(response.headers) + headers = copy.copy(response.headers) data = response.body() if data: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py index a3e8b25a823c..176b33807ddd 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_asynchronous_request.py @@ -21,7 +21,7 @@ """Asynchronous request in the Azure Cosmos database service. """ - +import copy import json import time @@ -107,7 +107,7 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p ) response = response.http_response - headers = dict(response.headers) + headers = copy.copy(response.headers) data = response.body() if data: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py index eebd6c552c64..7de7662c5ad1 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py @@ -21,7 +21,7 @@ """Create, read, update and delete items in the Azure Cosmos DB SQL API service. """ - +from datetime import datetime from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast from typing_extensions import Literal @@ -476,6 +476,7 @@ def query_items_change_feed( *, partition_key_range_id: Optional[str] = None, is_start_from_beginning: bool = False, + start_time: Optional[datetime] = None, continuation: Optional[str] = None, max_item_count: Optional[int] = None, partition_key: Optional[PartitionKeyType] = None, @@ -486,6 +487,9 @@ def query_items_change_feed( :keyword bool is_start_from_beginning: Get whether change feed should start from beginning (true) or from current (false). By default, it's start from current (false). + :keyword datetime start_time: Specifies a point of time to start change feed. Start time in + http://www.ietf.org/rfc/rfc2616.txt format. Converts datetime to UTC if timezone is defined. Assumes + datetime is in UTC if timezone is not defined. :keyword str partition_key_range_id: ChangeFeed requests can be executed against specific partition key ranges. This is used to process the change feed in parallel across multiple consumers. :keyword str continuation: e_tag value to be used as continuation for reading change feed. @@ -505,6 +509,7 @@ def query_items_change_feed( kwargs['priority'] = priority feed_options = _build_options(kwargs) feed_options["isStartFromBeginning"] = is_start_from_beginning + feed_options["startTime"] = start_time if partition_key_range_id is not None: feed_options["partitionKeyRangeId"] = partition_key_range_id if partition_key is not None: diff --git a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py index 1262d96c7327..400735a28906 100644 --- a/sdk/cosmos/azure-cosmos/azure/cosmos/container.py +++ b/sdk/cosmos/azure-cosmos/azure/cosmos/container.py @@ -21,7 +21,7 @@ """Create, read, update and delete items in the Azure Cosmos DB SQL API service. """ - +from datetime import datetime import warnings from typing import Any, Dict, List, Optional, Sequence, Union, Tuple, Mapping, Type, cast from typing_extensions import Literal @@ -307,6 +307,7 @@ def query_items_change_feed( self, partition_key_range_id: Optional[str] = None, is_start_from_beginning: bool = False, + start_time: Optional[datetime] = None, continuation: Optional[str] = None, max_item_count: Optional[int] = None, *, @@ -320,6 +321,9 @@ def query_items_change_feed( This is used to process the change feed in parallel across multiple consumers. :param bool is_start_from_beginning: Get whether change feed should start from beginning (true) or from current (false). By default, it's start from current (false). + :param datetime start_time: Specifies a point of time to start change feed. Start time in + http://www.ietf.org/rfc/rfc2616.txt format. Converts datetime to UTC if timezone is defined. Assumes + datetime is in UTC if timezone is not defined. :param max_item_count: Max number of items to be returned in the enumeration operation. :param str continuation: e_tag value to be used as continuation for reading change feed. :param int max_item_count: Max number of items to be returned in the enumeration operation. @@ -342,6 +346,8 @@ def query_items_change_feed( feed_options["partitionKey"] = self._set_partition_key(partition_key) if is_start_from_beginning is not None: feed_options["isStartFromBeginning"] = is_start_from_beginning + if start_time is not None and is_start_from_beginning is False: + feed_options["startTime"] = start_time if max_item_count is not None: feed_options["maxItemCount"] = max_item_count if continuation is not None: diff --git a/sdk/cosmos/azure-cosmos/test/test_query.py b/sdk/cosmos/azure-cosmos/test/test_query.py index 4d1acfee34c0..8120ee0084dc 100644 --- a/sdk/cosmos/azure-cosmos/test/test_query.py +++ b/sdk/cosmos/azure-cosmos/test/test_query.py @@ -3,7 +3,8 @@ import unittest import uuid - +from datetime import datetime, timedelta +from time import sleep import pytest import azure.cosmos._retry_utility as retry_utility @@ -278,6 +279,72 @@ def test_query_change_feed_with_pk_range_id(self): self.assertEqual(len(iter_list), 0) self.created_db.delete_container(created_collection.id) + def test_query_change_feed_with_start_time(self): + created_collection = self.created_db.create_container_if_not_exists("query_change_feed_start_time_test", + PartitionKey(path="/pk")) + batchSize = 50 + + def round_time(): + utc_now = datetime.utcnow() + return utc_now - timedelta(microseconds=utc_now.microsecond) + def create_random_items(container, batch_size): + for _ in range(batch_size): + # Generate a Random partition key + partition_key = 'pk' + str(uuid.uuid4()) + + # Generate a random item + item = { + 'id': 'item' + str(uuid.uuid4()), + 'partitionKey': partition_key, + 'content': 'This is some random content', + } + + try: + # Create the item in the container + container.upsert_item(item) + except exceptions.CosmosHttpResponseError as e: + self.fail(e) + + # Create first batch of random items + create_random_items(created_collection, batchSize) + + # wait for 1 second and record the time, then wait another second + sleep(1) + start_time = round_time() + sleep(1) + + # now create another batch of items + create_random_items(created_collection, batchSize) + + # now query change feed based on start time + change_feed_iter = list(created_collection.query_items_change_feed(start_time=start_time)) + totalCount = len(change_feed_iter) + + # now check if the number of items that were changed match the batch size + self.assertEqual(totalCount, batchSize) + + # negative test: pass in a valid time in the future + future_time = start_time + timedelta(hours=1) + change_feed_iter = list(created_collection.query_items_change_feed(start_time=future_time)) + totalCount = len(change_feed_iter) + # A future time should return 0 + self.assertEqual(totalCount, 0) + + # test a date that is not utc, will ignore start time option + not_utc_time = datetime.now() + change_feed_iter = list(created_collection.query_items_change_feed(start_time=not_utc_time)) + totalCount = len(change_feed_iter) + # should return 100 (double the batch size) + self.assertEqual(totalCount, batchSize*2) + + # test an invalid value, will ignore start time option + invalid_time = "Invalid value" + change_feed_iter = list(created_collection.query_items_change_feed(start_time=invalid_time)) + totalCount = len(change_feed_iter) + # should return 100 (double the batch size) + self.assertEqual(totalCount, batchSize * 2) + + def test_populate_query_metrics(self): created_collection = self.created_db.create_container("query_metrics_test", PartitionKey(path="/pk")) diff --git a/sdk/cosmos/azure-cosmos/test/test_query_async.py b/sdk/cosmos/azure-cosmos/test/test_query_async.py index 3258de2343b1..6acdbb462845 100644 --- a/sdk/cosmos/azure-cosmos/test/test_query_async.py +++ b/sdk/cosmos/azure-cosmos/test/test_query_async.py @@ -3,6 +3,8 @@ import unittest import uuid +from asyncio import sleep +from datetime import datetime, timedelta import pytest @@ -316,8 +318,76 @@ async def test_query_change_feed_with_pk_range_id_async(self): iter_list = [item async for item in query_iterable] assert len(iter_list) == 0 + @pytest.mark.asyncio + async def test_query_change_feed_with_start_time(self): + created_collection = await self.created_db.create_container_if_not_exists("query_change_feed_start_time_test", + PartitionKey(path="/pk")) + batchSize = 50 + + def round_time(): + utc_now = datetime.utcnow() + return utc_now - timedelta(microseconds=utc_now.microsecond) + + async def create_random_items(container, batch_size): + for _ in range(batch_size): + # Generate a Random partition key + partition_key = 'pk' + str(uuid.uuid4()) + + # Generate a random item + item = { + 'id': 'item' + str(uuid.uuid4()), + 'partitionKey': partition_key, + 'content': 'This is some random content', + } + + try: + # Create the item in the container + await container.upsert_item(item) + except exceptions.CosmosHttpResponseError as e: + pytest.fail(e) + + # Create first batch of random items + await create_random_items(created_collection, batchSize) + + # wait for 1 second and record the time, then wait another second + await sleep(1) + start_time = round_time() + await sleep(1) + + # now create another batch of items + await create_random_items(created_collection, batchSize) + + # now query change feed based on start time + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=start_time)] + totalCount = len(change_feed_iter) + + # now check if the number of items that were changed match the batch size + assert totalCount == batchSize + + # negative test: pass in a valid time in the future + future_time = start_time + timedelta(hours=1) + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=future_time)] + totalCount = len(change_feed_iter) + # A future time should return 0 + assert totalCount == 0 + + # test a date that is not utc, will ignore start time option + not_utc_time = datetime.now() + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=not_utc_time)] + totalCount = len(change_feed_iter) + # should return 100 (double the batch size) + assert totalCount == batchSize * 2 + + # test an invalid value, will ignore start time option + invalid_time = "Invalid value" + change_feed_iter = [i async for i in created_collection.query_items_change_feed(start_time=invalid_time)] + totalCount = len(change_feed_iter) + # should return 100 (double the batch size) + assert totalCount == batchSize * 2 + await self.created_db.delete_container(created_collection.id) + @pytest.mark.asyncio async def test_populate_query_metrics_async(self): created_collection = await self.created_db.create_container( "query_metrics_test" + str(uuid.uuid4()),