Skip to content

Commit

Permalink
Availability: Adds non-blocking cache to partition key ranges (#3080)
Browse files Browse the repository at this point in the history
1. The current implementation is once a refresh occurs all other requests are blocked until the refresh completes. Even if they have 1000 partitions and only 1 is splitting all 999 partitions will be blocked until the refresh completes.
2. If the refresh fails the old cache value is lost. Then the SDK has to reload the entire which cause additional load to the service on the meta data requests.
3. Removed cancellation token being passed in. Most scenario it was already set to none. The cancellation should not be passed into cache because multiple other requests are likely to be waiting on that result. If the first request cancels it would negatively impact the rest of the requests. It's better to let the cache finish then let the request fail after.
  • Loading branch information
j82w authored Mar 14, 2022
1 parent a74c7bf commit 804feea
Show file tree
Hide file tree
Showing 11 changed files with 320 additions and 59 deletions.
2 changes: 0 additions & 2 deletions Microsoft.Azure.Cosmos/src/GatewayStoreModel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,6 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);

if (refreshCache && collectionRoutingMap != null)
Expand All @@ -378,7 +377,6 @@ private static async Task<Tuple<bool, PartitionKeyRange>> TryResolvePartitionKey
collectionRid: collection.ResourceId,
previousValue: collectionRoutingMap,
request: request,
cancellationToken: CancellationToken.None,
NoOpTrace.Singleton);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,16 +121,20 @@ private async Task<ShouldRetryResult> ShouldRetryInternalAsync(
AuthorizationTokenType.PrimaryMasterKey))
{
ContainerProperties collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, this.trace);
CollectionRoutingMap routingMap = await this.partitionKeyRangeCache.TryLookupAsync(collection.ResourceId, null, request, cancellationToken, this.trace);
CollectionRoutingMap routingMap = await this.partitionKeyRangeCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
trace: this.trace);

if (routingMap != null)
{
// Force refresh.
await this.partitionKeyRangeCache.TryLookupAsync(
collection.ResourceId,
routingMap,
request,
cancellationToken,
this.trace);
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: this.trace);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -508,7 +508,6 @@ public override async Task<CollectionRoutingMap> GetRoutingMapAsync(Cancellation
collectionRid,
previousValue: null,
request: null,
cancellationToken,
NoOpTrace.Singleton);

// Not found.
Expand All @@ -523,7 +522,6 @@ public override async Task<CollectionRoutingMap> GetRoutingMapAsync(Cancellation
collectionRid,
previousValue: null,
request: null,
cancellationToken,
NoOpTrace.Singleton);
}

Expand Down
29 changes: 17 additions & 12 deletions Microsoft.Azure.Cosmos/src/Routing/AddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,22 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(

ContainerProperties collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
CollectionRoutingMap routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId, null, request, cancellationToken, NoOpTrace.Singleton);
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
trace: NoOpTrace.Singleton);

if (routingMap != null && request.ForceCollectionRoutingMapRefresh)
{
DefaultTrace.TraceInformation(
"AddressResolver.ResolveAddressesAndIdentityAsync ForceCollectionRoutingMapRefresh collection.ResourceId = {0}",
collection.ResourceId);

routingMap = await this.collectionRoutingMapCache.TryLookupAsync(collection.ResourceId, routingMap, request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: NoOpTrace.Singleton);
}

if (request.ForcePartitionKeyRangeRefresh)
Expand All @@ -280,7 +287,11 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
request.ForcePartitionKeyRangeRefresh = false;
if (routingMap != null)
{
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(collection.ResourceId, routingMap, request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: routingMap,
request: request,
trace: NoOpTrace.Singleton);
}
}

Expand All @@ -293,10 +304,9 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
collectionRoutingMapCacheIsUptoDate = false;
collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}

