Merge pr 35022 into pr34694 - Add Start Time To CF, Fix Retryable Exception in First Page Bug, and Fix Case Sensitive Headers Bug #35090

Merged: 27 commits, Apr 11, 2024
Changes from 13 commits
a593860: Add start time to change feed query (bambriz, Mar 7, 2024)
4ddfe02: Added condition to make change feed work with start time (bambriz, Mar 28, 2024)
ca59d5e: Merge remote-tracking branch 'upstream/main' into read-specific-time (bambriz, Mar 29, 2024)
79cd79b: Add new HTTP Header for Async Etag (bambriz, Mar 29, 2024)
c07dbfc: Update start time tests (bambriz, Mar 29, 2024)
34d06d4: comment fix (bambriz, Mar 29, 2024)
08c3f59: Readme and changelog updates (bambriz, Mar 29, 2024)
995189e: Fix bug with retryable exception in first page (nicknotfun, Mar 31, 2024)
7c36ee0: Merge remote-tracking branch 'upstream/main' into read-specific-time (bambriz, Apr 2, 2024)
0f800b0: Merge PR 35022 (bambriz, Apr 2, 2024)
5e5cd89: Adds fix to response header type and other needed changes for the mer… (bambriz, Apr 5, 2024)
d89d904: Merge remote-tracking branch 'upstream/main' into Merge-PR-35022-into… (bambriz, Apr 5, 2024)
5ced092: Testing retry on execution context bug (bambriz, Apr 5, 2024)
a019b35: Debugging additions (bambriz, Apr 5, 2024)
5b92d09: Moved has_Started flag to after fetch function (bambriz, Apr 5, 2024)
075551f: update _base.py (bambriz, Apr 8, 2024)
320a8b8: Pylint fixes (bambriz, Apr 8, 2024)
9b4fd26: update start time tests (bambriz, Apr 8, 2024)
5e4d328: Removed testing artifacts updated Changelog (bambriz, Apr 8, 2024)
7ffb5d4: Added Test Coverage for retryable exception bug (bambriz, Apr 10, 2024)
6b362c9: Fixed Comments (bambriz, Apr 10, 2024)
21f9c69: Update sdk/cosmos/azure-cosmos/CHANGELOG.md (bambriz, Apr 10, 2024)
ce6ab05: Update sdk/cosmos/azure-cosmos/CHANGELOG.md (bambriz, Apr 10, 2024)
41455b6: Update sdk/cosmos/azure-cosmos/CHANGELOG.md (bambriz, Apr 10, 2024)
a440fde: optimization changes (bambriz, Apr 10, 2024)
1512fa2: update _base.py (bambriz, Apr 10, 2024)
1aa7b1d: Add samples for start time change feed (bambriz, Apr 11, 2024)
4 changes: 4 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,10 +3,14 @@
### 4.6.1 (Unreleased)

#### Features Added
* Added support for using the start time option for change feed pulls. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694)

#### Breaking Changes

#### Bugs Fixed
* Fixed bug where change feed pulls in Async clients weren't returning all pages. See [PR 34694](https://github.com/Azure/azure-sdk-for-python/pull/34694)
* Fixed bug when a retryable exception occurs in the first page of a query execution. See [PR](https://github.com/Azure/azure-sdk-for-python/pull/).


#### Other Changes

1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/README.md
@@ -158,7 +158,6 @@ Currently, the features below are **not supported**. For alternatives options, c
Streamable queries like `SELECT * FROM WHERE` *do* support continuation tokens.
* Change Feed: Processor
* Change Feed: Read multiple partitions key values
* Change Feed: Read specific time
* Cross-partition ORDER BY for mixed types
* Enabling diagnostics for async query-type methods

9 changes: 9 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -23,6 +23,7 @@
"""

import base64
from datetime import timezone
from email.utils import formatdate
import json
import uuid
@@ -291,8 +292,16 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if_none_match_value = options["continuation"]
elif options.get("isStartFromBeginning") and not options["isStartFromBeginning"]:
if_none_match_value = "*"
elif options.get("startTime"):
dt = options.get("startTime")
# If datetime has specified timezone, and it's not utc, convert to utc.
# Otherwise, assume it is in UTC.
if dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None:
dt = dt.astimezone(timezone.utc)
headers[http_constants.HttpHeaders.IfModified_since] = dt.strftime('%a, %d %b %Y %H:%M:%S GMT')
if if_none_match_value:
headers[http_constants.HttpHeaders.IfNoneMatch] = if_none_match_value

headers[http_constants.HttpHeaders.AIM] = http_constants.HttpHeaders.IncrementalFeedHeaderValue
else:
if options.get("continuation"):
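The `startTime` branch above converts the caller's `datetime` into the RFC 1123 date string that the `If-Modified-Since` header expects. A minimal standalone sketch of that conversion (function name is illustrative, not part of the SDK):

```python
from datetime import datetime, timezone, timedelta

def start_time_to_header(dt: datetime) -> str:
    """Format a start time for the If-Modified-Since header (sketch)."""
    # If the datetime carries a non-UTC offset, convert it to UTC;
    # a naive datetime is assumed to already be in UTC.
    if dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None:
        dt = dt.astimezone(timezone.utc)
    # RFC 1123 date format, always labeled GMT.
    return dt.strftime('%a, %d %b %Y %H:%M:%S GMT')

# A 10:30 +02:00 start time becomes 08:30 UTC.
print(start_time_to_header(
    datetime(2024, 3, 7, 10, 30, tzinfo=timezone(timedelta(hours=2)))))
# → Thu, 07 Mar 2024 08:30:00 GMT
```

Note that an aware datetime with a zero UTC offset is passed through `astimezone` unchanged, so both branches produce a GMT-labeled value.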
@@ -28,6 +28,7 @@

from ...aio import _retry_utility_async
from ... import http_constants
from ... import exceptions

# pylint: disable=protected-access

@@ -56,7 +57,18 @@ def _get_initial_continuation(self):
return None

def _has_more_pages(self):
return not self._has_started or self._continuation
return not self._has_finished

async def _ensure(self):
if not self._has_more_pages():
return

if not self._buffer:
results = await self._fetch_next_block()
self._buffer.extend(results)

if not self._buffer:
self._has_finished = True

async def fetch_next_block(self):
"""Returns a block of results with respecting retry policy.
@@ -67,17 +79,10 @@ async def fetch_next_block(self):
:return: List of results.
:rtype: list
"""
if not self._has_more_pages():
return []

if self._buffer:
# if there is anything in the buffer returns that
res = list(self._buffer)
self._buffer.clear()
return res

# fetches the next block
return await self._fetch_next_block()
await self._ensure()
res = list(self._buffer)
self._buffer.clear()
return res

async def _fetch_next_block(self):
raise NotImplementedError
@@ -96,13 +101,7 @@ async def __anext__(self):
:rtype: dict
:raises StopAsyncIteration: If no more result is left.
"""
if self._has_finished:
raise StopAsyncIteration

if not self._buffer:

results = await self.fetch_next_block()
self._buffer.extend(results)
await self._ensure()

if not self._buffer:
raise StopAsyncIteration
@@ -115,23 +114,29 @@ async def _fetch_items_helper_no_retries(self, fetch_function):
:param Callable fetch_function: The function that fetches the items.
:return: List of fetched items.
:rtype: list
"""
"""
fetched_items = []
# Continues pages till finds a non-empty page or all results are exhausted
while self._continuation or not self._has_started:
# Check if this is first fetch for read from specific time change feed.
# For read specific time the first fetch will return empty even if we have more pages.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
if not self._has_started:
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

response_headers = {}
(fetched_items, response_headers) = await fetch_function(new_options)


continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
if not self._is_change_feed or fetched_items:
# No initial fetch for start time change feed, so we need to pass continuation token for first fetch
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand All @@ -141,7 +146,12 @@ async def _fetch_items_helper_no_retries(self, fetch_function):

async def _fetch_items_helper_with_retries(self, fetch_function):
async def callback():
return await self._fetch_items_helper_no_retries(fetch_function)
try:
return await self._fetch_items_helper_no_retries(fetch_function)
except exceptions.CosmosHttpResponseError as e:
if e.status_code == 429:
self._has_started = False
return await self._fetch_items_helper_no_retries(fetch_function)

return await _retry_utility_async.ExecuteAsync(self._client, self._client._global_endpoint_manager, callback)

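The refactor above centralizes page buffering in `_ensure`: the iterator fetches into a buffer, and an empty fetch marks the feed finished. A simplified synchronous sketch of that pattern (class and field names are illustrative, not the SDK's actual execution-context class):

```python
from collections import deque

class PageIterator:
    """Toy sketch of the buffer-and-ensure pattern from this PR."""
    def __init__(self, pages):
        self._pages = iter(pages)   # stand-in for _fetch_next_block
        self._buffer = deque()
        self._has_finished = False

    def _has_more_pages(self):
        return not self._has_finished

    def _ensure(self):
        # Refill the buffer from the next page; an empty fetch ends the feed.
        if not self._has_more_pages():
            return
        if not self._buffer:
            self._buffer.extend(next(self._pages, []))
        if not self._buffer:
            self._has_finished = True

    def __iter__(self):
        return self

    def __next__(self):
        self._ensure()
        if not self._buffer:
            raise StopIteration
        return self._buffer.popleft()

print(list(PageIterator([[1, 2], [3]])))  # → [1, 2, 3]
```

Routing both `fetch_next_block` and `__anext__` through one `_ensure` helper is what removes the old reliance on `_has_started or _continuation` to decide whether more pages exist.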
@@ -26,7 +26,7 @@
from collections import deque
import copy
from .. import _retry_utility, http_constants

from .. import exceptions
# pylint: disable=protected-access


@@ -54,7 +54,18 @@ def _get_initial_continuation(self):
return None

def _has_more_pages(self):
return not self._has_started or self._continuation
return not self._has_finished

def _ensure(self):
if not self._has_more_pages():
return

if not self._buffer:
results = self._fetch_next_block()
self._buffer.extend(results)

if not self._buffer:
self._has_finished = True

def fetch_next_block(self):
"""Returns a block of results with respecting retry policy.
@@ -65,17 +76,10 @@ def fetch_next_block(self):
:return: List of results.
:rtype: list
"""
if not self._has_more_pages():
return []

if self._buffer:
# if there is anything in the buffer returns that
res = list(self._buffer)
self._buffer.clear()
return res

# fetches the next block
return self._fetch_next_block()
self._ensure()
res = list(self._buffer)
self._buffer.clear()
return res

def _fetch_next_block(self):
raise NotImplementedError
@@ -94,13 +98,7 @@ def __next__(self):
:rtype: dict
:raises StopIteration: If no more result is left.
"""
if self._has_finished:
raise StopIteration

if not self._buffer:

results = self.fetch_next_block()
self._buffer.extend(results)
self._ensure()

if not self._buffer:
raise StopIteration
@@ -117,19 +115,26 @@ def _fetch_items_helper_no_retries(self, fetch_function):
fetched_items = []
# Continues pages till finds a non-empty page or all results are exhausted
while self._continuation or not self._has_started:
# Check if this is first fetch for read from specific time change feed.
# For read specific time the first fetch will return empty even if we have more pages.
is_s_time_first_fetch = self._is_change_feed and self._options.get("startTime") and not self._has_started
if not self._has_started:
self._has_started = True
new_options = copy.deepcopy(self._options)
new_options["continuation"] = self._continuation

response_headers = {}
(fetched_items, response_headers) = fetch_function(new_options)


continuation_key = http_constants.HttpHeaders.Continuation
# Use Etag as continuation token for change feed queries.
if self._is_change_feed:
continuation_key = http_constants.HttpHeaders.ETag
# In change feed queries, the continuation token is always populated. The hasNext() test is whether
# there is any items in the response or not.
if not self._is_change_feed or fetched_items:
# For start time however we get no initial results, so we need to pass continuation token
if not self._is_change_feed or fetched_items or is_s_time_first_fetch:
self._continuation = response_headers.get(continuation_key)
else:
self._continuation = None
Expand All @@ -139,7 +144,12 @@ def _fetch_items_helper_no_retries(self, fetch_function):

def _fetch_items_helper_with_retries(self, fetch_function):
def callback():
return self._fetch_items_helper_no_retries(fetch_function)
try:
return self._fetch_items_helper_no_retries(fetch_function)
except exceptions.CosmosHttpResponseError as e:
if e.status_code == 429:
self._has_started = False
return self._fetch_items_helper_no_retries(fetch_function)

return _retry_utility.Execute(self._client, self._client._global_endpoint_manager, callback)

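The retry wrapper above resets `_has_started` when a 429 (throttled) error surfaces, so the replayed call behaves like the original first-page fetch instead of looping on a stale continuation. A self-contained sketch of that control flow (the exception class and fetcher here are toy stand-ins, not the SDK's types):

```python
class CosmosHttpResponseError(Exception):
    """Toy stand-in for azure.cosmos.exceptions.CosmosHttpResponseError."""
    def __init__(self, status_code):
        super().__init__(status_code)
        self.status_code = status_code

class Fetcher:
    """Sketch: on a 429 during the first page, reset state and replay."""
    def __init__(self):
        self._has_started = False
        self.calls = 0

    def _fetch_no_retries(self):
        self.calls += 1
        self._has_started = True
        if self.calls == 1:
            raise CosmosHttpResponseError(429)  # throttled on first page
        return ['item']

    def fetch_with_retries(self):
        try:
            return self._fetch_no_retries()
        except CosmosHttpResponseError as e:
            if e.status_code == 429:
                # Reset so the retried call is treated as the first fetch
                # rather than exiting the `while ... not _has_started` loop.
                self._has_started = False
                return self._fetch_no_retries()
            raise

f = Fetcher()
print(f.fetch_with_retries())  # → ['item'] after one retried attempt
```

In the real code this callback still runs inside `_retry_utility.Execute`, which owns the backoff policy; the reset only repairs the iterator's state between attempts.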
@@ -21,7 +21,7 @@

"""Synchronized request in the Azure Cosmos database service.
"""

import copy
import json
import time

@@ -139,7 +139,7 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
)

response = response.http_response
headers = dict(response.headers)
headers = copy.copy(response.headers)

data = response.body()
if data:
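The one-line change from `dict(response.headers)` to `copy.copy(response.headers)` matters because the transport's header mapping is case-insensitive; flattening it into a plain `dict` makes later lookups like `headers.get("etag")` case-sensitive. A toy illustration of the difference (this `CaseInsensitiveDict` is a stand-in, not azure-core's actual implementation):

```python
import copy

class CaseInsensitiveDict(dict):
    """Toy stand-in for a case-insensitive HTTP header mapping."""
    def __setitem__(self, key, value):
        super().__setitem__(key.lower(), value)
    def __getitem__(self, key):
        return super().__getitem__(key.lower())
    def get(self, key, default=None):
        try:
            return self[key]
        except KeyError:
            return default

headers = CaseInsensitiveDict()
headers['ETag'] = '"abc"'

plain = dict(headers)           # plain dict: lookups become case-sensitive
preserved = copy.copy(headers)  # shallow copy keeps the mapping type

print(plain.get('ETag'))        # → None (stored key no longer matches)
print(preserved.get('ETag'))    # → "abc" regardless of lookup case
```

A shallow copy still detaches the returned headers from the live response object while keeping case-insensitive lookups intact.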
@@ -21,7 +21,7 @@

"""Asynchronous request in the Azure Cosmos database service.
"""

import copy
import json
import time

@@ -107,7 +107,7 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
)

response = response.http_response
headers = dict(response.headers)
headers = copy.copy(response.headers)

data = response.body()
if data:
7 changes: 6 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
@@ -21,7 +21,7 @@

"""Create, read, update and delete items in the Azure Cosmos DB SQL API service.
"""

from datetime import datetime
from typing import Any, Dict, Mapping, Optional, Sequence, Type, Union, List, Tuple, cast
from typing_extensions import Literal

@@ -476,6 +476,7 @@ def query_items_change_feed(
*,
partition_key_range_id: Optional[str] = None,
is_start_from_beginning: bool = False,
start_time: Optional[datetime] = None,
continuation: Optional[str] = None,
max_item_count: Optional[int] = None,
partition_key: Optional[PartitionKeyType] = None,
@@ -486,6 +487,9 @@

:keyword bool is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default, it's start from current (false).
:keyword datetime start_time: Specifies a point of time to start change feed. Start time in
http://www.ietf.org/rfc/rfc2616.txt format. Converts datetime to UTC if timezone is defined. Assumes
datetime is in UTC if timezone is not defined.
:keyword str partition_key_range_id: ChangeFeed requests can be executed against specific partition key
ranges. This is used to process the change feed in parallel across multiple consumers.
:keyword str continuation: e_tag value to be used as continuation for reading change feed.
@@ -505,6 +509,7 @@
kwargs['priority'] = priority
feed_options = _build_options(kwargs)
feed_options["isStartFromBeginning"] = is_start_from_beginning
feed_options["startTime"] = start_time
if partition_key_range_id is not None:
feed_options["partitionKeyRangeId"] = partition_key_range_id
if partition_key is not None:
8 changes: 7 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/container.py
@@ -21,7 +21,7 @@

"""Create, read, update and delete items in the Azure Cosmos DB SQL API service.
"""

from datetime import datetime
import warnings
from typing import Any, Dict, List, Optional, Sequence, Union, Tuple, Mapping, Type, cast
from typing_extensions import Literal
@@ -307,6 +307,7 @@ def query_items_change_feed(
self,
partition_key_range_id: Optional[str] = None,
is_start_from_beginning: bool = False,
start_time: Optional[datetime] = None,
continuation: Optional[str] = None,
max_item_count: Optional[int] = None,
*,
@@ -320,6 +321,9 @@
This is used to process the change feed in parallel across multiple consumers.
:param bool is_start_from_beginning: Get whether change feed should start from
beginning (true) or from current (false). By default, it's start from current (false).
:param datetime start_time: Specifies a point of time to start change feed. Start time in
http://www.ietf.org/rfc/rfc2616.txt format. Converts datetime to UTC if timezone is defined. Assumes
datetime is in UTC if timezone is not defined.
:param max_item_count: Max number of items to be returned in the enumeration operation.
:param str continuation: e_tag value to be used as continuation for reading change feed.
:param int max_item_count: Max number of items to be returned in the enumeration operation.
@@ -342,6 +346,8 @@
feed_options["partitionKey"] = self._set_partition_key(partition_key)
if is_start_from_beginning is not None:
feed_options["isStartFromBeginning"] = is_start_from_beginning
if start_time is not None and is_start_from_beginning is False:
feed_options["startTime"] = start_time
if max_item_count is not None:
feed_options["maxItemCount"] = max_item_count
if continuation is not None: