Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add start time to change feed query #34694

Closed
wants to merge 11 commits into from
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
### 4.6.1 (Unreleased)

#### Features Added
* Added support for using the start time option for change feed pulls. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694)

#### Breaking Changes

#### Bugs Fixed
* Fixed bug where change feed pulls in Async clients weren't returning all pages. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694)

#### Other Changes

Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ Currently, the features below are **not supported**. For alternatives options, c
Streamable queries like `SELECT * FROM WHERE` *do* support continuation tokens.
* Change Feed: Processor
* Change Feed: Read multiple partitions key values
* Change Feed: Read specific time
* Cross-partition ORDER BY for mixed types
* Enabling diagnostics for async query-type methods

Expand Down
10 changes: 10 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"""

import base64
from datetime import timezone, datetime
from email.utils import formatdate
import json
import uuid
Expand Down Expand Up @@ -291,8 +292,17 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if_none_match_value = options["continuation"]
elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]:
if_none_match_value = "*"
elif options.get("startTime"):
dt = options.get("startTime")
# If datetime has specified timezone, and it's not utc, convert to utc.
# Otherwise, assume it is in UTC.
if isinstance(dt, datetime):
if dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None:
dt = dt.astimezone(timezone.utc)
headers[http_constants.HttpHeaders.IfModified_since] = dt.strftime('%a, %d %b %Y %H:%M:%S GMT')
if if_none_match_value:
headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value

headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
else:
if options.get("continuation"):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,19 +119,24 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
# Continues pages till finds a non-empty page or all results are exhausted
while self._continuation or not self._has_started:
# Detect the first fetch of a change feed read that starts from a specific time.
# That first fetch returns an empty page even when more pages are available.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
if not self._has_started:
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

response_headers = {}
(fetched_items, response_headers) = await fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
if not self._is_change_feed or fetched_items:
# For start time however we get no initial results, so we need to pass continuation token
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,24 @@ def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
# Continues pages till finds a non-empty page or all results are exhausted
while self._continuation or not self._has_started:
# Detect the first fetch of a change feed read that starts from a specific time.
# That first fetch returns an empty page even when more pages are available.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
if not self._has_started:
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

response_headers = {}
(fetched_items, response_headers) = fetch_function(new_options)
continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
if not self._is_change_feed or fetched_items:
# For start time however we get no initial results, so we need to pass continuation token
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Synchronized request in the Azure Cosmos database service.
"""

import copy
import json
import time

Expand Down Expand Up @@ -139,7 +139,7 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
)

response = response.http_response
headers = dict(response.headers)
headers = copy.copy(response.headers)

data = response.body()
if data:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Asynchronous request in the Azure Cosmos database service.
"""

import copy
import json
import time

Expand Down Expand Up @@ -107,7 +107,7 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
)

response = response.http_response
headers = dict(response.headers)
headers = copy.copy(response.headers)

data = response.body()
if data:
Expand Down
7 changes: 6 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Create, read, update and delete items in the Azure Cosmos DB SQL API service.
"""

from datetime import datetime
from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast
from typing_extensions import Literal

