Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Cosmos] Resiliency and Documentation Improvements #36514

Merged
merged 44 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
fc5c059
403.3 loop fix, regional routing fix, improvement on service request …
simorenoh Jul 18, 2024
be8d893
Update ErrorCodesAndRetries.md
simorenoh Jul 18, 2024
7aabc1e
Update TimeoutAndRetriesConfig.md
simorenoh Jul 18, 2024
f7ae57b
Update http_constants.py
simorenoh Jul 18, 2024
94882e4
Update CHANGELOG.md
simorenoh Jul 18, 2024
3f8006b
test improvements for 403 retry
simorenoh Jul 19, 2024
fca7aff
fix emulator tests
simorenoh Jul 20, 2024
60188bc
Update test_globaldb.py
simorenoh Jul 20, 2024
45a0862
Update test_globaldb.py
simorenoh Jul 20, 2024
cff2e25
add ServiceRequestError test and doc update
simorenoh Jul 23, 2024
ef4aa85
addressing comments
simorenoh Jul 24, 2024
b1f33f5
Merge branch 'main' into retry_validations
simorenoh Jul 24, 2024
2fba121
Update test_globaldb.py
simorenoh Jul 24, 2024
48e2743
Update test_globaldb.py
simorenoh Jul 26, 2024
667e1ee
Update test_globaldb.py
simorenoh Jul 26, 2024
16d4110
Update test_globaldb.py
simorenoh Jul 26, 2024
62af4b7
move policy
simorenoh Jul 26, 2024
533c114
revert
simorenoh Jul 26, 2024
3aa409d
fixes
simorenoh Jul 27, 2024
bd5ef6a
Update test_globaldb.py
simorenoh Jul 27, 2024
e8adf10
Update test_globaldb.py
simorenoh Jul 28, 2024
a9fc8ee
Update test_globaldb.py
simorenoh Jul 29, 2024
d2f36a0
Merge branch 'main' into retry_validations
simorenoh Aug 1, 2024
42bea72
Update test_globaldb.py
simorenoh Aug 6, 2024
c4091ad
Update test_globaldb.py
simorenoh Aug 7, 2024
c7e2a64
Update CHANGELOG.md
simorenoh Aug 7, 2024
d882bf8
503 retries
simorenoh Aug 28, 2024
a4c22ea
align readme with changelog
simorenoh Sep 5, 2024
4dc977c
Merge branch 'main' into retry_validations
simorenoh Sep 10, 2024
cd07dba
forceful db account refresh
simorenoh Sep 11, 2024
f587f9c
remove premature locational endpoint
simorenoh Sep 28, 2024
3f5e6b0
make GEM refresh every 5 mins as it should have
simorenoh Sep 28, 2024
0bf94a3
Delete drz3-drill.txt
simorenoh Sep 28, 2024
85e96e4
Update CHANGELOG.md
simorenoh Sep 28, 2024
1c9d828
Merge branch 'main' into retry_validations
simorenoh Sep 29, 2024
95b8384
Update test_location_cache.py
simorenoh Sep 29, 2024
3fae165
Update _global_endpoint_manager.py
simorenoh Sep 29, 2024
2e34d59
ensure only one initial database account call
simorenoh Sep 29, 2024
0879b0c
Delete dr-zdrill-005.txt
simorenoh Sep 29, 2024
acfcb3c
Update test_location_cache.py
simorenoh Sep 29, 2024
16bda74
Update test_location_cache.py
simorenoh Sep 29, 2024
98df1b4
Update test_location_cache.py
simorenoh Sep 29, 2024
22faf8e
Merge branch 'main' into retry_validations
simorenoh Sep 30, 2024
629e919
overhaul location_cache tests
simorenoh Oct 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,19 @@
### 4.7.1 (Unreleased)

#### Features Added
* SDK will now retry all ServiceRequestErrors (failing outgoing requests) before failing. Default number of retries is 3. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Added Retry Policy for Container Recreate in the Python SDK. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)

#### Breaking Changes

#### Bugs Fixed
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731)
* Consolidated Container Properties Cache to be in the Client to cache partition key definition and container rid to avoid unnecessary container reads. See [PR 35731](https://github.com/Azure/azure-sdk-for-python/pull/35731).
* Fixed bug with client hangs when running into WriteForbidden exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Added retry handling logic for DatabaseAccountNotFound exceptions. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).
* Fixed SDK regex validation that would not allow for item ids to be longer than 255 characters. See [PR 36569](https://github.com/Azure/azure-sdk-for-python/pull/36569).

#### Other Changes
* Getting offer thoughput when it has not been defined in a container will now give a 404/10004 instead of just a 404. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)
* Incomplete Partition Key Extractions in documents for Subpartitioning now gives 400/1001 instead of just a 400. See [PR 36043](https://github.com/Azure/azure-sdk-for-python/pull/36043)

* SDK will now make database account calls every 5 minutes to refresh location cache. See [PR 36514](https://github.com/Azure/azure-sdk-for-python/pull/36514).

### 4.7.0 (2024-05-15)

Expand Down
2 changes: 1 addition & 1 deletion sdk/cosmos/azure-cosmos/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ New releases of this SDK won't support Python 2.x starting January 1st, 2022. Pl

* Azure subscription - [Create a free account][azure_sub]
* Azure [Cosmos DB account][cosmos_account] - SQL API
* [Python 3.6+][python]
* [Python 3.8+][python]

If you need a Cosmos DB SQL API account, you can create one with this [Azure CLI][azure_cli] command:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,4 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument
# is set to false
self.request.route_to_location_with_preferred_location_flag(self.failover_retry_count, False)

# Resolve the endpoint for the request and pin the resolution to the resolved endpoint
# This enables marking the endpoint unavailability on endpoint failover/unreachability
self.location_endpoint = self.global_endpoint_manager.resolve_service_endpoint(self.request)
self.request.route_to_location(self.location_endpoint)
return True
33 changes: 17 additions & 16 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
from . import exceptions
from ._location_cache import LocationCache


# pylint: disable=protected-access


Expand Down Expand Up @@ -89,29 +90,29 @@ def force_refresh(self, database_account):
self.refresh_endpoint_list(database_account)

def refresh_endpoint_list(self, database_account, **kwargs):
with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
if not self.refresh_needed:
return
try:
self._refresh_endpoint_list_private(database_account, **kwargs)
except Exception as e:
raise e
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
self.refresh_needed = True
if self.refresh_needed:
with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
if not self.refresh_needed:
return
try:
self._refresh_endpoint_list_private(database_account, **kwargs)
except Exception as e:
raise e

def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
if database_account:
self.location_cache.perform_on_database_account_read(database_account)
self.refresh_needed = False

if (
self.location_cache.should_refresh_endpoints()
and self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
):
if not database_account:
self.last_refresh_time = self.location_cache.current_time_millis()
else:
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
database_account = self._GetDatabaseAccount(**kwargs)
self.location_cache.perform_on_database_account_read(database_account)
self.last_refresh_time = self.location_cache.current_time_millis()
self.refresh_needed = False

def _GetDatabaseAccount(self, **kwargs):
"""Gets the database account.
Expand Down
6 changes: 3 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_location_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,17 +200,17 @@ def clear_stale_endpoint_unavailability_info(self):

self.location_unavailability_info_by_endpoint = new_location_unavailability_info

def is_endpoint_unavailable(self, endpoint, expected_available_operations):
def is_endpoint_unavailable(self, endpoint: str, expected_available_operation: str):
unavailability_info = (
self.location_unavailability_info_by_endpoint[endpoint]
if endpoint in self.location_unavailability_info_by_endpoint
else None
)

if (
expected_available_operations == EndpointOperationType.NoneType
expected_available_operation == EndpointOperationType.NoneType
or not unavailability_info
or expected_available_operations not in unavailability_info["operationType"]
or expected_available_operation not in unavailability_info["operationType"]
):
return False

Expand Down
16 changes: 13 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import time
from typing import Optional

from azure.core.exceptions import AzureError, ClientAuthenticationError
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
from azure.core.pipeline import PipelineRequest
from azure.core.pipeline.policies import RetryPolicy
from azure.core.pipeline.transport._base import HttpRequest
Expand Down Expand Up @@ -124,7 +124,8 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):
except exceptions.CosmosHttpResponseError as e:
retry_policy = defaultRetry_policy
# Re-assign retry policy based on error code
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in\
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
retry_policy = endpointDiscovery_retry_policy
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
retry_policy = resourceThrottle_retry_policy
Expand Down Expand Up @@ -161,7 +162,7 @@ def Execute(client, global_endpoint_manager, function, *args, **kwargs):

retry_policy.container_rid = cached_container["_rid"]
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
retry_policy = timeout_failover_retry_policy

# If none of the retry policies applies or there is no retry needed, set the
Expand Down Expand Up @@ -259,6 +260,15 @@ def send(self, request):
timeout_error.response = response
timeout_error.history = retry_settings['history']
raise
except ServiceRequestError as err:
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
# the request ran into a socket timeout or failed to establish a new connection
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
if retry_settings['connect'] > 0:
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
self.sleep(retry_settings, request.context.transport)
continue
raise err
except AzureError as err:
retry_error = err
if self._is_method_retryable(retry_settings, request.http_request):
Expand Down
40 changes: 25 additions & 15 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_synchronized_request.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ def _request_body_from_data(data):
if data is None or isinstance(data, str) or _is_readable_stream(data):
return data
if isinstance(data, (dict, list, tuple)):

json_dumped = json.dumps(data, separators=(",", ":"))

return json_dumped
Expand All @@ -70,9 +69,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
"""Makes one http request using the requests module.

:param _GlobalEndpointManager global_endpoint_manager:
:param dict request_params:
contains the resourceType, operationType, endpointOverride,
useWriteEndpoint, useAlternateWriteEndpoint information
:param ~azure.cosmos._request_object.RequestObject request_params:
contains information for the request, like the resource_type, operation_type, and endpoint_override
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client:
Pipeline client to process the request
Expand All @@ -90,7 +88,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
# Every request tries to perform a refresh
client_timeout = kwargs.get('timeout')
start_time = time.time()
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if client_timeout is not None:
kwargs['timeout'] = client_timeout - (time.time() - start_time)
if kwargs['timeout'] <= 0:
Expand All @@ -100,8 +99,8 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
base_url = request_params.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
if base_url != pipeline_client._base_url:
request.url = request.url.replace(pipeline_client._base_url, base_url)
if not request.url.startswith(base_url):
request.url = _replace_url_prefix(request.url, base_url)

parse_result = urlparse(request.url)

Expand Down Expand Up @@ -167,20 +166,31 @@ def _Request(global_endpoint_manager, request_params, connection_policy, pipelin
return result, headers


def _replace_url_prefix(original_url, new_prefix):
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
parts = original_url.split('/', 3)

if not new_prefix.endswith('/'):
new_prefix += '/'

new_url = new_prefix + parts[3] if len(parts) > 3 else new_prefix

return new_url


def _PipelineRunFunction(pipeline_client, request, **kwargs):
# pylint: disable=protected-access

return pipeline_client._pipeline.run(request, **kwargs)

def SynchronizedRequest(
client,
request_params,
global_endpoint_manager,
connection_policy,
pipeline_client,
request,
request_data,
**kwargs
client,
request_params,
global_endpoint_manager,
connection_policy,
pipeline_client,
request,
request_data,
**kwargs
):
"""Performs one synchronized http request according to the parameters.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
from .. import exceptions
from .. import http_constants
from . import _retry_utility_async
from .._synchronized_request import _request_body_from_data
from .._synchronized_request import _request_body_from_data, _replace_url_prefix


async def _Request(global_endpoint_manager, request_params, connection_policy, pipeline_client, request, **kwargs):
"""Makes one http request using the requests module.

:param _GlobalEndpointManager global_endpoint_manager:
:param dict request_params:
contains the resourceType, operationType, endpointOverride,
useWriteEndpoint, useAlternateWriteEndpoint information
:param ~azure.cosmos._request_object.RequestObject request_params:
contains information for the request, like the resource_type, operation_type, and endpoint_override
:param documents.ConnectionPolicy connection_policy:
:param azure.core.PipelineClient pipeline_client:
Pipeline client to process the request
Expand All @@ -58,7 +57,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
# Every request tries to perform a refresh
client_timeout = kwargs.get('timeout')
start_time = time.time()
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if request_params.resource_type != http_constants.ResourceType.DatabaseAccount:
await global_endpoint_manager.refresh_endpoint_list(None, **kwargs)
if client_timeout is not None:
kwargs['timeout'] = client_timeout - (time.time() - start_time)
if kwargs['timeout'] <= 0:
Expand All @@ -68,8 +68,8 @@ async def _Request(global_endpoint_manager, request_params, connection_policy, p
base_url = request_params.endpoint_override
else:
base_url = global_endpoint_manager.resolve_service_endpoint(request_params)
if base_url != pipeline_client._base_url:
request.url = request.url.replace(pipeline_client._base_url, base_url)
if not request.url.startswith(base_url):
request.url = _replace_url_prefix(request.url, base_url)

parse_result = urlparse(request.url)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from .. import exceptions
from .._location_cache import LocationCache


# pylint: disable=protected-access

class _GlobalEndpointManager(object):
Expand Down Expand Up @@ -83,6 +84,8 @@ async def force_refresh(self, database_account):
await self.refresh_endpoint_list(database_account)

async def refresh_endpoint_list(self, database_account, **kwargs):
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
self.refresh_needed = True
if self.refresh_needed:
async with self.refresh_lock:
# if refresh is not needed or refresh is already taking place, return
Expand All @@ -94,18 +97,16 @@ async def refresh_endpoint_list(self, database_account, **kwargs):
raise e

async def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
self.refresh_needed = False
if database_account:
self.location_cache.perform_on_database_account_read(database_account)
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
else:
if (
self.location_cache.should_refresh_endpoints()
and
self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms
):
if self.location_cache.should_refresh_endpoints() or self.refresh_needed:
self.refresh_needed = False
self.last_refresh_time = self.location_cache.current_time_millis()
database_account = await self._GetDatabaseAccount(**kwargs)
self.location_cache.perform_on_database_account_read(database_account)
self.last_refresh_time = self.location_cache.current_time_millis()

async def _GetDatabaseAccount(self, **kwargs):
"""Gets the database account.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import asyncio
from typing import Optional

from azure.core.exceptions import AzureError, ClientAuthenticationError
from azure.core.exceptions import AzureError, ClientAuthenticationError, ServiceRequestError
from azure.core.pipeline.policies import AsyncRetryPolicy
from azure.core.pipeline.transport._base import HttpRequest

Expand Down Expand Up @@ -123,7 +123,8 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg
return result
except exceptions.CosmosHttpResponseError as e:
retry_policy = None
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status == SubStatusCodes.WRITE_FORBIDDEN:
if e.status_code == StatusCodes.FORBIDDEN and e.sub_status in \
[SubStatusCodes.DATABASE_ACCOUNT_NOT_FOUND, SubStatusCodes.WRITE_FORBIDDEN]:
retry_policy = endpointDiscovery_retry_policy
elif e.status_code == StatusCodes.TOO_MANY_REQUESTS:
retry_policy = resourceThrottle_retry_policy
Expand Down Expand Up @@ -160,7 +161,7 @@ async def ExecuteAsync(client, global_endpoint_manager, function, *args, **kwarg

retry_policy.container_rid = cached_container["_rid"]
request.headers[retry_policy._intended_headers] = retry_policy.container_rid
elif e.status_code in (StatusCodes.REQUEST_TIMEOUT, e.status_code == StatusCodes.SERVICE_UNAVAILABLE):
elif e.status_code in [StatusCodes.REQUEST_TIMEOUT, StatusCodes.SERVICE_UNAVAILABLE]:
retry_policy = timeout_failover_retry_policy
else:
retry_policy = defaultRetry_policy
Expand Down Expand Up @@ -245,6 +246,15 @@ async def send(self, request):
timeout_error.response = response
timeout_error.history = retry_settings['history']
raise
except ServiceRequestError as err:
jeet1995 marked this conversation as resolved.
Show resolved Hide resolved
# the request ran into a socket timeout or failed to establish a new connection
# since request wasn't sent, we retry up to however many connection retries are configured (default 3)
if retry_settings['connect'] > 0:
simorenoh marked this conversation as resolved.
Show resolved Hide resolved
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
await self.sleep(retry_settings, request.context.transport)
continue
raise err
except AzureError as err:
retry_error = err
if self._is_method_retryable(retry_settings, request.http_request):
Expand Down
1 change: 0 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/cosmos_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ def _build_connection_policy(kwargs: Dict[str, Any]) -> ConnectionPolicy:
"'connection_retry_policy' has been deprecated and will be removed from the SDK in a future release.",
DeprecationWarning
)
connection_retry = policy.ConnectionRetryConfiguration
if not connection_retry:
connection_retry = ConnectionRetryPolicy(
retry_total=total_retries,
Expand Down
3 changes: 3 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/http_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class HttpHeaders:
PartitionKey = "x-ms-documentdb-partitionkey"
EnableCrossPartitionQuery = "x-ms-documentdb-query-enablecrosspartition"
PartitionKeyRangeID = "x-ms-documentdb-partitionkeyrangeid"
PhysicalPartitionId = "x-ms-cosmos-physical-partition-id"
xinlian12 marked this conversation as resolved.
Show resolved Hide resolved
PartitionKeyDeletePending = "x-ms-cosmos-is-partition-key-delete-pending"
StartEpkString = "x-ms-start-epk"
EndEpkString = "x-ms-end-epk"
Expand Down Expand Up @@ -240,6 +241,8 @@ class HttpHeaders:
GlobalCommittedLsn = "x-ms-global-committed-lsn"
NumberOfReadRegions = "x-ms-number-of-read-regions"
TransportRequestId = "x-ms-transport-request-id"
ItemLsn = "x-ms-item-lsn"
CosmosItemLsn = "x-ms-cosmos-item-llsn" # cspell:disable-line
CosmosLsn = "x-ms-cosmos-llsn" # cspell:disable-line
CosmosQuorumAckedLsn = "x-ms-cosmos-quorum-acked-llsn" # cspell:disable-line
RequestDurationMs = "x-ms-request-duration-ms"
Expand Down
Loading