Subpartitioning Python Cosmos DB SDK (#31121)
* subpartitioning

adds subpartitioning support

* Additional Subpartitioning Updates

Fixes some edge cases. Also adds tests for subpartitioning CRUD operations that match the Java SDK, tests for some Python-specific edge cases, and samples for subpartitioning in Python.
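For context, a subpartitioned container uses a hierarchical (MultiHash) partition key definition with up to three paths. A minimal sketch of the definition shape this PR works with, as a plain dict (the field names mirror what the SDK sends, but treat this as illustrative, not the exact wire format):

```python
# Sketch of a hierarchical (MultiHash) partition key definition.
# "MultiHash" is the kind used for subpartitioned containers.
partition_key_definition = {
    "paths": ["/tenantId", "/userId", "/sessionId"],  # up to three levels
    "kind": "MultiHash",   # hierarchical partitioning
    "version": 2,
}

# A full partition key value supplies one component per path, in order.
full_pk = ["tenant1", "user1", "session1"]
assert len(full_pk) == len(partition_key_definition["paths"])
```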

* remove unneeded line

remove line of code that was used for testing

* update changelog

update changelog to include new feature

* pylint fixes

fixes for pylint issues

* remove debug code on subpartition test

removes left over debugging code on subpartition test

* Adding support for prefix partition queries

Adding support for using incomplete partition key values (prefix partition key) for subpartition queries.
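A prefix (incomplete) partition key supplies only the leading components of the hierarchy. A minimal sketch of the idea, with a hypothetical helper name (`is_prefix_partition_key` is not part of the SDK):

```python
def is_prefix_partition_key(definition_paths, pk_value):
    """Hypothetical helper: True when pk_value covers only a leading
    subset of a hierarchical partition key's paths."""
    return isinstance(pk_value, list) and 0 < len(pk_value) < len(definition_paths)

paths = ["/tenantId", "/userId", "/sessionId"]
print(is_prefix_partition_key(paths, ["tenant1"]))                       # prefix: True
print(is_prefix_partition_key(paths, ["tenant1", "user1", "session1"]))  # full key: False
```

A prefix key cannot be routed to a single logical partition, which is why the later commits add EPK-range routing for these queries.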

* pylint and cspell fixes

Adds additional fixes related to pylint and cspell

* Additional Updates and fixes

Includes fixes: making methods and properties in the PartitionKey class private, a fix for partition key delete with subpartitioning, and repairs for changes that had broken some tests.

* removing unneeded lines from test config

* Test fix

The partition key header expects no spaces in subpartition partition keys

* update test crud subpartition

* Update test_config.py

* additional feedback fixes

* Fixed Python Version Compatibility

Fixed an issue with importing Literal to make it compatible with all supported Python versions

* Fixed small issue causing tests to fail

Fixed an issue where self.last_headers wasn't being properly cleared between tests.

* Testing fix for subpartitioning

The test was using the wrong index to get the partition key value from last_headers. This bug only appears when all tests are run together.

* Update test_crud_subpartition_async.py

* Update test_crud_subpartition_async.py

* Update dev_requirements.txt

* Update async test and samples

Updated the async tests to work with pytest async testing. Updated the async samples to match the non-async samples.

* Change public method to be private

changes the method that gets the EPK range for a prefix partition key to be private

* Added support for prefix queries involving multiple overlapping ranges

In the case of large databases, a prefix query against a container with subpartitioning may span multiple physical partitions. This change allows a prefix query to properly query items from all the partitions that contain the prefix partition keys.

* Better overlapping support and new overlapping range tests

This commit adds better support for the case of a prefix query needing to query multiple physical partitions. It queries each physical partition with the partition key range that partition needs. New tests were also added to cover this functionality.

* Clarified information in some comments

Added a comment explaining the fourth case of what the EPK sub-range can equal. In that case the EPK sub-range equals the feed range EPK, since it falls within a single physical partition without spanning that partition entirely.

---------

Co-authored-by: simorenoh <simonmorenohe@gmail.com>
bambriz and simorenoh authored Oct 10, 2023
1 parent 9bd8c3f commit 37b322f
Showing 19 changed files with 2,371 additions and 62 deletions.
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -3,6 +3,7 @@
### 4.5.2 (Unreleased)

#### Features Added
* Added Support for Subpartitioning in Python SDK. See [PR 31121](https://github.com/Azure/azure-sdk-for-python/pull/31121)

#### Breaking Changes

100 changes: 51 additions & 49 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
@@ -215,7 +215,12 @@ def GetHeaders(  # pylint: disable=too-many-statements,too-many-branches
headers[http_constants.HttpHeaders.PartitionKey] = []
# else serialize using json dumps method which apart from regular values will serialize None into null
else:
headers[http_constants.HttpHeaders.PartitionKey] = json.dumps([options["partitionKey"]])
# hierarchical partition keys arrive as a list; a single partition key is a scalar and must be wrapped in a list
if isinstance(options["partitionKey"], list) and options["partitionKey"]:
pk_val = json.dumps(options["partitionKey"], separators=(',', ':'))
else:
pk_val = json.dumps([options["partitionKey"]])
headers[http_constants.HttpHeaders.PartitionKey] = pk_val

if options.get("enableCrossPartitionQuery"):
headers[http_constants.HttpHeaders.EnableCrossPartitionQuery] = options["enableCrossPartitionQuery"]
@@ -224,7 +229,7 @@ def GetHeaders(  # pylint: disable=too-many-statements,too-many-branches
headers[http_constants.HttpHeaders.PopulateQueryMetrics] = options["populateQueryMetrics"]

if options.get("responseContinuationTokenLimitInKb"):
headers[http_constants.HttpHeaders.ResponseContinuationTokenLimitInKb] = options["responseContinuationTokenLimitInKb"] # pylint: disable=line-too-long
headers[http_constants.HttpHeaders.ResponseContinuationTokenLimitInKb] = options["responseContinuationTokenLimitInKb"] # pylint: disable=line-too-long

if cosmos_client_connection.master_key:
# formatdate guarantees RFC 1123 date format regardless of current locale
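The GetHeaders change above can be sketched in isolation: hierarchical (list) partition keys are serialized compactly, while scalar keys are wrapped in a single-element list. This is a simplified re-implementation for illustration, not the SDK code itself:

```python
import json

def partition_key_header(partition_key):
    # Hierarchical (subpartition) keys arrive as a non-empty list and are
    # serialized without spaces, since the partition key header expects
    # no spaces in subpartition keys.
    if isinstance(partition_key, list) and partition_key:
        return json.dumps(partition_key, separators=(',', ':'))
    # A single partition key is a scalar and is wrapped in a list;
    # json.dumps also serializes None into null here.
    return json.dumps([partition_key])

print(partition_key_header("WA"))                  # ["WA"]
print(partition_key_header(["tenant1", "user1"]))  # ["tenant1","user1"]
```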
@@ -606,56 +611,53 @@ def TrimBeginningAndEndingSlashes(path):

# Parses the paths into a list of tokens, each representing a property
def ParsePaths(paths):
if len(paths) != 1:
raise ValueError("Unsupported paths count.")

segmentSeparator = "/"
path = paths[0]
tokens = []
currentIndex = 0

while currentIndex < len(path):
if path[currentIndex] != segmentSeparator:
raise ValueError("Invalid path character at index " + currentIndex)

currentIndex += 1
if currentIndex == len(path):
break

# " and ' are treated specially in the sense that they can have the / (segment separator)
# between them which is considered part of the token
if path[currentIndex] == '"' or path[currentIndex] == "'":
quote = path[currentIndex]
newIndex = currentIndex + 1

while True:
newIndex = path.find(quote, newIndex)
if newIndex == -1:
raise ValueError("Invalid path character at index " + currentIndex)

# check if the quote itself is escaped by a preceding \ in which case it's part of the token
if path[newIndex - 1] != "\\":
break
newIndex += 1

# This will extract the token excluding the quote chars
token = path[currentIndex + 1: newIndex]
tokens.append(token)
currentIndex = newIndex + 1
else:
newIndex = path.find(segmentSeparator, currentIndex)
token = None
if newIndex == -1:
# This will extract the token from currentIndex to end of the string
token = path[currentIndex:]
currentIndex = len(path)
for path in paths:
currentIndex = 0

while currentIndex < len(path):
if path[currentIndex] != segmentSeparator:
raise ValueError("Invalid path character at index " + str(currentIndex))

currentIndex += 1
if currentIndex == len(path):
break

# " and ' are treated specially in the sense that they can have the / (segment separator)
# between them which is considered part of the token
if path[currentIndex] == '"' or path[currentIndex] == "'":
quote = path[currentIndex]
newIndex = currentIndex + 1

while True:
newIndex = path.find(quote, newIndex)
if newIndex == -1:
raise ValueError("Invalid path character at index " + str(currentIndex))

# check if the quote itself is escaped by a preceding \ in which case it's part of the token
if path[newIndex - 1] != "\\":
break
newIndex += 1

# This will extract the token excluding the quote chars
token = path[currentIndex + 1: newIndex]
tokens.append(token)
currentIndex = newIndex + 1
else:
# This will extract the token from currentIndex to the char before the segmentSeparator
token = path[currentIndex:newIndex]
currentIndex = newIndex

token = token.strip()
tokens.append(token)
newIndex = path.find(segmentSeparator, currentIndex)
token = None
if newIndex == -1:
# This will extract the token from currentIndex to end of the string
token = path[currentIndex:]
currentIndex = len(path)
else:
# This will extract the token from currentIndex to the char before the segmentSeparator
token = path[currentIndex:newIndex]
currentIndex = newIndex

token = token.strip()
tokens.append(token)

return tokens

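The ParsePaths change above generalizes the old single-path tokenizer to loop over every path in the definition, which is what hierarchical keys need. A simplified sketch of that behavior (quote and escape handling from the real implementation is omitted):

```python
def parse_paths(paths):
    """Simplified sketch of ParsePaths: split each '/'-separated path into
    property tokens, accumulating across all paths in the definition."""
    tokens = []
    for path in paths:
        if not path.startswith("/"):
            raise ValueError("path must start with '/': " + path)
        # drop empty segments and surrounding whitespace, as the SDK does
        tokens.extend(seg.strip() for seg in path.split("/") if seg)
    return tokens

print(parse_paths(["/tenantId", "/userId"]))  # ['tenantId', 'userId']
print(parse_paths(["/address/zipCode"]))      # ['address', 'zipCode']
```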
76 changes: 73 additions & 3 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py
@@ -52,11 +52,11 @@
from . import _request_object
from . import _synchronized_request as synchronized_request
from . import _global_endpoint_manager as global_endpoint_manager
from ._routing import routing_map_provider
from ._routing import routing_map_provider, routing_range
from ._retry_utility import ConnectionRetryPolicy
from . import _session
from . import _utils
from .partition_key import _Undefined, _Empty
from .partition_key import _Undefined, _Empty, PartitionKey
from ._auth_policy import CosmosBearerTokenCredentialPolicy
from ._cosmos_http_logging_policy import CosmosHttpLoggingPolicy

@@ -2539,6 +2539,59 @@ def __GetBodiesFromQueryResult(result):
# Query operations will use ReadEndpoint even though it uses POST(for regular query operations)
request_params = _request_object.RequestObject(typ, documents._OperationType.SqlQuery)
req_headers = base.GetHeaders(self, initial_headers, "post", path, id_, typ, options, partition_key_range_id)

# check if the query uses a prefix partition key
isPrefixPartitionQuery = kwargs.pop("isPrefixPartitionQuery", None)
if isPrefixPartitionQuery:
# get the overlapping partition key ranges for the prefix partition key
partition_key_definition = kwargs.pop("partitionKeyDefinition", None)
pk_properties = partition_key_definition
partition_key_definition = PartitionKey(path=pk_properties["paths"], kind=pk_properties["kind"])
partition_key_value = pk_properties["partition_key"]
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(partition_key_value) # cspell:disable-line # pylint: disable=line-too-long
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(id_, [feedrangeEPK])
# It is possible to get more than one overlapping range. We need to get the query results for each one
results = None
# For each overlapping range we will take a sub-range of the feed range EPK that overlaps with that
# physical partition. The EPK sub-range will be one of four cases:
# 1) Range min equal to the feed range EPK min, and range max equal to the overlapping
# partition's range max
# 2) Range min equal to the overlapping partition's range min, and range max equal to the
# feed range EPK range max
# 3) Matches the current overlapping physical partition exactly, so we just route to that
# physical partition's partition key range id
# 4) Equals the feed range EPK, since it is a sub-range of a single physical partition
for over_lapping_range in over_lapping_ranges:
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
# Since the range min and max are all uppercase hex string values,
# we can compare the values lexicographically
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min),
range_max=min(single_range.max, feedrangeEPK.max),
isMinInclusive=True, isMaxInclusive=False)
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
# The EPK sub-range spans exactly one physical partition
# In this case we can route to the physical partition key range id
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
else:
# The EPK sub-range spans less than a single physical partition
# In this case we route to the physical partition and
# pass the EPK sub-range in the headers to filter within the partition
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
req_headers[http_constants.HttpHeaders.StartEpkString] = EPK_sub_range.min
req_headers[http_constants.HttpHeaders.EndEpkString] = EPK_sub_range.max
req_headers[http_constants.HttpHeaders.ReadFeedKeyType] = "EffectivePartitionKeyRange"
r, self.last_response_headers = self.__Post(path, request_params, query, req_headers, **kwargs)
if results:
# accumulate the query results from all overlapping ranges
results["Documents"].extend(r["Documents"])
else:
results = r
if response_hook:
response_hook(self.last_response_headers, r)
# if the prefix partition query has results, return them
if results:
return __GetBodiesFromQueryResult(results)

result, self.last_response_headers = self.__Post(path, request_params, query, req_headers, **kwargs)

if response_hook:
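The routing logic above reduces to intersecting the prefix key's EPK range with each overlapping physical partition's range; because the bounds are uppercase hex strings, max/min comparisons work lexicographically. A simplified sketch (a namedtuple stand-in, not the SDK's routing_range.Range class):

```python
from collections import namedtuple

# simplified stand-in for routing_range.Range (min-inclusive, max-exclusive)
Range = namedtuple("Range", ["min", "max"])

def epk_sub_range(partition_range, feed_range_epk):
    # Intersect the two ranges; bounds are uppercase hex strings,
    # so lexicographic comparison matches EPK ordering.
    return Range(max(partition_range.min, feed_range_epk.min),
                 min(partition_range.max, feed_range_epk.max))

partition = Range("05", "15")
feed_epk = Range("0A", "1F")
sub = epk_sub_range(partition, feed_epk)
print(sub)  # Range(min='0A', max='15') -- partial overlap (case 2)
# When sub equals the partition's own range, route by partition key range
# id alone (case 3); otherwise also send the start/end EPK headers so the
# backend filters within the partition.
```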
@@ -2576,6 +2629,8 @@ def _GetQueryPlanThroughGateway(self, query, resource_link, **kwargs):
is_query_plan=True,
**kwargs)



def __CheckAndUnifyQueryFormat(self, query_body):
"""Checks and unifies the format of the query body.
@@ -2650,21 +2705,36 @@ def _AddPartitionKey(self, collection_link, document, options):

# Extracts the partition key from the document using the partitionKey definition
def _ExtractPartitionKey(self, partitionKeyDefinition, document):
if partitionKeyDefinition["kind"] == "MultiHash":
ret = []
for partition_key_level in partitionKeyDefinition.get("paths"):
# Parses the paths into a list of tokens, each representing a property
partition_key_parts = base.ParsePaths([partition_key_level])
# Check if the partitionKey is system generated or not
is_system_key = partitionKeyDefinition["systemKey"] if "systemKey" in partitionKeyDefinition else False

# Navigates the document to retrieve the partitionKey specified in the paths
val = self._retrieve_partition_key(partition_key_parts, document, is_system_key)
if val is _Undefined:
break
ret.append(val)
return ret


# Parses the paths into a list of tokens, each representing a property
partition_key_parts = base.ParsePaths(partitionKeyDefinition.get("paths"))
# Check if the partitionKey is system generated or not
is_system_key = partitionKeyDefinition["systemKey"] if "systemKey" in partitionKeyDefinition else False

# Navigates the document to retrieve the partitionKey specified in the paths

return self._retrieve_partition_key(partition_key_parts, document, is_system_key)

# Navigates the document to retrieve the partitionKey specified in the partition key parts
def _retrieve_partition_key(self, partition_key_parts, document, is_system_key):
expected_matchCount = len(partition_key_parts)
matchCount = 0
partitionKey = document

for part in partition_key_parts:
# At any point if we don't find the value of a sub-property in the document, we return as Undefined
if part not in partitionKey:
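The _ExtractPartitionKey change above walks each hierarchical path in order and stops at the first level the document does not define. A simplified sketch of that traversal over nested dicts, with a sentinel standing in for the SDK's _Undefined:

```python
_UNDEFINED = object()  # stand-in for the SDK's _Undefined sentinel

def extract_multi_hash_pk(paths, document):
    """Simplified sketch: collect one value per partition key path,
    truncating at the first level missing from the document."""
    values = []
    for path in paths:
        parts = [p for p in path.split("/") if p]  # "/tenantId" -> ["tenantId"]
        node = document
        for part in parts:
            if not isinstance(node, dict) or part not in node:
                node = _UNDEFINED
                break
            node = node[part]
        if node is _UNDEFINED:
            break  # a missing level truncates the hierarchical key
        values.append(node)
    return values

doc = {"tenantId": "t1", "userId": "u1"}
print(extract_multi_hash_pk(["/tenantId", "/userId", "/sessionId"], doc))  # ['t1', 'u1']
```

The truncated result is itself a valid prefix partition key, which is how a document missing deeper levels still routes consistently.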
3 changes: 2 additions & 1 deletion sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py
@@ -35,7 +35,7 @@
from ..http_constants import StatusCodes
from ..offer import ThroughputProperties
from ._scripts import ScriptsProxy
from ..partition_key import NonePartitionKeyValue
from ..partition_key import NonePartitionKeyValue, PartitionKey

__all__ = ("ContainerProxy",)

@@ -361,6 +361,7 @@ def query_items(
partition_key = kwargs.pop('partition_key', None)
if partition_key is not None:
feed_options["partitionKey"] = self._set_partition_key(partition_key)
kwargs["containerProperties"] = self._get_properties
else:
feed_options["enableCrossPartitionQuery"] = True
max_integrated_cache_staleness_in_ms = kwargs.pop('max_integrated_cache_staleness_in_ms', None)