Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subpartitioning Python Cosmos DB SDK #31121

Merged
merged 30 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
80cb361
sub partitioning
bambriz Jul 4, 2023
59d3d9d
Additional Sub Partitioning Updates
bambriz Jul 13, 2023
398bf88
Merge branch 'main' into subpartitioning
bambriz Jul 13, 2023
0bea4a7
remove uneeded line
bambriz Jul 13, 2023
4aa2da4
Merge branch 'subpartitioning' of https://github.com/bambriz/azure-sd…
bambriz Jul 13, 2023
fa7225c
update changelog
bambriz Jul 13, 2023
c61942e
pylint fixes
bambriz Jul 13, 2023
8529239
remove debug code on subpartition test
bambriz Jul 13, 2023
6b6fc2a
Merge remote-tracking branch 'upstream/main' into subpartitioning
bambriz Aug 22, 2023
542793e
Adding support for prefix partition queries
bambriz Sep 12, 2023
6ddc172
pylint and cspell fixes
bambriz Sep 13, 2023
1e7e8d4
Merge branch 'Azure:main' into subpartitioning
bambriz Sep 14, 2023
bf0b519
Additional Updates and fixes
bambriz Sep 14, 2023
9f8a930
removing uneeded lines from test config
bambriz Sep 14, 2023
f8c1346
Test fix
bambriz Sep 14, 2023
1349513
update test crud subpartition
bambriz Sep 14, 2023
5c0ce6d
Update test_config.py
bambriz Sep 15, 2023
e224bae
additional feedback fixes
bambriz Sep 15, 2023
373c171
Merge branch 'subpartitioning' of https://github.com/bambriz/azure-sd…
bambriz Sep 15, 2023
572f154
Fixed Python Version Compatibility
bambriz Sep 15, 2023
78d23e4
Fixed small issue causing tests to fail
bambriz Sep 15, 2023
50c9bbc
Testing fix for subpartitioning
bambriz Sep 15, 2023
209ab93
Update test_crud_subpartition_async.py
simorenoh Sep 15, 2023
ebd097f
Update test_crud_subpartition_async.py
simorenoh Sep 15, 2023
b9cc291
Update dev_requirements.txt
simorenoh Sep 18, 2023
710d48e
Update async test and samples
bambriz Oct 3, 2023
1990bed
Change public method to be private
bambriz Oct 3, 2023
763b021
Added support for prefix query involving multiple over lapping ranges
bambriz Oct 6, 2023
1de4181
Better over lapping support and new over lapping range tests
bambriz Oct 9, 2023
fef66b7
Clarified information in some comments
bambriz Oct 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2552,29 +2552,41 @@ def __GetBodiesFromQueryResult(result):
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(id_, [feedrangeEPK])
# It is possible to get more than one over lapping range. We need to get the query results for each one
results = None
# For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over
# lapping physical partition. The EPK sub range will be one of three:
# 1) Will have a range min equal to the feed range EPK min, and a range max equal to the over lapping
# partition
# 2) Will have a range min equal to the over lapping partition range min, and a range max equal to the
# feed range EPK range max.
# 3) will match exactly with the current over lapping physical partition, so we just return the over lapping
# physical partition's partition key id.
for over_lapping_range in over_lapping_ranges:
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
if single_range.min == feedrangeEPK.min and single_range.max == feedrangeEPK.max:
# The EpkRange spans exactly one physical partition
# Since the range min and max are all Upper Cased string Hex Values,
# we can compare the values lexicographically
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min),
bambriz marked this conversation as resolved.
Show resolved Hide resolved
range_max=min(single_range.max, feedrangeEPK.max),
isMinInclusive=True, isMaxInclusive=False)
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
# The Epk Sub Range spans exactly one physical partition
# In this case we can route to the physical pk range id
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
else:
# The EpkRange spans less than single physical partition
# The Epk Sub Range spans less than a single physical partition
# In this case we route to the physical partition and
# pass the epk range headers to filter within partition
# pass the epk sub range to the headers to filter within partition
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
req_headers[http_constants.HttpHeaders.StartEpkString] = feedrangeEPK.min
req_headers[http_constants.HttpHeaders.EndEpkString] = feedrangeEPK.max
req_headers[http_constants.HttpHeaders.StartEpkString] = EPK_sub_range.min
req_headers[http_constants.HttpHeaders.EndEpkString] = EPK_sub_range.max
req_headers[http_constants.HttpHeaders.ReadFeedKeyType] = "EffectivePartitionKeyRange"
r, self.last_response_headers = self.__Post(path, request_params, query, req_headers, **kwargs)
if results:
# add up all the query results from all over lapping ranges
results["Documents"].extend(r["Documents"])
results["_count"] += r["_count"]
else:
results = r
if response_hook:
response_hook(self.last_response_headers, results)
response_hook(self.last_response_headers, r)
bambriz marked this conversation as resolved.
Show resolved Hide resolved
# if the prefix partition query has results lets return it
if results:
return __GetBodiesFromQueryResult(results)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2371,31 +2371,41 @@ def __GetBodiesFromQueryResult(result):
feedrangeEPK = partition_key_definition._get_epk_range_for_prefix_partition_key(partition_key) # cspell:disable-line # pylint: disable=line-too-long
over_lapping_ranges = await self._routing_map_provider.get_overlapping_ranges(id_, [feedrangeEPK])
results = None
# For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over
# lapping physical partition. The EPK sub range will be one of three:
# 1) Will have a range min equal to the feed range EPK min, and a range max equal to the over lapping
# partition
# 2) Will have a range min equal to the over lapping partition range min, and a range max equal to the
# feed range EPK range max.
# 3) will match exactly with the current over lapping physical partition, so we just return the over lapping
# physical partition's partition key id.
for over_lapping_range in over_lapping_ranges:
# It is possible for the over lapping range to include multiple physical partitions
# we should return query results for all the partitions that are overlapped.
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
if single_range.min == feedrangeEPK.min and single_range.max == feedrangeEPK.max:
# The EpkRange spans exactly one physical partition
# Since the range min and max are all Upper Cased string Hex Values,
# we can compare the values lexicographically
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feedrangeEPK.min),
bambriz marked this conversation as resolved.
Show resolved Hide resolved
range_max=min(single_range.max, feedrangeEPK.max),
isMinInclusive=True, isMaxInclusive=False)
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
# The Epk Sub Range spans exactly one physical partition
# In this case we can route to the physical pk range id
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
else:
# The EpkRange spans less than single physical partition
# The Epk Sub Range spans less than a single physical partition
# In this case we route to the physical partition and
# pass the epk range headers to filter within partition
# pass the epk sub range to the headers to filter within partition
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
req_headers[http_constants.HttpHeaders.StartEpkString] = feedrangeEPK.min
req_headers[http_constants.HttpHeaders.EndEpkString] = feedrangeEPK.max
req_headers[http_constants.HttpHeaders.StartEpkString] = EPK_sub_range.min
req_headers[http_constants.HttpHeaders.EndEpkString] = EPK_sub_range.max
req_headers[http_constants.HttpHeaders.ReadFeedKeyType] = "EffectivePartitionKeyRange"
r, self.last_response_headers = await self.__Post(path, request_params, query, req_headers, **kwargs)
if results:
# add up all the query results from all over lapping ranges
results["Documents"].extend(r["Documents"])
results["_count"] += r["_count"]
else:
results = r
if response_hook:
response_hook(self.last_response_headers, results)
response_hook(self.last_response_headers, r)
# if the prefix partition query has results lets return it
if results:
return __GetBodiesFromQueryResult(results)
Expand Down
99 changes: 99 additions & 0 deletions sdk/cosmos/azure-cosmos/test/test_crud_subpartition.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
from azure.core.pipeline.transport import RequestsTransport, RequestsTransportResponse
import azure.cosmos.documents as documents
import azure.cosmos.exceptions as exceptions
from azure.cosmos._routing import routing_range
from azure.cosmos._routing.collection_routing_map import CollectionRoutingMap
from azure.cosmos.http_constants import HttpHeaders, StatusCodes
import test_config
import azure.cosmos.cosmos_client as cosmos_client
Expand Down Expand Up @@ -580,6 +582,103 @@ def test_partitioned_collection_prefix_partition_query(self):
self.assertTrue("Cross partition query is required but disabled"
in error.message)

