Skip to content

Commit

Permalink
Replace Endpoints with Regional Endpoints (#39390)
Browse files Browse the repository at this point in the history
* add new policy, add logic to use policy

* added small test file I was using

* initial regional endpoint work

* groundwork

* re-add AzureError logic, refactor, fix tests

* Update _retry_utility.py

* Updated location_cache with new design

* Fixed key error with most_preferred_location

* Update test_cosmos_http_logging_policy.py

* Update _retry_utility.py

* Added logic to refresh cache on previous endpoint usage

* Added business logic update the regional endpoint based on success or failures

* implementation

* Update _retry_utility_async.py

* fix some tests

* changelog, versions, fixes

* fixes

* fix some tests

* remove fake logic, count fix

* fix some tests

* Update _service_request_retry_policy.py

* Update _retry_utility_async.py

* retry utilities fixing

* Update _retry_utility.py

* additional enhancements

* Update setup.py

* Update _retry_utility_async.py

* add tests, remove previous retry logic for ServiceRequestExceptions

* clean up with finally

* tests

* retry utilities

* disable tests

* add logging to policies

* GetDatabaseAccount Fix

* Update _base.py

* retry utilities fixes

* Update _retry_utility.py

* retry utulities part 34

* Update _service_request_retry_policy.py

* remove extra logs

* policy updates

* Update _service_response_retry_policy.py

* Update _service_response_retry_policy.py

* policies updates and update operation types

* trying out fixes

* Update sdk/cosmos/azure-cosmos/CHANGELOG.md

Co-authored-by: Abhijeet Mohanty <mabhijeet1995@gmail.com>

* Update sdk/cosmos/azure-cosmos/CHANGELOG.md

Co-authored-by: Abhijeet Mohanty <mabhijeet1995@gmail.com>

* Skipped proxy test for debugging

* annotation fix

* Fixed some tests cases

* test fixes

* Update test_service_retry_policies_async.py

* Fixed some mocking behavior

* fixed pylint issues

* Added aiohttp minimum dependency

* Updated changelog and setup.py

* Updated changelog

* Add changelog and fix tests.

* Fix tests

* bootstrapping with global endpoint as previous for writes

* Add headers and cleanup

* cleanup and retry all service request headers

* Don't retry on a none previous

* Updated the business logic with current and previous, fixed database account refresh and some retry policies

* fix client id

* Reacting to comments

* Added print statements and fixed some retry logic

* Revert getDatabase in mark endpoint

* Fixed some pylint and changelog issues

* Fixed version

* fix bug with type check, update tests

* Update test_service_retry_policies_async.py

* sync tests updates

* Reacting to comments and fixing service request retry policy

* Code review comments and pylint issues

* Fixed tests and pylint

* more sync mock tests - missing async copies

* Fixed min aiohttp requirements

* Update _retry_utility_async.py

* Change to check operation type in operations

* push initial GEM mock test

* Update test_service_retry_policies.py

* Fixed extra retries

* sync tests

* Update test_service_retry_policies_async.py

* Fixed extra retries and relevant tests

* Only delay retry by one second

* async tests - need to split up inheritance ones since endpoint unavailable stops extra retries

* Change retry strategy

* add sub-class errors tests

* change old tests, refactoring, fix mocking bleed

* Fix a test

* clear last routed location pythonic

* Removed aiohttp dependency

* catch import errors

* Skipped global endpoint manager test for debugging

* Fixed tests

* Removed skips

* fix live tests and print statements for debugging

* cleanup of few tests

* updated globaldb mock

* Moved some of the high  offer throughput tests to live tests

* Fixed global endpoint retry async test

* Tried fixing global endpoint retry async test

* no swaps on success test

* fix import

* Tried fixing global endpoint retry async test

* Added separate split live tests

* Added live platform matrix

* some test fixes

* Fixed live test pipeline

* Moved test resource id to cosmosLong

* Updated live tests

* Running live tests with proper flag

* testing logging experiments

* fix tests

* honor testmark argument through a safe environment variable, versus accessing the value directly

* more test fixes

* remove accidental log files

* Fixed issues with swapping and retry policies

* Fixed issues with swapping and retry policies

* Marking endpoint as down fix

* more test fixes

* Remove print statements

* Fixed some minor issues with emulator tests

* split change feed tests

* Fixed emulator tests

* updated changelog

* Fixed emulator tests again

* Fixed emulator tests and event loop

* vector/fts query tests

* Fix session token live tests

* hybrid search query fixes

* Fixed live test name

* fallback to regional

* fix ci tests

* Update conftest.py

* Database accounts call will timeout in 5 seconds

* Change timeouts and update docs

* call updates to endpoint policy and location cache

* Health check for endpoitns

* database account retry policy

* Fix parameter error

* Retry on cosmos error fix

* Retry on service request error fix

* None checks for request in retry utilities

* lowercase constructed regional endpoint

* fix global endpoint as unhealthy

* fix parsing test

* Added logic for swapping on health check failed

* Fixed log statement

* fix pylint, docs, and remove print statements

* fix pylint

* fix some tests

* Prepared for release

---------

Co-authored-by: Simon Moreno <30335873+simorenoh@users.noreply.github.com>
Co-authored-by: Kushagra Thapar <kuthapar@microsoft.com>
Co-authored-by: Abhijeet Mohanty <mabhijeet1995@gmail.com>
Co-authored-by: Scott Beddall <scbedd@microsoft.com>
  • Loading branch information
5 people authored Feb 5, 2025
1 parent aee42e7 commit fe6b4c7
Show file tree
Hide file tree
Showing 77 changed files with 2,365 additions and 1,187 deletions.
19 changes: 17 additions & 2 deletions eng/pipelines/templates/steps/build-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,18 @@ steps:
}
Write-Host (Get-Command python).Source
if ($env:TESTMARKARGUMENT) {
$markArg = $env:TESTMARKARGUMENT
}
else {
$markArg = "${{ parameters.TestMarkArgument }}"
}
python scripts/devops_tasks/dispatch_tox.py
"$(TargetingString)"
${{ parameters.AdditionalTestArgs }}
${{ parameters.CoverageArg }}
--mark_arg="${{ parameters.TestMarkArgument }}"
--mark_arg="$markArg"
--service="${{ parameters.ServiceDirectory }}"
--toxenv="${{ parameters.ToxTestEnv }}"
--injected-packages="${{ parameters.InjectedPackages }}"
Expand All @@ -104,10 +111,18 @@ steps:
. $(VENV_LOCATION)/bin/activate.ps1
}
Write-Host (Get-Command python).Source
if ($env:TESTMARKARGUMENT) {
$markArg = $env:TESTMARKARGUMENT
}
else {
$markArg = "${{ parameters.TestMarkArgument }}"
}
python scripts/devops_tasks/dispatch_tox.py "$(TargetingString)" `
${{ parameters.AdditionalTestArgs }} `
${{ parameters.CoverageArg }} `
--mark_arg="${{ parameters.TestMarkArgument }}" `
--mark_arg="$markArg" `
--service="${{ parameters.ServiceDirectory }}" `
--toxenv="${{ parameters.ToxTestEnv }}" `
--injected-packages="${{ parameters.InjectedPackages }}" `
Expand Down
17 changes: 17 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,22 @@
## Release History

### 4.9.1b3 (2025-02-04)

#### Features Added
* Improved retry logic by retrying alternative endpoint for writes within a region before performing a cross region retry. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* Added endpoint health check logic during database account calls. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)

#### Bugs Fixed
* Fixed unnecessary retries on the wrong region for timout retry policy. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* All client connection errors from aiohttp will be retried. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)

#### Other Changes
* Changed defaults for retry delays. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* Changed default connection timeout to be 5 seconds. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* Changed default read timeout to be 65 seconds. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* On database account calls send a client id header for load balancing. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)
* Removed aiohttp dependency. See [PR 39390](https://github.com/Azure/azure-sdk-for-python/pull/39390)

### 4.9.1b2 (2025-01-24)

#### Features Added
Expand Down
5 changes: 5 additions & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
operation_type: str,
options: Mapping[str, Any],
partition_key_range_id: Optional[str] = None,
client_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Gets HTTP request headers.
Expand All @@ -131,6 +132,7 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
:param str operation_type:
:param dict options:
:param str partition_key_range_id:
:param str client_id:
:return: The HTTP request headers.
:rtype: dict
"""
Expand Down Expand Up @@ -280,6 +282,9 @@ def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
if partition_key_range_id is not None:
headers[http_constants.HttpHeaders.PartitionKeyRangeID] = partition_key_range_id

if client_id is not None:
headers[http_constants.HttpHeaders.ClientId] = client_id

if options.get("enableScriptLogging"):
headers[http_constants.HttpHeaders.EnableScriptLogging] = options["enableScriptLogging"]

Expand Down
26 changes: 24 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
"""
import os
import urllib.parse
import uuid
from typing import Callable, Dict, Any, Iterable, List, Mapping, Optional, Sequence, Tuple, Union, cast, Type
from typing_extensions import TypedDict
from urllib3.util.retry import Retry
Expand Down Expand Up @@ -109,7 +110,7 @@ class _QueryCompatibilityMode:
_DefaultStringHashPrecision = 3
_DefaultStringRangePrecision = -1

def __init__(
def __init__( # pylint: disable=too-many-statements
self,
url_connection: str,
auth: CredentialDict,
Expand All @@ -131,6 +132,7 @@ def __init__(
The default consistency policy for client operations.
"""
self.client_id = str(uuid.uuid4())
self.url_connection = url_connection
self.master_key: Optional[str] = None
self.resource_tokens: Optional[Mapping[str, Any]] = None
Expand Down Expand Up @@ -2555,7 +2557,7 @@ def GetDatabaseAccount(
url_connection = self.url_connection

headers = base.GetHeaders(self, self.default_headers, "get", "", "", "",
documents._OperationType.Read,{})
documents._OperationType.Read,{}, client_id=self.client_id)
request_params = RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
result, last_response_headers = self.__Get("", request_params, headers, **kwargs)
self.last_response_headers = last_response_headers
Expand Down Expand Up @@ -2589,6 +2591,26 @@ def GetDatabaseAccount(
response_hook(last_response_headers, result)
return database_account

def _GetDatabaseAccountCheck(
self,
url_connection: Optional[str] = None,
**kwargs: Any
):
"""Gets database account info.
:param str url_connection: the endpoint used to get the database account
:return: The Database Account.
:rtype: documents.DatabaseAccount
"""
if url_connection is None:
url_connection = self.url_connection

headers = base.GetHeaders(self, self.default_headers, "get", "", "", "",
documents._OperationType.Read,{}, client_id=self.client_id)
request_params = RequestObject("databaseaccount", documents._OperationType.Read, url_connection)
self.__Get("", request_params, headers, **kwargs)


def Create(
self,
body: Dict[str, Any],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# The MIT License (MIT)
# Copyright (c) 2014 Microsoft Corporation

# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.

# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.

"""Internal class for database account retry policy implementation in the
Azure Cosmos database service.
"""

class DatabaseAccountRetryPolicy(object):
"""The database account retry policy which should only retry once regardless of errors.
"""

def __init__(self, connection_policy):
self.retry_count = 0
self.retry_after_in_milliseconds = 0
self.max_retry_attempt_count = 1
self.connection_policy = connection_policy

def ShouldRetry(self, exception): # pylint: disable=unused-argument
"""Returns true if the request should retry based on the passed-in exception.
:param exceptions.CosmosHttpResponseError exception:
:returns: a boolean stating whether the request should be retried
:rtype: bool
"""

if self.retry_count >= self.max_retry_attempt_count:
return False

self.retry_count += 1

return True
5 changes: 3 additions & 2 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_default_retry_policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
Cosmos database service.
"""
from . import http_constants
from .documents import _OperationType

# pylint: disable=protected-access

Expand Down Expand Up @@ -36,12 +37,12 @@ def __init__(self, *args):
self.current_retry_attempt_count = 0
self.retry_after_in_milliseconds = 1000
self.args = args
self.request = args[0] if args else None

def needsRetry(self, error_code):
if error_code in DefaultRetryPolicy.CONNECTION_ERROR_CODES:
if self.args:
if (self.args[3].method == "GET") or (http_constants.HttpHeaders.IsQuery in self.args[3].headers) \
or (http_constants.HttpHeaders.IsQueryPlanRequest in self.args[3].headers):
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
return True
return False
return True
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,7 @@ def __init__(self, connection_policy, global_endpoint_manager, *args):
self.retry_after_in_milliseconds = EndpointDiscoveryRetryPolicy.Retry_after_in_milliseconds
self.connection_policy = connection_policy
self.request = args[0] if args else None
# clear previous location-based routing directive
if self.request:
self.request.clear_route_to_location()

# Resolve the endpoint for the request and pin the resolution to the resolved endpoint
# This enables marking the endpoint unavailability on endpoint failover/unreachability
self.location_endpoint = self.global_endpoint_manager.resolve_service_endpoint(self.request)
self.request.route_to_location(self.location_endpoint)

def ShouldRetry(self, exception): # pylint: disable=unused-argument
"""Returns true if the request should retry based on the passed-in exception.
Expand All @@ -77,12 +70,16 @@ def ShouldRetry(self, exception): # pylint: disable=unused-argument

self.failover_retry_count += 1

if self.location_endpoint:
if self.request.location_endpoint_to_route:
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
# Mark current read endpoint as unavailable
self.global_endpoint_manager.mark_endpoint_unavailable_for_read(self.location_endpoint)
self.global_endpoint_manager.mark_endpoint_unavailable_for_read(
self.request.location_endpoint_to_route,
True)
else:
self.global_endpoint_manager.mark_endpoint_unavailable_for_write(self.location_endpoint)
self.global_endpoint_manager.mark_endpoint_unavailable_for_write(
self.request.location_endpoint_to_route,
True)

# set the refresh_needed flag to ensure that endpoint list is
# refreshed with new writable and readable locations
Expand Down
79 changes: 41 additions & 38 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_global_endpoint_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
"""

import threading
from urllib.parse import urlparse

from azure.core.exceptions import AzureError

Expand Down Expand Up @@ -64,25 +63,28 @@ def get_refresh_time_interval_in_ms_stub(self):
return constants._Constants.DefaultUnavailableLocationExpirationTime

def get_write_endpoint(self):
return self.location_cache.get_write_endpoint()
return self.location_cache.get_write_regional_endpoint()

def get_read_endpoint(self):
return self.location_cache.get_read_endpoint()
return self.location_cache.get_read_regional_endpoint()

def swap_regional_endpoint_values(self, request):
return self.location_cache.swap_regional_endpoint_values(request)

def resolve_service_endpoint(self, request):
return self.location_cache.resolve_service_endpoint(request)

def mark_endpoint_unavailable_for_read(self, endpoint):
self.location_cache.mark_endpoint_unavailable_for_read(endpoint)
def mark_endpoint_unavailable_for_read(self, endpoint, refresh_cache):
self.location_cache.mark_endpoint_unavailable_for_read(endpoint, refresh_cache)

def mark_endpoint_unavailable_for_write(self, endpoint):
self.location_cache.mark_endpoint_unavailable_for_write(endpoint)
def mark_endpoint_unavailable_for_write(self, endpoint, refresh_cache):
self.location_cache.mark_endpoint_unavailable_for_write(endpoint, refresh_cache)

def get_ordered_write_endpoints(self):
return self.location_cache.get_ordered_write_endpoints()
def get_ordered_write_locations(self):
return self.location_cache.get_ordered_write_locations()

def get_ordered_read_endpoints(self):
return self.location_cache.get_ordered_read_endpoints()
def get_ordered_read_locations(self):
return self.location_cache.get_ordered_read_locations()

def can_use_multiple_write_locations(self, request):
return self.location_cache.can_use_multiple_write_locations_for_request(request)
Expand All @@ -91,6 +93,9 @@ def force_refresh(self, database_account):
self.refresh_needed = True
self.refresh_endpoint_list(database_account)

def update_location_cache(self):
self.location_cache.update_location_cache()

def refresh_endpoint_list(self, database_account, **kwargs):
if self.location_cache.current_time_millis() - self.last_refresh_time > self.refresh_time_interval_in_ms:
self.refresh_needed = True
Expand All @@ -115,6 +120,8 @@ def _refresh_endpoint_list_private(self, database_account=None, **kwargs):
self.last_refresh_time = self.location_cache.current_time_millis()
database_account = self._GetDatabaseAccount(**kwargs)
self.location_cache.perform_on_database_account_read(database_account)
# this will perform getDatabaseAccount calls to check endpoint health
self._endpoints_health_check(**kwargs)

def _GetDatabaseAccount(self, **kwargs):
"""Gets the database account.
Expand All @@ -137,7 +144,7 @@ def _GetDatabaseAccount(self, **kwargs):
# to get that info from any endpoints
except (exceptions.CosmosHttpResponseError, AzureError):
for location_name in self.PreferredLocations:
locational_endpoint = _GlobalEndpointManager.GetLocationalEndpoint(self.DefaultEndpoint, location_name)
locational_endpoint = LocationCache.GetLocationalEndpoint(self.DefaultEndpoint, location_name)
try:
database_account = self._GetDatabaseAccountStub(locational_endpoint, **kwargs)
self._database_account_cache = database_account
Expand All @@ -146,6 +153,28 @@ def _GetDatabaseAccount(self, **kwargs):
pass
raise

def _endpoints_health_check(self, **kwargs):
"""Gets the database account for each endpoint.
Validating if the endpoint is healthy else marking it as unavailable.
"""
all_endpoints = [self.location_cache.read_regional_endpoints[0]]
all_endpoints.extend(self.location_cache.write_regional_endpoints)
count = 0
for endpoint in all_endpoints:
count += 1
if count > 3:
break
try:
self.Client._GetDatabaseAccountCheck(endpoint.get_current(), **kwargs)
except (exceptions.CosmosHttpResponseError, AzureError):
if endpoint in self.location_cache.read_regional_endpoints:
self.mark_endpoint_unavailable_for_read(endpoint.get_current(), False)
if endpoint in self.location_cache.write_regional_endpoints:
self.mark_endpoint_unavailable_for_write(endpoint.get_current(), False)
endpoint.swap()
self.location_cache.update_location_cache()

def _GetDatabaseAccountStub(self, endpoint, **kwargs):
"""Stub for getting database account from the client.
This can be used for mocking purposes as well.
Expand All @@ -155,29 +184,3 @@ def _GetDatabaseAccountStub(self, endpoint, **kwargs):
:rtype: ~azure.cosmos.DatabaseAccount
"""
return self.Client.GetDatabaseAccount(endpoint, **kwargs)

@staticmethod
def GetLocationalEndpoint(default_endpoint, location_name):
# For default_endpoint like 'https://contoso.documents.azure.com:443/' parse it to
# generate URL format. This default_endpoint should be global endpoint(and cannot
# be a locational endpoint) and we agreed to document that
endpoint_url = urlparse(default_endpoint)

# hostname attribute in endpoint_url will return 'contoso.documents.azure.com'
if endpoint_url.hostname is not None:
hostname_parts = str(endpoint_url.hostname).lower().split(".")
if hostname_parts is not None:
# global_database_account_name will return 'contoso'
global_database_account_name = hostname_parts[0]

# Prepare the locational_database_account_name as contoso-EastUS for location_name 'East US'
locational_database_account_name = global_database_account_name + "-" + location_name.replace(" ", "")

# Replace 'contoso' with 'contoso-EastUS' and return locational_endpoint
# as https://contoso-EastUS.documents.azure.com:443/
locational_endpoint = default_endpoint.lower().replace(
global_database_account_name, locational_database_account_name, 1
)
return locational_endpoint

return None
Loading

0 comments on commit fe6b4c7

Please sign in to comment.