Expand Down Expand Up @@ -476,6 +476,7 @@ def query_items_change_feed(
*,
partition_key_range_id: Optional[str] = None,
is_start_from_beginning: bool = False,
start_time: Optional[datetime] = None,
continuation: Optional[str] = None,
max_item_count: Optional[int] = None,
partition_key: Optional[PartitionKeyType] = None,
Expand All @@ -486,6 +487,9 @@ def query_items_change_feed(

:keyword bool is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default, it's start from current (false).
:keyword datetime start_time: The point in time from which to start reading the change feed. The value is
sent to the service as an HTTP date (http://www.ietf.org/rfc/rfc2616.txt format). A timezone-aware datetime
is converted to UTC; a naive datetime is assumed to already be in UTC.
:keyword str partition_key_range_id: ChangeFeed requests can be executed against specific partition key
ranges. This is used to process the change feed in parallel across multiple consumers.
:keyword str continuation: e_tag value to be used as continuation for reading change feed.
Expand All @@ -505,6 +509,7 @@ def query_items_change_feed(
kwargs['priority'] = priority
feed_options = _build_options(kwargs)
feed_options["isStartFromBeginning"] = is_start_from_beginning
feed_options["startTime"] = start_time
if partition_key_range_id is not None:
feed_options["partitionKeyRangeId"] = partition_key_range_id
if partition_key is not None:
Expand Down
8 changes: 7 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

"""Create, read, update and delete items in the Azure Cosmos DB SQL API service.
"""

from datetime import datetime
import warnings
from typing import Any, Dict, List, Optional, Sequence, Union, Tuple, Mapping, Type, cast
from typing_extensions import Literal
Expand Down Expand Up @@ -307,6 +307,7 @@ def query_items_change_feed(
self,
partition_key_range_id: Optional[str] = None,
is_start_from_beginning: bool = False,
start_time: Optional[datetime] = None,
continuation: Optional[str] = None,
max_item_count: Optional[int] = None,
*,
Expand All @@ -320,6 +321,9 @@ def query_items_change_feed(
This is used to process the change feed in parallel across multiple consumers.
:param bool is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default, it's start from current (false).
:param datetime start_time: The point in time from which to start reading the change feed. The value is
sent to the service as an HTTP date (http://www.ietf.org/rfc/rfc2616.txt format). A timezone-aware datetime
is converted to UTC; a naive datetime is assumed to already be in UTC.
:param max_item_count: Max number of items to be returned in the enumeration operation.
:param str continuation: e_tag value to be used as continuation for reading change feed.
:param int max_item_count: Max number of items to be returned in the enumeration operation.
Expand All @@ -342,6 +346,8 @@ def query_items_change_feed(
feed_options["partitionKey"] = self._set_partition_key(partition_key)
if is_start_from_beginning is not None:
feed_options["isStartFromBeginning"] = is_start_from_beginning
if start_time is not None and is_start_from_beginning is False:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this logic different from the async client?
Additionally - it looks like these parameters are mutually exclusive, so what happens if I pass both a start_time as well as start_from_beginning=True? Will the service raise an exception?

feed_options["startTime"] = start_time
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if continuation is not None:
Expand Down
69 changes: 68 additions & 1 deletion sdk/cosmos/azure-cosmos/test/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import unittest
import uuid

from datetime import datetime, timedelta
from time import sleep
import pytest

import azure.cosmos._retry_utility as retry_utility
Expand Down Expand Up @@ -278,6 +279,72 @@ def test_query_change_feed_with_pk_range_id(self):
self.assertEqual(len(iter_list), 0)
self.created_db.delete_container(created_collection.id)

def test_query_change_feed_with_start_time(self):
    """Verify that ``start_time`` limits a change feed read to later changes.

    Two batches of items are written with a UTC timestamp recorded between
    them. Reading the change feed from that timestamp must return only the
    second batch; a future timestamp must return nothing; an earlier
    timestamp must return both batches. The container is always deleted at
    the end so that re-runs (and the async twin of this test, which uses the
    same container name) start from an empty container.
    """
    # Function-scope import: only this test needs ``timezone``.
    from datetime import timezone

    created_collection = self.created_db.create_container_if_not_exists(
        "query_change_feed_start_time_test", PartitionKey(path="/pk"))
    batch_size = 50

    def round_time():
        # Naive UTC "now" truncated to whole seconds. ``datetime.utcnow()``
        # is deprecated, so derive the naive value from an aware one.
        return datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)

    def create_random_items(container, count):
        for _ in range(count):
            # NOTE(review): the container is partitioned on /pk but the item
            # only sets 'partitionKey' - confirm this is intended.
            item = {
                'id': 'item' + str(uuid.uuid4()),
                'partitionKey': 'pk' + str(uuid.uuid4()),
                'content': 'This is some random content',
            }
            try:
                container.upsert_item(item)
            except exceptions.CosmosHttpResponseError as e:
                self.fail(str(e))

    def count_changes(start):
        # Drain the whole feed and report how many changes it contained.
        return len(list(created_collection.query_items_change_feed(start_time=start)))

    try:
        # First batch: written before the recorded start time.
        create_random_items(created_collection, batch_size)

        # Keep a full second on either side of the recorded time so the two
        # batches are unambiguously before / after it.
        sleep(1)
        start_time = round_time()
        sleep(1)

        # Second batch: written after the recorded start time.
        create_random_items(created_collection, batch_size)

        # Only the second batch was changed after start_time.
        self.assertEqual(count_changes(start_time), batch_size)

        # Negative test: a valid time in the future yields no changes.
        future_time = start_time + timedelta(hours=1)
        self.assertEqual(count_changes(future_time), 0)

        # A start time before any item was written yields both batches.
        # (Previously this used datetime.now(); a naive value is treated as
        # UTC, so the outcome depended on the machine's timezone.)
        past_time = start_time - timedelta(hours=1)
        self.assertEqual(count_changes(past_time), batch_size * 2)

        # A timezone-aware, non-UTC datetime for the same instant as
        # start_time is converted to UTC by the client, so it must behave
        # exactly like start_time itself.
        non_utc_time = start_time.replace(tzinfo=timezone.utc).astimezone(
            timezone(timedelta(hours=-5)))
        self.assertEqual(count_changes(non_utc_time), batch_size)

        # A non-datetime value is expected to be ignored, so the whole feed
        # (both batches) is returned.
        invalid_time = "Invalid value"
        self.assertEqual(count_changes(invalid_time), batch_size * 2)
    finally:
        self.created_db.delete_container(created_collection.id)


def test_populate_query_metrics(self):
created_collection = self.created_db.create_container("query_metrics_test",
PartitionKey(path="/pk"))
Expand Down
70 changes: 70 additions & 0 deletions sdk/cosmos/azure-cosmos/test/test_query_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

import unittest
import uuid
from asyncio import sleep
from datetime import datetime, timedelta

import pytest

Expand Down Expand Up @@ -316,8 +318,76 @@ async def test_query_change_feed_with_pk_range_id_async(self):
iter_list = [item async for item in query_iterable]
assert len(iter_list) == 0

@pytest.mark.asyncio
async def test_query_change_feed_with_start_time(self):
    """Verify that ``start_time`` limits an async change feed read to later changes.

    Two batches of items are written with a UTC timestamp recorded between
    them. Reading the change feed from that timestamp must return only the
    second batch; a future timestamp must return nothing; an earlier
    timestamp must return both batches. The container is deleted even when
    an assertion fails, so later runs start from an empty container.
    """
    # Function-scope import: only this test needs ``timezone``.
    from datetime import timezone

    created_collection = await self.created_db.create_container_if_not_exists(
        "query_change_feed_start_time_test", PartitionKey(path="/pk"))
    batch_size = 50

    def round_time():
        # Naive UTC "now" truncated to whole seconds. ``datetime.utcnow()``
        # is deprecated, so derive the naive value from an aware one.
        return datetime.now(timezone.utc).replace(tzinfo=None, microsecond=0)

    async def create_random_items(container, count):
        for _ in range(count):
            # NOTE(review): the container is partitioned on /pk but the item
            # only sets 'partitionKey' - confirm this is intended.
            item = {
                'id': 'item' + str(uuid.uuid4()),
                'partitionKey': 'pk' + str(uuid.uuid4()),
                'content': 'This is some random content',
            }
            try:
                await container.upsert_item(item)
            except exceptions.CosmosHttpResponseError as e:
                # pytest.fail() requires a string reason, not an exception.
                pytest.fail(str(e))

    async def count_changes(start):
        # Drain the whole feed and report how many changes it contained.
        changes = [i async for i in created_collection.query_items_change_feed(start_time=start)]
        return len(changes)

    try:
        # First batch: written before the recorded start time.
        await create_random_items(created_collection, batch_size)

        # Keep a full second on either side of the recorded time so the two
        # batches are unambiguously before / after it.
        await sleep(1)
        start_time = round_time()
        await sleep(1)

        # Second batch: written after the recorded start time.
        await create_random_items(created_collection, batch_size)

        # Only the second batch was changed after start_time.
        assert await count_changes(start_time) == batch_size

        # Negative test: a valid time in the future yields no changes.
        future_time = start_time + timedelta(hours=1)
        assert await count_changes(future_time) == 0

        # A start time before any item was written yields both batches.
        # (Previously this used datetime.now(); a naive value is treated as
        # UTC, so the outcome depended on the machine's timezone.)
        past_time = start_time - timedelta(hours=1)
        assert await count_changes(past_time) == batch_size * 2

        # A timezone-aware, non-UTC datetime for the same instant as
        # start_time is converted to UTC by the client, so it must behave
        # exactly like start_time itself.
        non_utc_time = start_time.replace(tzinfo=timezone.utc).astimezone(
            timezone(timedelta(hours=-5)))
        assert await count_changes(non_utc_time) == batch_size

        # A non-datetime value is expected to be ignored, so the whole feed
        # (both batches) is returned.
        invalid_time = "Invalid value"
        assert await count_changes(invalid_time) == batch_size * 2
    finally:
        await self.created_db.delete_container(created_collection.id)

@pytest.mark.asyncio
async def test_populate_query_metrics_async(self):
created_collection = await self.created_db.create_container(
"query_metrics_test" + str(uuid.uuid4()),
Expand Down
Loading