def test_partition_key_range_overlap(self):
Id = 'id'
bambriz marked this conversation as resolved.
Show resolved Hide resolved
MinInclusive = 'minInclusive'
MaxExclusive = 'maxExclusive'
partitionKeyRanges = \
[
({Id: "2",
MinInclusive: "0000000050",
MaxExclusive: "0000000070"},
2),
({Id: "0",
MinInclusive: "",
MaxExclusive: "0000000030"},
0),
({Id: "1",
MinInclusive: "0000000030",
MaxExclusive: "0000000050"},
1),
({Id: "3",
MinInclusive: "0000000070",
MaxExclusive: "FF"},
3)
]

crm = CollectionRoutingMap.CompleteRoutingMap(partitionKeyRanges, "")

# Case 1: EPK range matches a single entire physical partition
EPK_range_1 = routing_range.Range(range_min="0000000030", range_max="0000000050",
isMinInclusive=True, isMaxInclusive=False)
over_lapping_ranges_1 = crm.get_overlapping_ranges([EPK_range_1])
# Should only have 1 over lapping range
self.assertEqual(len(over_lapping_ranges_1), 1)
# EPK range 1 should be overlapping physical partition 1
self.assertEqual(over_lapping_ranges_1[0][Id], "1")
# Partition 1 and EPK range 1 should have same range min and range max
over_lapping_range_1 = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_1[0])
self.assertEqual(over_lapping_range_1.min, EPK_range_1.min)
self.assertEqual(over_lapping_range_1.max, EPK_range_1.max)