Expand All @@ -319,30 +329,26 @@ private async Task<ResolutionResult> ResolveAddressesAndIdentityAsync(
if (!collectionCacheIsUptoDate)
{
request.ForceNameCacheRefresh = true;
collectionCacheIsUptoDate = true;
collection = await this.collectionCache.ResolveCollectionAsync(request, cancellationToken, NoOpTrace.Singleton);
if (collection.ResourceId != routingMap.CollectionUniqueId)
{
// Collection cache was stale. We resolved to new Rid. routing map cache is potentially stale
// for this new collection rid. Mark it as such.
collectionRoutingMapCacheIsUptoDate = false;
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
collectionRid: collection.ResourceId,
previousValue: null,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}
}

if (!collectionRoutingMapCacheIsUptoDate)
{
collectionRoutingMapCacheIsUptoDate = true;
routingMap = await this.collectionRoutingMapCache.TryLookupAsync(
collection.ResourceId,
previousValue: routingMap,
request: request,
cancellationToken: cancellationToken,
trace: NoOpTrace.Singleton);
}

Expand Down Expand Up @@ -449,7 +455,6 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
PartitionKeyRange range;
string partitionKeyString = request.Headers[HttpConstants.HttpHeaders.PartitionKey];

object effectivePartitionKeyStringObject = null;
if (partitionKeyString != null)
{
range = AddressResolver.TryResolveServerPartitionByPartitionKey(
Expand All @@ -461,7 +466,7 @@ private async Task<ResolutionResult> TryResolveServerPartitionAsync(
}
else if (request.Properties != null && request.Properties.TryGetValue(
WFConstants.BackendHeaders.EffectivePartitionKeyString,
out effectivePartitionKeyStringObject))
out object effectivePartitionKeyStringObject))
{
// Allow EPK only for partitioned collection (excluding migrated fixed collections)
if (!collection.HasPartitionKey || collection.PartitionKey.IsSystemKey.GetValueOrDefault(false))
Expand Down
7 changes: 5 additions & 2 deletions Microsoft.Azure.Cosmos/src/Routing/GlobalAddressResolver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,11 @@ public async Task OpenAsync(
ContainerProperties collection,
CancellationToken cancellationToken)
{
CollectionRoutingMap routingMap =
await this.routingMapProvider.TryLookupAsync(collection.ResourceId, null, null, cancellationToken, NoOpTrace.Singleton);
CollectionRoutingMap routingMap = await this.routingMapProvider.TryLookupAsync(
collectionRid: collection.ResourceId,
previousValue: null,
request: null,
trace: NoOpTrace.Singleton);

if (routingMap == null)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ Task<CollectionRoutingMap> TryLookupAsync(
string collectionRid,
CollectionRoutingMap previousValue,
DocumentServiceRequest request,
CancellationToken cancellationToken,
ITrace trace);
}
}
98 changes: 68 additions & 30 deletions Microsoft.Azure.Cosmos/src/Routing/PartitionKeyRangeCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal class PartitionKeyRangeCache : IRoutingMapProvider, ICollectionRoutingM
{
private const string PageSizeString = "-1";

private readonly AsyncCache<string, CollectionRoutingMap> routingMapCache;
private readonly AsyncCacheNonBlocking<string, CollectionRoutingMap> routingMapCache;

private readonly ICosmosAuthorizationTokenProvider authorizationTokenProvider;
private readonly IStoreModel storeModel;
Expand All @@ -36,9 +36,8 @@ public PartitionKeyRangeCache(
IStoreModel storeModel,
CollectionCache collectionCache)
{
this.routingMapCache = new AsyncCache<string, CollectionRoutingMap>(
EqualityComparer<CollectionRoutingMap>.Default,
StringComparer.Ordinal);
this.routingMapCache = new AsyncCacheNonBlocking<string, CollectionRoutingMap>(
keyEqualityComparer: StringComparer.Ordinal);
this.authorizationTokenProvider = authorizationTokenProvider;
this.storeModel = storeModel;
this.collectionCache = collectionCache;
Expand All @@ -54,12 +53,19 @@ public virtual async Task<IReadOnlyList<PartitionKeyRange>> TryGetOverlappingRan
{
Debug.Assert(ResourceId.TryParse(collectionRid, out ResourceId collectionRidParsed), "Could not parse CollectionRid from ResourceId.");

CollectionRoutingMap routingMap =
await this.TryLookupAsync(collectionRid, null, null, CancellationToken.None, childTrace);
CollectionRoutingMap routingMap = await this.TryLookupAsync(
collectionRid: collectionRid,
previousValue: null,
request: null,
trace: childTrace);

if (forceRefresh && routingMap != null)
{
routingMap = await this.TryLookupAsync(collectionRid, routingMap, null, CancellationToken.None, childTrace);
routingMap = await this.TryLookupAsync(
collectionRid: collectionRid,
previousValue: routingMap,
request: null,
trace: childTrace);
}

if (routingMap == null)
Expand All @@ -78,15 +84,21 @@ public virtual async Task<PartitionKeyRange> TryGetPartitionKeyRangeByIdAsync(
ITrace trace,
bool forceRefresh = false)
{
ResourceId collectionRidParsed;
Debug.Assert(ResourceId.TryParse(collectionResourceId, out collectionRidParsed), "Could not parse CollectionRid from ResourceId.");
Debug.Assert(ResourceId.TryParse(collectionResourceId, out _), "Could not parse CollectionRid from ResourceId.");

CollectionRoutingMap routingMap =
await this.TryLookupAsync(collectionResourceId, null, null, CancellationToken.None, trace);
CollectionRoutingMap routingMap = await this.TryLookupAsync(
collectionRid: collectionResourceId,
previousValue: null,
request: null,
trace: trace);

if (forceRefresh && routingMap != null)
{
routingMap = await this.TryLookupAsync(collectionResourceId, routingMap, null, CancellationToken.None, trace);
routingMap = await this.TryLookupAsync(
collectionRid: collectionResourceId,
previousValue: routingMap,
request: null,
trace: trace);
}

if (routingMap == null)
Expand All @@ -102,20 +114,19 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
string collectionRid,
CollectionRoutingMap previousValue,
DocumentServiceRequest request,
CancellationToken cancellationToken,
ITrace trace)
{
try
{
return await this.routingMapCache.GetAsync(
collectionRid,
previousValue,
() => this.GetRoutingMapForCollectionAsync(collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics,
cancellationToken),
CancellationToken.None);
key: collectionRid,
singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync(
collectionRid,
previousValue,
trace,
request?.RequestContext?.ClientRequestStatistics),
forceRefresh: (currentValue) => PartitionKeyRangeCache.ShouldForceRefresh(previousValue, currentValue),
callBackOnForceRefresh: null);
}
catch (DocumentClientException ex)
{
Expand All @@ -139,6 +150,31 @@ public virtual async Task<CollectionRoutingMap> TryLookupAsync(
}
}

private static bool ShouldForceRefresh(
CollectionRoutingMap previousValue,
CollectionRoutingMap currentValue)
{
// Previous is null then no need to force a refresh
// The request didn't access the cache before
if (previousValue == null)
{
return false;
}

// currentValue is null then the value just got initialized so
// is not possible for it to be stale
if (currentValue == null)
{
return false;
}

// CollectionRoutingMap uses changefeed to update the cache. The ChangeFeedNextIfNoneMatch
// is the continuation token for the changefeed operation. If the values do not match
// then another operation has already refresh the cache since this request was sent. So
// there is no reason to do another refresh.
return previousValue.ChangeFeedNextIfNoneMatch == currentValue.ChangeFeedNextIfNoneMatch;
}

public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(string collectionRid,
string partitionKeyRangeId,
ITrace trace,
Expand All @@ -147,10 +183,14 @@ public async Task<PartitionKeyRange> TryGetRangeByPartitionKeyRangeIdAsync(strin
try
{
CollectionRoutingMap routingMap = await this.routingMapCache.GetAsync(
collectionRid,
null,
() => this.GetRoutingMapForCollectionAsync(collectionRid, null, trace, clientSideRequestStatistics, CancellationToken.None),
CancellationToken.None);
key: collectionRid,
singleValueInitFunc: () => this.GetRoutingMapForCollectionAsync(
collectionRid: collectionRid,
previousRoutingMap: null,
trace: trace,
clientSideRequestStatistics: clientSideRequestStatistics),
forceRefresh: (_) => false,
callBackOnForceRefresh: null);

return routingMap.TryGetRangeByPartitionKeyRangeId(partitionKeyRangeId);
}
Expand All @@ -169,11 +209,10 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
string collectionRid,
CollectionRoutingMap previousRoutingMap,
ITrace trace,
IClientSideRequestStatistics clientSideRequestStatistics,
CancellationToken cancellationToken)
IClientSideRequestStatistics clientSideRequestStatistics)
{
List<PartitionKeyRange> ranges = new List<PartitionKeyRange>();
string changeFeedNextIfNoneMatch = previousRoutingMap == null ? null : previousRoutingMap.ChangeFeedNextIfNoneMatch;
string changeFeedNextIfNoneMatch = previousRoutingMap?.ChangeFeedNextIfNoneMatch;

HttpStatusCode lastStatusCode = HttpStatusCode.OK;
do
Expand All @@ -190,8 +229,7 @@ private async Task<CollectionRoutingMap> GetRoutingMapForCollectionAsync(
RetryOptions retryOptions = new RetryOptions();
using (DocumentServiceResponse response = await BackoffRetryUtility<DocumentServiceResponse>.ExecuteAsync(
() => this.ExecutePartitionKeyRangeReadChangeFeedAsync(collectionRid, headers, trace, clientSideRequestStatistics),
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds),
cancellationToken))
new ResourceThrottleRetryPolicy(retryOptions.MaxRetryAttemptsOnThrottledRequests, retryOptions.MaxRetryWaitTimeInSeconds)))
{
lastStatusCode = response.StatusCode;
changeFeedNextIfNoneMatch = response.Headers[HttpConstants.HttpHeaders.ETag];
Expand Down
Loading

0 comments on commit 804feea

Please sign in to comment.