# Case 2: EPK range is a sub range of a single physical partition

EPK_range_2 = routing_range.Range(range_min="0000000035", range_max="0000000045",
isMinInclusive=True, isMaxInclusive=False)
over_lapping_ranges_2 = crm.get_overlapping_ranges([EPK_range_2])
# Should only have 1 over lapping range
self.assertEqual(len(over_lapping_ranges_2), 1)
# EPK range 2 should be overlapping physical partition 1
self.assertEqual(over_lapping_ranges_2[0][Id], "1")
# EPK range 2 min should be higher than over lapping partition and the max should be lower
over_lapping_range_2 = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_2[0])
self.assertLess(over_lapping_range_2.min, EPK_range_2.min)
self.assertLess(EPK_range_2.max, over_lapping_range_2.max)

# Case 3: EPK range partially spans 2 physical partitions

EPK_range_3 = routing_range.Range(range_min="0000000035", range_max="0000000055",
isMinInclusive=True, isMaxInclusive=False)
over_lapping_ranges_3 = crm.get_overlapping_ranges([EPK_range_3])
# Should overlap exactly two partition ranges
self.assertEqual(len(over_lapping_ranges_3), 2)
# EPK range 3 should be over lapping partition 1 and partition 2
self.assertEqual(over_lapping_ranges_3[0][Id], "1")
self.assertEqual(over_lapping_ranges_3[1][Id], "2")
# EPK Range 3 range min should be higher than partition 1's min, but lower than partition 2's, vice versa with max
over_lapping_range_3A = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_3[0])
over_lapping_range_3B = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_3[1])
self.assertLess(over_lapping_range_3A.min, EPK_range_3.min)
self.assertLess(EPK_range_3.min, over_lapping_range_3B.min)
self.assertGreater(EPK_range_3.max, over_lapping_range_3A.max)
self.assertGreater(over_lapping_range_3B.max, EPK_range_3.max)

# Case 4: EPK range spans multiple physical partitions, including entire physical partitions

EPK_range_4 = routing_range.Range(range_min="0000000020", range_max="0000000060",
isMinInclusive=True, isMaxInclusive=False)
over_lapping_ranges_4 = crm.get_overlapping_ranges([EPK_range_4])
# should overlap 3 partitions
self.assertEqual(len(over_lapping_ranges_4), 3)
# EPK range 4 should be over lapping partitions 0, 1, and 2
self.assertEqual(over_lapping_ranges_4[0][Id], "0")
self.assertEqual(over_lapping_ranges_4[1][Id], "1")
self.assertEqual(over_lapping_ranges_4[2][Id], "2")

# individual ranges for each partition
olr_4_a = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_4[0])
olr_4_b = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_4[1])
olr_4_c = routing_range.Range.PartitionKeyRangeToRange(over_lapping_ranges_4[2])
# both EPK range 4 min and max should be greater than partitions 0 min and max
self.assertGreater(EPK_range_4.min, olr_4_a.min)
self.assertGreater(EPK_range_4.max, olr_4_a.max)
# EPK range 4 should contain partition 1's range entirely
self.assertTrue(EPK_range_4.contains(olr_4_b.min))
self.assertTrue(EPK_range_4.contains(olr_4_b.max))
# Both EPK range 4 min and max should be less than partition 2's min and max
self.assertLess(EPK_range_4.min, olr_4_c.min)
self.assertLess(EPK_range_4.max, olr_4_c.max)

# Commenting out delete items by pk until test pipelines support it
# def test_delete_all_items_by_partition_key(self):
# # create database
Expand Down
Loading