diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java index 8d0a3a151c696..f0fa6e06e781d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosAsyncContainer.java @@ -25,6 +25,7 @@ import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosItemResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; @@ -1124,4 +1125,15 @@ private Mono replaceThroughputInternal(Mono> getFeedRanges() { + return this.getDatabase().getDocClientWrapper().getFeedRanges(getLink()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java index 2b7379f1cc533..2785afb5669de 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/CosmosContainer.java @@ -10,6 +10,7 @@ import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosItemRequestOptions; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlQuerySpec; @@ -556,4 +557,23 @@ private CosmosPagedIterable getCosmosPagedIterable(CosmosPagedFlux cos return UtilBridgeInternal.createCosmosPagedIterable(cosmosPagedFlux); } + /** + * Obtains a list of {@link FeedRange} that can be used to parallelize Feed + * operations. + * + * @return An unmodifiable list of {@link FeedRange} + */ + @Beta(Beta.SinceVersion.V4_9_0) + public List getFeedRanges() { + try { + return asyncContainer.getFeedRanges().block(); + } catch (Exception ex) { + final Throwable throwable = Exceptions.unwrap(ex); + if (throwable instanceof CosmosException) { + throw (CosmosException) throwable; + } else { + throw ex; + } + } + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java index 2837d33269c75..32098f0f6c699 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/AsyncDocumentClient.java @@ -10,6 +10,7 @@ import com.azure.cosmos.implementation.apachecommons.lang.StringUtils; import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.PartitionKey; import com.azure.cosmos.models.SqlQuerySpec; @@ -668,6 +669,14 @@ Flux> queryDocumentChangeFeed(String collectionLink, */ Flux> readPartitionKeyRanges(String collectionLink, CosmosQueryRequestOptions options); + /** + * Gets the feed ranges of a container. + * + * @param collectionLink the link to the parent document collection. + * @return a {@link List} of @{link FeedRange} containing the feed ranges of a container. + */ + Mono> getFeedRanges(String collectionLink); + /** * Creates a stored procedure. *

diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java index ec4c55cc48edc..9a7a08ed06752 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ClientRetryPolicy.java @@ -4,11 +4,9 @@ import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.directconnectivity.WebExceptionUtility; -import com.azure.cosmos.BridgeInternal; import com.azure.cosmos.CosmosException; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.ThrottlingRetryOptions; -import io.netty.handler.timeout.ReadTimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java index 1437feb49a2bb..f1559ce97622e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/Constants.java @@ -201,6 +201,11 @@ public static final class Properties { public static final String KeyWrapMetadataValue = "value"; public static final String EncryptedInfo = "_ei"; + // Feed Ranges + public static final String RANGE = "Range"; + public static final String FEED_RANGE_PARTITION_KEY = "PartitionKey"; + public static final String FEED_RANGE_PARTITION_KEY_RANGE_ID = "PKRangeId"; + } public static final class UrlEncodingInfo { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java index e9ae3b407e2c6..ff0281744bc4d 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/HttpConstants.java @@ -272,6 +272,9 @@ public static class Versions { public static class StatusCodes { public static final int OK = 200; public static final int NOT_MODIFIED = 304; + // Success + public static final int MINIMUM_SUCCESS_STATUSCODE = 200; + public static final int MAXIMUM_SUCCESS_STATUSCODE = 299; // Client error public static final int MINIMUM_STATUSCODE_AS_ERROR_GATEWAY = 400; public static final int BADREQUEST = 400; @@ -323,5 +326,6 @@ public static class SubStatusCodes { public static class HeaderValues { public static final String NO_CACHE = "no-cache"; public static final String PREFER_RETURN_MINIMAL = "return=minimal"; + public static final String IF_NONE_MATCH_ALL = "*"; } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java index 741f59b6c381b..def325becc0ab 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/IRetryPolicy.java @@ -43,62 +43,4 @@ public interface IRetryPolicy { void addStatusAndSubStatusCode(Integer index, int statusCode, int subStatusCode); List getStatusAndSubStatusCodes(); - - class ShouldRetryResult { - ///

- /// How long to wait before next retry. 0 indicates retry immediately. - /// - public final Duration backOffTime; - public final Exception exception; - public boolean shouldRetry; - public final Quadruple policyArg; - - private ShouldRetryResult(Duration dur, Exception e, boolean shouldRetry, - Quadruple policyArg) { - this.backOffTime = dur; - this.exception = e; - this.shouldRetry = shouldRetry; - this.policyArg = policyArg; - } - - public static ShouldRetryResult retryAfter(Duration dur) { - Utils.checkNotNullOrThrow(dur, "duration", "cannot be null"); - return new ShouldRetryResult(dur, null, true, null); - } - - public static ShouldRetryResult retryAfter(Duration dur, - Quadruple policyArg) { - Utils.checkNotNullOrThrow(dur, "duration", "cannot be null"); - return new ShouldRetryResult(dur, null, true, policyArg); - } - - public static ShouldRetryResult error(Exception e) { - Utils.checkNotNullOrThrow(e, "exception", "cannot be null"); - return new ShouldRetryResult(null, e, false, null); - } - - public static ShouldRetryResult noRetry() { - return new ShouldRetryResult(null, null, false, null); - } - - public static ShouldRetryResult noRetry(Quadruple policyArg) { - return new ShouldRetryResult( - null, - null, - false, - policyArg); - } - - public void throwIfDoneTrying(Exception capturedException) throws Exception { - if (this.shouldRetry) { - return; - } - - if (this.exception == null) { - throw capturedException; - } else { - throw this.exception; - } - } - } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRange.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRange.java index 20b44c57dc52e..8483b9b4ca8c1 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRange.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/PartitionKeyRange.java @@ -74,16 +74,18 @@ public String getMinInclusive() { return super.getString("minInclusive"); } - public void setMinInclusive(String minInclusive) { + public PartitionKeyRange setMinInclusive(String minInclusive) { BridgeInternal.setProperty(this, "minInclusive", minInclusive); + return this; } public String getMaxExclusive() { return super.getString("maxExclusive"); } - public void setMaxExclusive(String maxExclusive) { + public PartitionKeyRange setMaxExclusive(String maxExclusive) { BridgeInternal.setProperty(this, "maxExclusive", maxExclusive); + return this; } public Range toRange() { diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java index 491058ba56ede..8d0df57f39a99 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RetryUtils.java @@ -28,7 +28,7 @@ static Function, Flux> toRetryWhenFunc(IRetryPolicy policy return Flux.error(t); } policy.captureStartTimeIfNotSet(); - Flux shouldRetryResultFlux = policy.shouldRetry(e).flux(); + Flux shouldRetryResultFlux = policy.shouldRetry(e).flux(); return shouldRetryResultFlux.flatMap(s -> { CosmosException clientException = Utils.as(e, CosmosException.class); if(clientException != null) { @@ -76,7 +76,7 @@ public static Function> toRetryWithAlternateFunc(Function return Mono.error(throwable); } retryPolicy.captureStartTimeIfNotSet(); - Mono shouldRetryResultFlux = retryPolicy.shouldRetry(e); + Mono shouldRetryResultFlux = retryPolicy.shouldRetry(e); return shouldRetryResultFlux.flatMap(shouldRetryResult -> { CosmosException clientException = Utils.as(e, CosmosException.class); if(clientException != null) { @@ -140,7 +140,7 @@ private static Mono recursiveFunc( Function, Mono> callbackMethod, IRetryPolicy retryPolicy, Function, Mono> inBackoffAlternateCallbackMethod, - IRetryPolicy.ShouldRetryResult shouldRetryResult, + ShouldRetryResult shouldRetryResult, Duration minBackoffForInBackoffCallback, RxDocumentServiceRequest rxDocumentServiceRequest, AddressSelector addressSelector) { @@ -153,7 +153,7 @@ private static Function> recursiveWithAlternateFunc( Function, Mono> callbackMethod, IRetryPolicy retryPolicy, Function, Mono> inBackoffAlternateCallbackMethod, - IRetryPolicy.ShouldRetryResult shouldRetryResult, + ShouldRetryResult shouldRetryResult, StopWatch stopwatch, Duration minBackoffForInBackoffCallback, RxDocumentServiceRequest rxDocumentServiceRequest, diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java index a2178ea4376c0..fa68b4c1db91f 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentClientImpl.java @@ -11,6 +11,7 @@ import com.azure.cosmos.ConsistencyLevel; import com.azure.cosmos.CosmosDiagnostics; import com.azure.cosmos.DirectConnectionConfig; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; import com.azure.cosmos.implementation.batch.BatchResponseParser; import com.azure.cosmos.implementation.batch.ServerBatchRequest; import com.azure.cosmos.implementation.batch.SinglePartitionKeyServerBatchRequest; @@ -26,6 +27,7 @@ import com.azure.cosmos.implementation.directconnectivity.ServerStoreModel; import com.azure.cosmos.implementation.directconnectivity.StoreClient; import com.azure.cosmos.implementation.directconnectivity.StoreClientFactory; +import com.azure.cosmos.implementation.feedranges.FeedRangePartitionKeyRangeImpl; import com.azure.cosmos.implementation.http.HttpClient; import com.azure.cosmos.implementation.http.HttpClientConfig; import com.azure.cosmos.implementation.http.HttpHeaders; @@ -40,8 +42,10 @@ import com.azure.cosmos.implementation.routing.PartitionKeyAndResourceTokenPair; import com.azure.cosmos.implementation.routing.PartitionKeyInternal; import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; +import com.azure.cosmos.implementation.routing.Range; import com.azure.cosmos.models.CosmosItemIdentity; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.FeedResponse; import com.azure.cosmos.models.ModelBridgeInternal; import com.azure.cosmos.models.PartitionKey; @@ -94,6 +98,9 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorizationTokenProvider, CpuListener, DiagnosticsClientContext { private static final AtomicInteger activeClientsCnt = new AtomicInteger(0); private static final AtomicInteger clientIdGenerator = new AtomicInteger(0); + private static final Range RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES = new Range( + PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey, + PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey, true, false); private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " + "ParallelDocumentQueryExecutioncontext, but not used"; @@ -3634,4 +3641,56 @@ private static SqlQuerySpec createLogicalPartitionScanQuerySpec( return new SqlQuerySpec(queryStringBuilder.toString(), parameters); } + + @Override + public Mono> getFeedRanges(String collectionLink) { + + if (StringUtils.isEmpty(collectionLink)) { + throw new IllegalArgumentException("collectionLink"); + } + + RxDocumentServiceRequest request = RxDocumentServiceRequest.create( + this, + OperationType.Query, + ResourceType.Document, + collectionLink, + null); // This should not go to backend + Mono> collectionObs = collectionCache.resolveCollectionAsync(null, + request); + + return collectionObs.flatMap(documentCollectionResourceResponse -> { + final DocumentCollection collection = documentCollectionResourceResponse.v; + if (collection == null) { + throw new IllegalStateException("Collection cannot be null"); + } + + Mono>> valueHolderMono = partitionKeyRangeCache + .tryGetOverlappingRangesAsync( + BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), + collection.getResourceId(), RANGE_INCLUDING_ALL_PARTITION_KEY_RANGES, true, null); + + return valueHolderMono.map(partitionKeyRangeListResponse -> { + return toFeedRanges(partitionKeyRangeListResponse); + }); + }); + } + + private static List toFeedRanges( + Utils.ValueHolder> partitionKeyRangeListValueHolder) { + final List partitionKeyRangeList = partitionKeyRangeListValueHolder.v; + if (partitionKeyRangeList == null) { + throw new IllegalStateException("PartitionKeyRange list cannot be null"); + } + + List feedRanges = new ArrayList(partitionKeyRangeList.size()); + partitionKeyRangeList.forEach(pkRange -> { + feedRanges.add(toFeedRange(pkRange)); + }); + + return feedRanges; + } + + private static FeedRange toFeedRange(PartitionKeyRange pkRange) { + return new FeedRangePartitionKeyRangeImpl(pkRange.getId()); + } } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java index 74fec255cb556..e95c443dd7d67 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/RxDocumentServiceRequest.java @@ -1010,6 +1010,20 @@ public void dispose() { this.isDisposed = true; } + /** + * Gets the request properties. + * + * @return the request properties. + */ + public Map getPropertiesOrThrow() { + if (this.properties == null) { + throw new IllegalStateException( + "Only requests with properties (request options) can be used when using feed ranges"); + } + + return this.properties; + } + private static Map getProperties(Object options) { if (options == null) { return null; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ShouldRetryResult.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ShouldRetryResult.java new file mode 100644 index 0000000000000..2d5b7b188c830 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/ShouldRetryResult.java @@ -0,0 +1,63 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation; + +import java.time.Duration; + +public class ShouldRetryResult { + /// + /// How long to wait before next retry. 0 indicates retry immediately. + /// + public final Duration backOffTime; + public final Exception exception; + public final Quadruple policyArg; + public boolean shouldRetry; + + private ShouldRetryResult(Duration dur, Exception e, boolean shouldRetry, + Quadruple policyArg) { + this.backOffTime = dur; + this.exception = e; + this.shouldRetry = shouldRetry; + this.policyArg = policyArg; + } + + public static ShouldRetryResult error(Exception e) { + Utils.checkNotNullOrThrow(e, "exception", "cannot be null"); + return new ShouldRetryResult(null, e, false, null); + } + + public static ShouldRetryResult noRetry() { + return new ShouldRetryResult(null, null, false, null); + } + + public static ShouldRetryResult noRetry(Quadruple policyArg) { + return new ShouldRetryResult( + null, + null, + false, + policyArg); + } + + public static ShouldRetryResult retryAfter(Duration dur, + Quadruple policyArg) { + Utils.checkNotNullOrThrow(dur, "duration", "cannot be null"); + return new ShouldRetryResult(dur, null, true, policyArg); + } + + public static ShouldRetryResult retryAfter(Duration dur) { + Utils.checkNotNullOrThrow(dur, "duration", "cannot be null"); + return new ShouldRetryResult(dur, null, true, null); + } + + public void throwIfDoneTrying(Exception capturedException) throws Exception { + if (this.shouldRetry) { + return; + } + + if (this.exception == null) { + throw capturedException; + } else { + throw this.exception; + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java index 263f83050b3af..11ebf5ecf86b4 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicy.java @@ -14,6 +14,7 @@ import com.azure.cosmos.implementation.RetryPolicyWithDiagnostics; import com.azure.cosmos.implementation.RetryWithException; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.apachecommons.lang.tuple.Pair; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/EpkRequestPropertyConstants.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/EpkRequestPropertyConstants.java new file mode 100644 index 0000000000000..33cab77a02376 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/EpkRequestPropertyConstants.java @@ -0,0 +1,9 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +final class EpkRequestPropertyConstants { + public final static String END_EPK_STRING = "x-ms-end-epk-string"; + public final static String START_EPK_STRING = "x-ms-start-epk-string"; +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitor.java new file mode 100644 index 0000000000000..9a6f2383a8f09 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitor.java @@ -0,0 +1,14 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import reactor.core.publisher.Mono; + +abstract class FeedRangeAsyncVisitor { + public abstract Mono visit(FeedRangePartitionKeyImpl feedRange); + + public abstract Mono visit(FeedRangePartitionKeyRangeImpl feedRange); + + public abstract Mono visit(FeedRangeEpkImpl feedRange); +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitorWithArg.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitorWithArg.java new file mode 100644 index 0000000000000..3590a22ee1a84 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeAsyncVisitorWithArg.java @@ -0,0 +1,15 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import reactor.core.publisher.Mono; + +abstract class FeedRangeAsyncVisitorWithArg { + public abstract Mono visit(FeedRangePartitionKeyImpl feedRange, TArg argument); + + public abstract Mono visit(FeedRangePartitionKeyRangeImpl feedRange, + TArg argument); + + public abstract Mono visit(FeedRangeEpkImpl feedRange, TArg argument); +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java new file mode 100644 index 0000000000000..19e1237c2304f --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeCompositeContinuationImpl.java @@ -0,0 +1,359 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.Integers; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; +import com.azure.cosmos.implementation.ShouldRetryResult; +import com.azure.cosmos.implementation.Strings; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; +import com.azure.cosmos.implementation.directconnectivity.GatewayAddressCache; +import com.azure.cosmos.implementation.query.CompositeContinuationToken; +import com.azure.cosmos.implementation.routing.Range; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; +import java.util.Queue; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +/** + * FeedRangeContinuation using Composite Continuation Tokens and split proof. + * It uses a breath-first approach to transverse Composite Continuation Tokens. + */ +final class FeedRangeCompositeContinuationImpl extends FeedRangeContinuation { + + private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeCompositeContinuationImpl.class); + private final static ShouldRetryResult NO_RETRY = ShouldRetryResult.noRetry(); + private final static ShouldRetryResult RETRY = ShouldRetryResult.retryAfter(Duration.ZERO); + private final Queue compositeContinuationTokens; + private CompositeContinuationToken currentToken; + private String initialNoResultsRange; + + public FeedRangeCompositeContinuationImpl( + String containerRid, + FeedRangeInternal feedRange, + List> ranges) { + + this(containerRid, feedRange, ranges, null); + } + + public FeedRangeCompositeContinuationImpl( + String containerRid, + FeedRangeInternal feedRange, + List> ranges, + String continuation) { + + this(containerRid, feedRange); + + checkNotNull(ranges, "'ranges' must not be null"); + + if (ranges.size() == 0) { + throw new IllegalArgumentException("'ranges' must not be empty"); + } + + for (Range range : ranges) { + this.compositeContinuationTokens.add( + FeedRangeCompositeContinuationImpl.createCompositeContinuationTokenForRange( + range.getMin(), + range.getMax(), + continuation) + ); + } + + this.currentToken = this.getCompositeContinuationTokens().peek(); + } + + private FeedRangeCompositeContinuationImpl(String containerRid, FeedRangeInternal feedRange) { + super(containerRid, feedRange); + + this.compositeContinuationTokens = new LinkedList<>(); + } + + public Queue getCompositeContinuationTokens() { + return compositeContinuationTokens; + } + + public CompositeContinuationToken getCurrentToken() { + return this.currentToken; + } + + @Override + public FeedRangeInternal getFeedRange() { + if (!(this.feedRange instanceof FeedRangeEpkImpl)) { + return this.feedRange; + } + + if (this.currentToken != null) { + return new FeedRangeEpkImpl(this.currentToken.getRange()); + } + + return null; + } + + @Override + public String getContinuation() { + CompositeContinuationToken tokenSnapshot = this.currentToken; + if (tokenSnapshot == null) { + return null; + } + + return tokenSnapshot.getToken(); + } + + @Override + public void replaceContinuation(final String continuationToken) { + final CompositeContinuationToken continuationTokenSnapshot = this.currentToken; + + if (continuationTokenSnapshot == null) { + return; + } + + continuationTokenSnapshot.setToken(continuationToken); + this.moveToNextToken(); + } + + @Override + public boolean isDone() { + return this.compositeContinuationTokens.size() == 0; + } + + @Override + public void validateContainer(final String containerRid) throws IllegalArgumentException { + if (Strings.isNullOrEmpty(containerRid) || !containerRid.equals(this.getContainerRid())) { + + final String message = String.format( + "The continuation was generated for container %s but current container is %s.", + this.getContainerRid(), containerRid); + throw new IllegalArgumentException(message); + } + } + + @Override + public ShouldRetryResult handleChangeFeedNotModified(final RxDocumentServiceResponse response) { + checkNotNull(response, "Argument 'response' must not be null"); + final int statusCode = response.getStatusCode(); + if (statusCode >= HttpConstants.StatusCodes.MINIMUM_SUCCESS_STATUSCODE + && statusCode <= HttpConstants.StatusCodes.MAXIMUM_SUCCESS_STATUSCODE) { + + this.initialNoResultsRange = null; + return NO_RETRY; + } + + if (statusCode == HttpConstants.StatusCodes.NOT_MODIFIED && this.compositeContinuationTokens.size() > 1) { + + final String eTag = response.getResponseHeaders().get(HttpConstants.HttpHeaders.E_TAG); + if (this.initialNoResultsRange == null) { + + this.initialNoResultsRange = this.currentToken.getRange().getMin(); + this.replaceContinuation(eTag); + return RETRY; + } + + if (!this.initialNoResultsRange.equalsIgnoreCase(this.currentToken.getRange().getMin())) { + this.replaceContinuation(eTag); + return RETRY; + } + } + + return NO_RETRY; + } + + @Override + public Mono handleSplit(final RxDocumentClientImpl client, + final RxDocumentServiceResponse response) { + + checkNotNull(client, "Argument 'client' must not be null"); + checkNotNull(response, "Argument 'response' must not be null"); + + Integer nSubStatus = 0; + final String valueSubStatus = + response.getResponseHeaders().get(HttpConstants.HttpHeaders.SUB_STATUS); + if (!Strings.isNullOrEmpty(valueSubStatus)) { + nSubStatus = Integers.tryParse(valueSubStatus); + } + + final boolean partitionSplit = + response.getStatusCode() == HttpConstants.StatusCodes.GONE && nSubStatus != null + && (nSubStatus == HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE + || nSubStatus == HttpConstants.SubStatusCodes.COMPLETING_SPLIT); + + if (!partitionSplit) { + return Mono.just(NO_RETRY); + } + + final RxPartitionKeyRangeCache partitionKeyRangeCache = client.getPartitionKeyRangeCache(); + final Mono>> resolvedRangesTask = + this.tryGetOverlappingRanges( + partitionKeyRangeCache, this.currentToken.getRange().getMin(), + this.currentToken.getRange().getMax(), + true); + + return resolvedRangesTask.flatMap(resolvedRanges -> { + if (resolvedRanges.v != null && resolvedRanges.v.size() > 0) { + this.createChildRanges(resolvedRanges.v); + } + + return Mono.just(RETRY); + }); + } + + @Override + public void accept(final FeedRangeContinuationVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this); + } + + /** + * Used for deserializtion only + */ + public static FeedRangeCompositeContinuationImpl createFromDeserializedTokens( + String containerRid, + FeedRangeInternal feedRange, + List deserializedTokens) { + + FeedRangeCompositeContinuationImpl thisPtr = + new FeedRangeCompositeContinuationImpl(containerRid, feedRange); + + checkNotNull(deserializedTokens, "'deserializedTokens' must not be null"); + + if (deserializedTokens.size() == 0) { + throw new IllegalArgumentException("'deserializedTokens' must not be empty"); + } + + thisPtr.compositeContinuationTokens.addAll(deserializedTokens); + + thisPtr.currentToken = thisPtr.getCompositeContinuationTokens().peek(); + + return thisPtr; + } + + public static FeedRangeContinuation parse(final String jsonString) throws IOException { + checkNotNull(jsonString, "Argument 'jsonString' must not be null"); + final ObjectMapper mapper = Utils.getSimpleObjectMapper(); + return mapper.readValue(jsonString, FeedRangeCompositeContinuationImpl.class); + } + + @Override + public String toString() { + try { + return Utils.getSimpleObjectMapper().writeValueAsString(this); + } catch (final IOException e) { + throw new IllegalArgumentException( + "Unable serialize the composite FeedRange continuation token into a JSON string", + e); + } + } + + private void createChildRanges(final List keyRanges) { + final PartitionKeyRange firstRange = keyRanges.get(0); + this.currentToken + .setRange(new Range<>(firstRange.getMinInclusive(), + firstRange.getMaxExclusive(), true, false)); + + final CompositeContinuationToken continuationAsComposite = + tryParseAsCompositeContinuationToken( + this.currentToken.getToken()); + + if (continuationAsComposite != null) { + // Update the internal composite continuation + continuationAsComposite.setRange(this.currentToken.getRange()); + this.currentToken.setToken(continuationAsComposite.toJson()); + // Add children + final int size = keyRanges.size(); + for (int i = 1; i < size; i++) { + final PartitionKeyRange keyRange = keyRanges.get(i); + continuationAsComposite.setRange(keyRange.toRange()); + this.compositeContinuationTokens.add(createCompositeContinuationTokenForRange( + keyRange.getMinInclusive(), keyRange.getMaxExclusive(), + continuationAsComposite.toJson())); + } + } else { + // Add children + final int size = keyRanges.size(); + for (int i = 1; i < size; i++) { + final PartitionKeyRange keyRange = keyRanges.get(i); + this.compositeContinuationTokens.add(createCompositeContinuationTokenForRange( + keyRange.getMinInclusive(), keyRange.getMaxExclusive(), + this.currentToken.getToken())); + } + } + } + + private static CompositeContinuationToken createCompositeContinuationTokenForRange( + String minInclusive, + String maxExclusive, + String token) { + return new CompositeContinuationToken( + token, + new Range<>(minInclusive, maxExclusive, true, false)); + } + + private void moveToNextToken() { + final CompositeContinuationToken recentToken = this.compositeContinuationTokens.poll(); + if (recentToken.getToken() != null) { + // Normal ReadFeed can signal termination by CT null, not NotModified + // Change Feed never lands here, as it always provides a CT + // Consider current range done, if this FeedToken contains multiple ranges due + // to splits, + // all of them need to be considered done + this.compositeContinuationTokens.add(recentToken); + } + + if (this.compositeContinuationTokens.size() > 0) { + this.currentToken = this.compositeContinuationTokens.peek(); + } else { + this.currentToken = null; + } + } + + private Mono>> tryGetOverlappingRanges( + final RxPartitionKeyRangeCache partitionKeyRangeCache, final String min, final String max, + final Boolean forceRefresh) { + + return partitionKeyRangeCache.tryGetOverlappingRangesAsync(null, this.getContainerRid(), + new Range<>(min, max, false, true), forceRefresh, null); + } + + private static CompositeContinuationToken tryParseAsCompositeContinuationToken( + final String providedContinuation) { + + try { + final ObjectMapper mapper = Utils.getSimpleObjectMapper(); + + if (providedContinuation.trim().startsWith("[")) { + final List compositeContinuationTokens = Arrays + .asList(mapper.readValue(providedContinuation, + CompositeContinuationToken[].class)); + + if (compositeContinuationTokens.size() > 0) { + return compositeContinuationTokens.get(0); + } + + return null; + } else if (providedContinuation.trim().startsWith("{")) { + return mapper.readValue(providedContinuation, CompositeContinuationToken.class); + } + + return null; + } catch (final IOException ioError) { + LOGGER.debug( + "Failed to parse as composite continuation token JSON ", + providedContinuation, + ioError); + return null; + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java new file mode 100644 index 0000000000000..c44fb43c82ddd --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuation.java @@ -0,0 +1,58 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.RxDocumentServiceResponse; +import com.azure.cosmos.implementation.ShouldRetryResult; +import reactor.core.publisher.Mono; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +public abstract class FeedRangeContinuation { + protected final FeedRangeInternal feedRange; + private final String containerRid; + + // for mocking + protected FeedRangeContinuation() { + this.feedRange = null; + this.containerRid = null; + } + + public FeedRangeContinuation(String containerRid, FeedRangeInternal feedRange) { + checkNotNull(feedRange, "expected non-null feedRange"); + this.feedRange = feedRange; + this.containerRid = containerRid; + } + + public String getContainerRid() { + return this.containerRid; + } + + public FeedRangeInternal getFeedRange() { + return this.feedRange; + } + + public abstract String getContinuation(); + + public abstract void replaceContinuation(String continuationToken); + + public abstract boolean isDone(); + + public abstract void validateContainer(String containerRid); + + /* TODO fabianm - infinite recursion + public static FeedRangeContinuation tryParse(String toStringValue) { + return FeedRangeCompositeContinuationImpl.tryParse(toStringValue); + }*/ + + public abstract ShouldRetryResult handleChangeFeedNotModified( + RxDocumentServiceResponse responseMessage); + + public abstract Mono handleSplit( + RxDocumentClientImpl client, + RxDocumentServiceResponse responseMessage); + + public abstract void accept(FeedRangeContinuationVisitor visitor); +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationRxDocumentServiceRequestPopulatorVisitorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationRxDocumentServiceRequestPopulatorVisitorImpl.java new file mode 100644 index 0000000000000..52bf9883ca9bb --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationRxDocumentServiceRequestPopulatorVisitorImpl.java @@ -0,0 +1,47 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.routing.Range; + +import java.util.Map; +import java.util.function.BiConsumer; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +final class FeedRangeContinuationRxDocumentServiceRequestPopulatorVisitorImpl extends FeedRangeContinuationVisitor { + + private final RxDocumentServiceRequest request; + private final BiConsumer fillContinuation; + + public FeedRangeContinuationRxDocumentServiceRequestPopulatorVisitorImpl( + RxDocumentServiceRequest request, BiConsumer fillContinuation) + { + checkNotNull(request, "'request' must not be null"); + checkNotNull(fillContinuation, "'fillContinuation' must not be null"); + + this.request = request; + this.fillContinuation = fillContinuation; + } + + @Override + public void visit(FeedRangeCompositeContinuationImpl feedRangeCompositeContinuation) { + checkNotNull(feedRangeCompositeContinuation, "'feedRangeCompositeContinuation' must not be null"); + + final Map properties = this.request.getPropertiesOrThrow(); + + // In case EPK has already been set by compute + if (properties.containsKey(EpkRequestPropertyConstants.START_EPK_STRING)) { + return; + } + + final Range range = feedRangeCompositeContinuation.getCurrentToken().getRange(); + + properties.put(EpkRequestPropertyConstants.END_EPK_STRING, range.getMax()); + properties.put(EpkRequestPropertyConstants.START_EPK_STRING, range.getMin()); + + this.fillContinuation.accept(this.request, feedRangeCompositeContinuation.getContinuation()); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationVisitor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationVisitor.java new file mode 100644 index 0000000000000..5e625668a2177 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeContinuationVisitor.java @@ -0,0 +1,8 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +abstract class FeedRangeContinuationVisitor { + public abstract void visit(FeedRangeCompositeContinuationImpl feedRangeCompositeContinuation); +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeEpkImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeEpkImpl.java new file mode 100644 index 0000000000000..b14ea8f6fbb83 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeEpkImpl.java @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.IRoutingMapProvider; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.routing.PartitionKeyInternalHelper; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.models.ModelBridgeInternal; +import com.azure.cosmos.models.PartitionKeyDefinition; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +import static com.azure.cosmos.BridgeInternal.setProperty; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +final class FeedRangeEpkImpl extends FeedRangeInternal { + private static final FeedRangeEpkImpl fullRangeEPK = + new FeedRangeEpkImpl(PartitionKeyInternalHelper.FullRange); + + private final Range range; + private final UnmodifiableList> rangeList; + + public FeedRangeEpkImpl(final Range range) { + checkNotNull(range, "Argument 'range' must not be null"); + this.range = range; + final ArrayList> temp = new ArrayList<>(); + temp.add(range); + + this.rangeList = (UnmodifiableList>)UnmodifiableList.unmodifiableList(temp); + } + + public Range getRange() { + return this.range; + } + + public static FeedRangeEpkImpl forFullRange() { + return fullRangeEPK; + } + + @Override + public void accept(final FeedRangeVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this); + } + + @Override + public void accept(GenericFeedRangeVisitor visitor, TInput input) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this, input); + } + + @Override + public Mono accept(final FeedRangeAsyncVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + return visitor.visit(this); + } + + @Override + public Mono>> getEffectiveRanges( + final IRoutingMapProvider routingMapProvider, + final String containerRid, + final PartitionKeyDefinition partitionKeyDefinition) { + + return Mono.just(this.rangeList); + } + + @Override + public Mono> getPartitionKeyRanges( + final IRoutingMapProvider routingMapProvider, + final String containerRid, + final PartitionKeyDefinition partitionKeyDefinition) { + + return routingMapProvider + .tryGetOverlappingRangesAsync( + null, + containerRid, + this.range, + false, + null) + .flatMap(pkRangeHolder -> { + final ArrayList rangeList = new ArrayList<>(); + + if (pkRangeHolder != null) { + final List pkRanges = pkRangeHolder.v; + for (final PartitionKeyRange pkRange : pkRanges) { + rangeList.add(pkRange.getId()); + } + } + + return Mono.just((UnmodifiableList)UnmodifiableList.unmodifiableList(rangeList)); + }); + } + + @Override + public String toString() { + return this.range.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FeedRangeEpkImpl that = (FeedRangeEpkImpl) o; + return Objects.equals(this.range, that.range); + } + + @Override + public int hashCode() { + return Objects.hash(range); + } + + public void populatePropertyBag() { + super.populatePropertyBag(); + + if (this.range != null) { + ModelBridgeInternal.populatePropertyBag(this.range); + setProperty(this, Constants.Properties.RANGE, this.range); + } + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java new file mode 100644 index 0000000000000..23824f6dafa7c --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeInternal.java @@ -0,0 +1,118 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.IRoutingMapProvider; +import com.azure.cosmos.implementation.JsonSerializable; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.PartitionKeyDefinition; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.io.IOException; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +public abstract class FeedRangeInternal extends JsonSerializable implements FeedRange { + private final static Logger LOGGER = LoggerFactory.getLogger(FeedRangeInternal.class); + + public abstract void accept(FeedRangeVisitor visitor); + + public abstract void accept(GenericFeedRangeVisitor visitor, TInput input); + + public abstract Mono accept(FeedRangeAsyncVisitor visitor); + + public static FeedRangeInternal convert(final FeedRange feedRange) { + checkNotNull(feedRange, "Argument 'feedRange' must not be null"); + if (feedRange instanceof FeedRangeInternal) { + return (FeedRangeInternal)feedRange; + } + + String json = feedRange.toJsonString(); + return fromJsonString(json); + } + + /** + * Creates a range from a previously obtained string representation. + * + * @param json A string representation of a feed range + * @return A feed range + */ + public static FeedRangeInternal fromJsonString(String json) { + FeedRangeInternal parsedRange = FeedRangeInternal.tryParse(json); + + if (parsedRange == null) { + throw new IllegalArgumentException( + String.format( + "The provided string '%s' does not represent any known format.", + json)); + } + + return parsedRange; + } + + public abstract Mono>> getEffectiveRanges( + IRoutingMapProvider routingMapProvider, + String containerRid, + PartitionKeyDefinition partitionKeyDefinition); + + public abstract Mono> getPartitionKeyRanges( + IRoutingMapProvider routingMapProvider, + String containerRid, + PartitionKeyDefinition partitionKeyDefinition); + + public void populatePropertyBag() { + super.populatePropertyBag(); + } + + @Override + public abstract String toString(); + + @Override + public String toJsonString() { + return this.toJson(); + } + + public static FeedRangeInternal tryParse(final String jsonString) { + checkNotNull(jsonString, "Argument 'jsonString' must not be null"); + final ObjectMapper mapper = Utils.getSimpleObjectMapper(); + + try { + JsonNode rootNode = mapper.readTree(jsonString); + + JsonNode rangeNode = rootNode.get(Constants.Properties.RANGE); + if (rangeNode != null && rangeNode.isObject()) { + Range range = new Range<>((ObjectNode)rangeNode); + return new FeedRangeEpkImpl(range); + } + + JsonNode pkNode = rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY); + if (pkNode != null && pkNode.isArray()) { + PartitionKeyInternal pk = mapper.convertValue(pkNode, PartitionKeyInternal.class); + return new FeedRangePartitionKeyImpl(pk); + } + + JsonNode pkRangeIdNode = + rootNode.get(Constants.Properties.FEED_RANGE_PARTITION_KEY_RANGE_ID); + if (pkRangeIdNode != null && pkRangeIdNode.isTextual()) { + return new FeedRangePartitionKeyRangeImpl(pkRangeIdNode.asText()); + } + + return null; + + } catch (final IOException ioError) { + LOGGER.debug("Failed to parse feed range JSON {}", jsonString, ioError); + return null; + } + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyImpl.java new file mode 100644 index 0000000000000..ee8bb5d186347 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyImpl.java @@ -0,0 +1,141 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.IRoutingMapProvider; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.models.PartitionKeyDefinition; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Objects; + +import static com.azure.cosmos.BridgeInternal.setProperty; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +final class FeedRangePartitionKeyImpl extends FeedRangeInternal { + private final PartitionKeyInternal partitionKey; + + public FeedRangePartitionKeyImpl(PartitionKeyInternal partitionKey) { + checkNotNull(partitionKey, "Argument 'partitionKey' must not be null"); + this.partitionKey = partitionKey; + } + + public PartitionKeyInternal getPartitionKeyInternal() { + return this.partitionKey; + } + + @Override + public void accept(FeedRangeVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this); + } + + @Override + public void accept(GenericFeedRangeVisitor visitor, TInput input) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this, input); + } + + @Override + public Mono accept(FeedRangeAsyncVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + return visitor.visit(this); + } + + @Override + public Mono>> getEffectiveRanges( + IRoutingMapProvider routingMapProvider, + String containerRid, + PartitionKeyDefinition partitionKeyDefinition) { + + String effectivePartitionKey = this.partitionKey.getEffectivePartitionKeyString( + this.partitionKey, + partitionKeyDefinition); + Range range = Range.getPointRange(effectivePartitionKey); + ArrayList> rangeList = new ArrayList<>(); + rangeList.add(range); + + return Mono.just((UnmodifiableList>)UnmodifiableList.unmodifiableList(rangeList)); + } + + @Override + public Mono> getPartitionKeyRanges( + IRoutingMapProvider routingMapProvider, + String containerRid, + PartitionKeyDefinition partitionKeyDefinition) { + + String effectivePartitionKey = this.partitionKey.getEffectivePartitionKeyString( + this.partitionKey, + partitionKeyDefinition); + return routingMapProvider + .tryGetOverlappingRangesAsync( + null, + containerRid, + Range.getPointRange(effectivePartitionKey), + false, + null) + .flatMap(pkRangeHolder -> { + ArrayList rangeList = new ArrayList<>(); + + if (pkRangeHolder != null) { + String rangeId = pkRangeHolder.v.get(0).getId(); + rangeList.add(rangeId); + } + + return Mono.just((UnmodifiableList)UnmodifiableList.unmodifiableList(rangeList)); + }); + } + + public void populatePropertyBag() { + super.populatePropertyBag(); + + if (this.partitionKey != null) { + setProperty(this, Constants.Properties.FEED_RANGE_PARTITION_KEY, this.partitionKey); + } + } + + @Override + public String toString() { + return this.partitionKey.toJson(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FeedRangePartitionKeyImpl that = (FeedRangePartitionKeyImpl) o; + return Objects.equals(this.partitionKey, that.partitionKey); + } + + @Override + public int hashCode() { + return Objects.hash(partitionKey); + } + + /* TODO fabianm - not needed yet + private static Mono tryGetRangeByEffectivePartitionKey( + IRoutingMapProvider routingMapProvider, + String containerRid, + String effectivePartitionKey) { + + return routingMapProvider + .tryGetOverlappingRangesAsync( + null, + containerRid, + Range.getPointRange(effectivePartitionKey), + false, + null) + .flatMap((pkRangeHolder) -> { + if (pkRangeHolder == null) { + return Mono.empty(); + } + + return Mono.just(pkRangeHolder.v.get(0)); + }); + }*/ +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeExtractorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeExtractorImpl.java new file mode 100644 index 0000000000000..21f2d03c261a7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeExtractorImpl.java @@ -0,0 +1,99 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.BridgeInternal; +import com.azure.cosmos.implementation.DocumentCollection; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.ResourceResponse; +import com.azure.cosmos.implementation.RxDocumentClientImpl; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.caches.RxPartitionKeyRangeCache; +import com.azure.cosmos.implementation.routing.Range; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +final class FeedRangePartitionKeyRangeExtractorImpl extends FeedRangeAsyncVisitor>> { + + private final RxDocumentClientImpl client; + private final String collectionLink; + + public FeedRangePartitionKeyRangeExtractorImpl( + RxDocumentClientImpl client, + String collectionLink) { + + checkNotNull(client, "'client' must not be null"); + checkNotNull(collectionLink, "'collectionLink' must not be null"); + + this.client = client; + this.collectionLink = collectionLink; + } + + @Override + public Mono>> visit(FeedRangePartitionKeyImpl feedRange) { + final RxPartitionKeyRangeCache partitionKeyRangeCache = + this.client.getPartitionKeyRangeCache(); + final Mono> collectionResponseObservable = this.client + .readCollection(this.collectionLink, null); + + return collectionResponseObservable.flatMap(collectionResponse -> { + final DocumentCollection collection = collectionResponse.getResource(); + return feedRange.getEffectiveRanges(partitionKeyRangeCache, + collection.getResourceId(), + collection.getPartitionKey()); + }); + } + + @Override + public Mono>> visit(FeedRangePartitionKeyRangeImpl feedRange) { + final RxPartitionKeyRangeCache partitionKeyRangeCache = + this.client.getPartitionKeyRangeCache(); + final Mono> collectionResponseObservable = this.client + .readCollection(this.collectionLink, null); + + return collectionResponseObservable.flatMap(collectionResponse -> { + final DocumentCollection collection = collectionResponse.getResource(); + return feedRange.getEffectiveRanges(partitionKeyRangeCache, + collection.getResourceId(), null); + }); + } + + @Override + public Mono>> visit(FeedRangeEpkImpl feedRange) { + final RxPartitionKeyRangeCache partitionKeyRangeCache = + this.client.getPartitionKeyRangeCache(); + final Mono> collectionResponseObservable = this.client + .readCollection(this.collectionLink, null); + + final Mono>> valueHolderMono = + collectionResponseObservable + .flatMap(collectionResponse -> { + final DocumentCollection collection = collectionResponse.getResource(); + return partitionKeyRangeCache.tryGetOverlappingRangesAsync( + BridgeInternal.getMetaDataDiagnosticContext(null), + collection.getResourceId(), + feedRange.getRange(), false, null); + }); + + return valueHolderMono.map(FeedRangePartitionKeyRangeExtractorImpl::toFeedRanges); + } + + private static UnmodifiableList> toFeedRanges( + final Utils.ValueHolder> partitionKeyRangeListValueHolder) { + final List partitionKeyRangeList = partitionKeyRangeListValueHolder.v; + if (partitionKeyRangeList == null) { + throw new IllegalStateException("PartitionKeyRange list cannot be null"); + } + + final List> feedRanges = new ArrayList<>(); + partitionKeyRangeList.forEach(pkRange -> feedRanges.add(pkRange.toRange())); + + return (UnmodifiableList>)UnmodifiableList.unmodifiableList(feedRanges); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeImpl.java new file mode 100644 index 0000000000000..cde3f7582e6bb --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangePartitionKeyRangeImpl.java @@ -0,0 +1,150 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.Constants; +import com.azure.cosmos.implementation.IRoutingMapProvider; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; +import com.azure.cosmos.implementation.Utils.ValueHolder; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.routing.PartitionKeyRangeIdentity; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.models.PartitionKeyDefinition; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.Objects; + +import static com.azure.cosmos.BridgeInternal.setProperty; +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +public final class FeedRangePartitionKeyRangeImpl extends FeedRangeInternal { + private final String partitionKeyRangeId; + private final PartitionKeyRangeIdentity partitionKeyRangeIdentity; + + public FeedRangePartitionKeyRangeImpl(final String partitionKeyRangeId) { + checkNotNull(partitionKeyRangeId, "Argument 'partitionKeyRangeId' must not be null"); + this.partitionKeyRangeId = partitionKeyRangeId; + this.partitionKeyRangeIdentity = new PartitionKeyRangeIdentity(partitionKeyRangeId); + } + + public String getPartitionKeyRangeId() { + return this.partitionKeyRangeId; + } + + public PartitionKeyRangeIdentity getPartitionKeyRangeIdentity() { + return this.partitionKeyRangeIdentity; + } + + @Override + public void accept(final FeedRangeVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this); + } + + @Override + public void accept(GenericFeedRangeVisitor visitor, TInput input) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + visitor.visit(this, input); + } + + @Override + public Mono accept(final FeedRangeAsyncVisitor visitor) { + checkNotNull(visitor, "Argument 'visitor' must not be null"); + return visitor.visit(this); + } + + @Override + public Mono>> getEffectiveRanges( + final IRoutingMapProvider routingMapProvider, + final String containerRid, + final PartitionKeyDefinition partitionKeyDefinition) { + + final Mono> getPkRangeTask = routingMapProvider + .tryGetPartitionKeyRangeByIdAsync( + null, + containerRid, + partitionKeyRangeId, + false, + null) + .flatMap((pkRangeHolder) -> { + if (pkRangeHolder.v == null) { + return routingMapProvider.tryGetPartitionKeyRangeByIdAsync( + null, + containerRid, + partitionKeyRangeId, + true, + null); + } else { + return Mono.just(pkRangeHolder); + } + }) + .flatMap((pkRangeHolder) -> { + if (pkRangeHolder.v == null) { + return Mono.error( + new PartitionKeyRangeGoneException( + String.format( + "The PartitionKeyRangeId: \"%s\" is not valid for the current " + + "container %s .", + partitionKeyRangeId, + containerRid) + )); + } else { + return Mono.just(pkRangeHolder); + } + }); + + return getPkRangeTask.flatMap((pkRangeHolder) -> { + final ArrayList> temp = new ArrayList<>(); + if (pkRangeHolder != null) { + temp.add(pkRangeHolder.v.toRange()); + } + + return Mono.just((UnmodifiableList>)UnmodifiableList.unmodifiableList(temp)); + }); + } + + @Override + public Mono> getPartitionKeyRanges( + final IRoutingMapProvider routingMapProvider, + final String containerRid, + final PartitionKeyDefinition partitionKeyDefinition) { + + final ArrayList temp = new ArrayList<>(); + temp.add(this.partitionKeyRangeId); + + return Mono.just( + (UnmodifiableList)UnmodifiableList.unmodifiableList(temp)); + } + + public void populatePropertyBag() { + super.populatePropertyBag(); + + if (this.partitionKeyRangeId != null) { + setProperty( + this, + Constants.Properties.FEED_RANGE_PARTITION_KEY_RANGE_ID, + this.partitionKeyRangeId); + } + } + + @Override + public String toString() { + return this.partitionKeyRangeId; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + FeedRangePartitionKeyRangeImpl that = (FeedRangePartitionKeyRangeImpl) o; + return Objects.equals(this.partitionKeyRangeId, that.partitionKeyRangeId); + } + + @Override + public int hashCode() { + return Objects.hash(partitionKeyRangeId); + } +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.java new file mode 100644 index 0000000000000..c6b16bab413d4 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.java @@ -0,0 +1,64 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.routing.Range; + +import java.util.Map; + +import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull; + +final class FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl + extends GenericFeedRangeVisitor { + + public final static FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl SINGLETON = + new FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl(); + + private FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl() { + } + + @Override + public void visit(FeedRangeEpkImpl feedRange, + RxDocumentServiceRequest rxDocumentServiceRequest) { + + checkNotNull(feedRange, "'feedRange' must not be null"); + checkNotNull(rxDocumentServiceRequest, "'rxDocumentServiceRequest' must not be null"); + + final Map properties = rxDocumentServiceRequest.getPropertiesOrThrow(); + + // In case EPK has already been set by compute + if (properties.containsKey(EpkRequestPropertyConstants.START_EPK_STRING)) { + return; + } + + final Range range = feedRange.getRange(); + + properties.put(EpkRequestPropertyConstants.END_EPK_STRING, range.getMax()); + properties.put(EpkRequestPropertyConstants.START_EPK_STRING, range.getMin()); + } + + @Override + public void visit(FeedRangePartitionKeyRangeImpl feedRange, + RxDocumentServiceRequest rxDocumentServiceRequest) { + + checkNotNull(feedRange, "'feedRange' must not be null"); + checkNotNull(rxDocumentServiceRequest, "'rxDocumentServiceRequest' must not be null"); + + rxDocumentServiceRequest.routeTo(feedRange.getPartitionKeyRangeIdentity()); + } + + @Override + public void visit(FeedRangePartitionKeyImpl feedRange, + RxDocumentServiceRequest rxDocumentServiceRequest) { + checkNotNull(feedRange, "'feedRange' must not be null"); + checkNotNull(rxDocumentServiceRequest, "'rxDocumentServiceRequest' must not be null"); + + rxDocumentServiceRequest.getHeaders().put( + HttpConstants.HttpHeaders.PARTITION_KEY, + feedRange.getPartitionKeyInternal().toJson()); + rxDocumentServiceRequest.setPartitionKeyInternal(feedRange.getPartitionKeyInternal()); + } +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeTransformer.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeTransformer.java new file mode 100644 index 0000000000000..0c0a20a887582 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeTransformer.java @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +abstract class FeedRangeTransformer { + public abstract TResult visit(FeedRangePartitionKeyImpl feedRange); + + public abstract TResult visit(FeedRangePartitionKeyRangeImpl feedRange); + + public abstract TResult visit(FeedRangeEpkImpl feedRange); +} diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeVisitor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeVisitor.java new file mode 100644 index 0000000000000..f6c142c30e294 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/FeedRangeVisitor.java @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +abstract class FeedRangeVisitor { + public abstract void visit(FeedRangeEpkImpl feedRange); + + public abstract void visit(FeedRangePartitionKeyRangeImpl feedRange); + + public abstract void visit(FeedRangePartitionKeyImpl feedRange); +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/GenericFeedRangeVisitor.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/GenericFeedRangeVisitor.java new file mode 100644 index 0000000000000..d91e81de8ba61 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/feedranges/GenericFeedRangeVisitor.java @@ -0,0 +1,12 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.implementation.feedranges; + +abstract class GenericFeedRangeVisitor { + public abstract void visit(FeedRangeEpkImpl feedRange, TInput input); + + public abstract void visit(FeedRangePartitionKeyRangeImpl feedRange, TInput input); + + public abstract void visit(FeedRangePartitionKeyImpl feedRange, TInput input); +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/CompositeContinuationToken.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/CompositeContinuationToken.java index d30aec45745a9..fcde4efd9335e 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/CompositeContinuationToken.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/CompositeContinuationToken.java @@ -85,7 +85,7 @@ public Range getRange() { * @param token * the token to set */ - private void setToken(String token) { + public void setToken(String token) { BridgeInternal.setProperty(this, TokenPropertyName, token); } @@ -93,7 +93,7 @@ private void setToken(String token) { * @param range * the range to set */ - private void setRange(Range range) { + public void setRange(Range range) { /* TODO: Don't stringify the range */ BridgeInternal.setProperty(this, RangePropertyName, range.toString()); } diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java index 63bd1389a0217..de4750ece8aaf 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/routing/PartitionKeyInternalHelper.java @@ -19,6 +19,12 @@ public class PartitionKeyInternalHelper { public static final String MinimumInclusiveEffectivePartitionKey = toHexEncodedBinaryString(PartitionKeyInternal.EmptyPartitionKey.components); public static final String MaximumExclusiveEffectivePartitionKey = toHexEncodedBinaryString(PartitionKeyInternal.InfinityPartitionKey.components); + public static final Range FullRange = new Range( + PartitionKeyInternalHelper.MinimumInclusiveEffectivePartitionKey, + PartitionKeyInternalHelper.MaximumExclusiveEffectivePartitionKey, + true, + false); + static final int MaxPartitionKeyBinarySize = (1 /*type marker */ + 9 /* hash value*/ + diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/FeedRange.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/FeedRange.java new file mode 100644 index 0000000000000..0b9cee7953858 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/models/FeedRange.java @@ -0,0 +1,24 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +package com.azure.cosmos.models; + +import com.azure.cosmos.implementation.feedranges.FeedRangeInternal; +import com.azure.cosmos.util.Beta; + +import java.io.IOException; + +@Beta(Beta.SinceVersion.V4_9_0) +public interface FeedRange { + /** + * Creates a range from a previously obtained string representation. + * + * @param json A string representation of a feed range + * @return A feed range + */ + public static FeedRange fromJsonString(String json) { + return FeedRangeInternal.fromJsonString(json); + } + + public String toJsonString(); +} \ No newline at end of file diff --git a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java index c981a2fe3f113..ac92720730665 100644 --- a/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java +++ b/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/Beta.java @@ -48,5 +48,7 @@ public enum SinceVersion { V4_7_0, /** v4.8.0 */ V4_8_0, + /** v4.9.0 */ + V4_9_0, } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java index 39c7f7f9d1292..272614e48a531 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/CosmosContainerTest.java @@ -11,6 +11,7 @@ import com.azure.cosmos.models.CosmosContainerRequestOptions; import com.azure.cosmos.models.CosmosContainerResponse; import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.FeedRange; import com.azure.cosmos.models.IndexingMode; import com.azure.cosmos.models.IndexingPolicy; import com.azure.cosmos.models.SqlQuerySpec; @@ -23,6 +24,7 @@ import org.testng.annotations.Factory; import org.testng.annotations.Test; +import java.util.List; import java.util.UUID; import static org.assertj.core.api.Assertions.assertThat; @@ -176,6 +178,25 @@ public void readContainer() throws Exception { validateContainerResponse(containerProperties, read1); } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) + public void getFeedRanges() throws Exception { + String collectionName = UUID.randomUUID().toString(); + CosmosContainerProperties containerProperties = getCollectionDefinition(collectionName); + CosmosContainerRequestOptions options = new CosmosContainerRequestOptions(); + + CosmosContainerResponse containerResponse = createdDatabase.createContainer(containerProperties); + + CosmosContainer syncContainer = createdDatabase.getContainer(collectionName); + + List feedRanges = syncContainer.getFeedRanges(); + assertThat(feedRanges) + .isNotNull() + .hasSize(1); + assertThat(feedRanges.get(0).toJsonString()) + .isNotNull() + .isEqualTo("{\"PKRangeId\":\"0\"}"); + } + @Test(groups = { "emulator" }, timeOut = TIMEOUT) public void deleteContainer() throws Exception { String collectionName = UUID.randomUUID().toString(); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java index 9e0e52126cf14..ed1ffddf597ba 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/RetryContextOnDiagnosticTest.java @@ -17,6 +17,7 @@ import com.azure.cosmos.implementation.RxDocumentServiceRequest; import com.azure.cosmos.implementation.RxDocumentServiceResponse; import com.azure.cosmos.implementation.RxStoreModel; +import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.TestConfigurations; import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; @@ -251,7 +252,7 @@ public Mono shouldRetry(Exception e) { if (noRetry) { return Mono.just(ShouldRetryResult.noRetry()); } - return Mono.just(IRetryPolicy.ShouldRetryResult.retryAfter(Duration.ofSeconds(2))); + return Mono.just(ShouldRetryResult.retryAfter(Duration.ofSeconds(2))); } @Override diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java index 04faf9151b2ef..1c5096fbfbe6c 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ClientRetryPolicyTest.java @@ -41,7 +41,7 @@ public void networkFailureOnRead() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() @@ -76,7 +76,7 @@ public void tcpNetworkFailureOnRead() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); if (i < 2) { validateSuccess(shouldRetry, ShouldRetryValidator.builder() @@ -114,7 +114,7 @@ public void networkFailureOnWrite() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() .shouldRetry(false) @@ -146,7 +146,7 @@ public void tcpNetworkFailureOnWrite() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); // We don't want to retry writes on network failure with non retriable exception validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() @@ -167,7 +167,7 @@ public void tcpNetworkFailureOnWrite() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); // We want to retry writes on network failure with retriable exception if (i < 2) { validateSuccess(shouldRetry, ShouldRetryValidator.builder() @@ -205,7 +205,7 @@ public void networkFailureOnUpsert() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() .shouldRetry(false) @@ -236,7 +236,7 @@ public void tcpNetworkFailureOnUpsert() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); // We don't want to retry writes on network failure with non retriable exception validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() @@ -266,7 +266,7 @@ public void networkFailureOnDelete() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() .shouldRetry(false) @@ -297,7 +297,7 @@ public void tcpNetworkFailureOnDelete() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); // We don't want to retry writes on network failure with non retriable exception validateSuccess(shouldRetry, ShouldRetryValidator.builder() .nullException() @@ -330,7 +330,7 @@ public void httpNetworkFailureOnQueryPlan() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); if (i < 3) { validateSuccess(shouldRetry, ShouldRetryValidator.builder() @@ -372,7 +372,7 @@ public void httpNetworkFailureOnAddressRefresh() throws Exception { clientRetryPolicy.onBeforeSendRequest(dsr); for (int i = 0; i < 10; i++) { - Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); + Mono shouldRetry = clientRetryPolicy.shouldRetry(cosmosException); if (i < 3) { validateSuccess(shouldRetry, ShouldRetryValidator.builder() @@ -407,7 +407,7 @@ public void onBeforeSendRequestNotInvoked() { OperationType.Create, "/dbs/db/colls/col/docs/docId", ResourceType.Document); dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class); - Mono shouldRetry = clientRetryPolicy.shouldRetry(exception); + Mono shouldRetry = clientRetryPolicy.shouldRetry(exception); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .withException(exception) .shouldRetry(false) @@ -416,16 +416,16 @@ public void onBeforeSendRequestNotInvoked() { Mockito.verifyZeroInteractions(endpointManager); } - public static void validateSuccess(Mono single, + public static void validateSuccess(Mono single, ShouldRetryValidator validator) { validateSuccess(single, validator, TIMEOUT); } - public static void validateSuccess(Mono single, + public static void validateSuccess(Mono single, ShouldRetryValidator validator, long timeout) { - TestSubscriber testSubscriber = new TestSubscriber<>(); + TestSubscriber testSubscriber = new TestSubscriber<>(); single.flux().subscribe(testSubscriber); testSubscriber.awaitTerminalEvent(timeout, TimeUnit.MILLISECONDS); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java index 0797ba34fffdd..06f5a6513428f 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RenameCollectionAwareClientRetryPolicyTest.java @@ -14,7 +14,6 @@ import static com.azure.cosmos.implementation.ClientRetryPolicyTest.validateSuccess; import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; import static org.assertj.core.api.Assertions.assertThat; -import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; public class RenameCollectionAwareClientRetryPolicyTest { @@ -39,7 +38,7 @@ public void onBeforeSendRequestNotInvoked() { OperationType.Create, "/dbs/db/colls/col/docs/docId", ResourceType.Document); dsr.requestContext = Mockito.mock(DocumentServiceRequestContext.class); - Mono shouldRetry = + Mono shouldRetry = renameCollectionAwareClientRetryPolicy.shouldRetry(exception); validateSuccess(shouldRetry, ShouldRetryValidator.builder() .withException(exception) @@ -67,7 +66,7 @@ public void shouldRetryWithNotFoundStatusCode() { NotFoundException notFoundException = new NotFoundException(); - Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy + Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy .shouldRetry(notFoundException); validateSuccess(singleShouldRetry, ShouldRetryValidator.builder() .withException(notFoundException) @@ -101,7 +100,7 @@ public void shouldRetryWithNotFoundStatusCodeAndReadSessionNotAvailableSubStatus Mockito.when(rxClientCollectionCache.resolveCollectionAsync(BridgeInternal.getMetaDataDiagnosticContext(request.requestContext.cosmosDiagnostics), request)).thenReturn(Mono.just(new Utils.ValueHolder<>(documentCollection))); - Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy + Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy .shouldRetry(notFoundException); validateSuccess(singleShouldRetry, ShouldRetryValidator.builder() .nullException() @@ -128,9 +127,9 @@ public void shouldRetryWithGenericException() { request.requestContext = Mockito.mock(DocumentServiceRequestContext.class); renameCollectionAwareClientRetryPolicy.onBeforeSendRequest(request); - Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy + Mono singleShouldRetry = renameCollectionAwareClientRetryPolicy .shouldRetry(new BadRequestException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); } } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java index da35a459cec37..de77eb4e6117d 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/RetryUtilsTest.java @@ -2,7 +2,6 @@ // Licensed under the MIT License. package com.azure.cosmos.implementation; -import com.azure.cosmos.implementation.IRetryPolicy.ShouldRetryResult; import com.azure.cosmos.implementation.directconnectivity.AddressSelector; import com.azure.cosmos.implementation.directconnectivity.StoreResponse; import com.azure.cosmos.implementation.directconnectivity.StoreResponseValidator; diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ShouldRetryValidator.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ShouldRetryValidator.java index 4eab46b76dc9a..757f754b045f1 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ShouldRetryValidator.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/ShouldRetryValidator.java @@ -13,7 +13,7 @@ */ public interface ShouldRetryValidator { - void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult); + void validate(ShouldRetryResult shouldRetryResult); static Builder builder() { return new Builder(); @@ -26,7 +26,7 @@ public ShouldRetryValidator build() { return new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { for (ShouldRetryValidator validator : validators) { validator.validate(shouldRetryResult); } @@ -38,7 +38,7 @@ public Builder nullException() { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.exception).isNull(); } }); @@ -49,7 +49,7 @@ public Builder hasException() { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.exception).isNotNull(); } }); @@ -60,7 +60,7 @@ public Builder exceptionOfType(Class klass) { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.exception).isNotNull(); assertThat(shouldRetryResult.exception).isInstanceOf(klass); } @@ -72,7 +72,7 @@ public Builder withException(FailureValidator failureValidator) { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.exception).isNotNull(); failureValidator.validate(shouldRetryResult.exception); } @@ -84,7 +84,7 @@ public Builder withException(Exception exception) { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.exception).isNotNull(); assertThat(shouldRetryResult.exception).isEqualTo(exception); } @@ -96,7 +96,7 @@ public Builder shouldRetry(boolean value) { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.shouldRetry).isEqualTo(value); } }); @@ -108,7 +108,7 @@ public Builder backOfTime(Duration backOfTime) { validators.add(new ShouldRetryValidator() { @Override - public void validate(IRetryPolicy.ShouldRetryResult shouldRetryResult) { + public void validate(ShouldRetryResult shouldRetryResult) { assertThat(shouldRetryResult.backOffTime).isEqualTo(backOfTime); } }); diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java index 6683769944f3b..6019bbda468c3 100644 --- a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/directconnectivity/GoneAndRetryWithRetryPolicyTest.java @@ -8,7 +8,6 @@ import com.azure.cosmos.implementation.BadRequestException; import com.azure.cosmos.implementation.GoneException; import com.azure.cosmos.implementation.HttpConstants; -import com.azure.cosmos.implementation.IRetryPolicy; import com.azure.cosmos.implementation.InvalidPartitionException; import com.azure.cosmos.implementation.OperationType; import com.azure.cosmos.implementation.PartitionIsMigratingException; @@ -16,6 +15,7 @@ import com.azure.cosmos.implementation.RequestTimeoutException; import com.azure.cosmos.implementation.ResourceType; import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.ShouldRetryResult; import com.azure.cosmos.implementation.guava25.base.Supplier; import org.testng.annotations.Test; import reactor.core.publisher.Mono; @@ -41,9 +41,9 @@ public void shouldRetryReadWithGoneException() { OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new GoneException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); @@ -91,9 +91,9 @@ public void shouldRetryNotYetFlushedWriteWithGoneException() { return goneExceptionForNotYetFlushedRequest; }; - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(goneExceptionForNotYetFlushedRequestSupplier.get()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); @@ -141,9 +141,9 @@ public void shouldNotRetryFlushedWriteWithGoneExceptionButForceAddressRefresh() }; GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(goneExceptionForFlushedRequestSupplier.get()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); assertThat(shouldRetryResult.policyArg).isNotNull(); @@ -173,9 +173,9 @@ public void shouldRetryFlushedWriteWithGoneExceptionFromService() { return goneExceptionForFlushedRequest; }; - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(goneExceptionForFlushedRequestSupplier.get()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); assertThat(shouldRetryResult.policyArg.getValue3()).isEqualTo(1); @@ -215,9 +215,9 @@ public void shouldNotRetryRequestTimeoutException() { ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new RequestTimeoutException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); assertThat(shouldRetryResult.policyArg).isNull(); @@ -248,9 +248,9 @@ public void shouldRetryWithPartitionIsMigratingException() { OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new PartitionIsMigratingException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(request.forceCollectionRoutingMapRefresh).isTrue(); assertThat(shouldRetryResult.policyArg.getValue0()).isTrue(); @@ -266,9 +266,9 @@ public void shouldRetryWithInvalidPartitionException() { OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new InvalidPartitionException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(request.requestContext.quorumSelectedLSN).isEqualTo(-1); assertThat(request.requestContext.resolvedPartitionKeyRange).isNull(); @@ -294,9 +294,9 @@ public void shouldRetryWithPartitionKeyRangeIsSplittingException() { OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new PartitionKeyRangeIsSplittingException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isTrue(); assertThat(request.forcePartitionKeyRangeRefresh).isTrue(); assertThat(request.requestContext.resolvedPartitionKeyRange).isNull(); @@ -315,9 +315,9 @@ public void shouldRetryWithGenericException() { OperationType.Read, ResourceType.Document); GoneAndRetryWithRetryPolicy goneAndRetryWithRetryPolicy = new GoneAndRetryWithRetryPolicy(request, 30); - Mono singleShouldRetry = goneAndRetryWithRetryPolicy + Mono singleShouldRetry = goneAndRetryWithRetryPolicy .shouldRetry(new BadRequestException()); - IRetryPolicy.ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); + ShouldRetryResult shouldRetryResult = singleShouldRetry.block(); assertThat(shouldRetryResult.shouldRetry).isFalse(); } diff --git a/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/feedranges/FeedRangeTest.java b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/feedranges/FeedRangeTest.java new file mode 100644 index 0000000000000..7758ab99db6d2 --- /dev/null +++ b/sdk/cosmos/azure-cosmos/src/test/java/com/azure/cosmos/implementation/feedranges/FeedRangeTest.java @@ -0,0 +1,505 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package com.azure.cosmos.implementation.feedranges; + +import com.azure.cosmos.implementation.HttpConstants; +import com.azure.cosmos.implementation.IRoutingMapProvider; +import com.azure.cosmos.implementation.MetadataDiagnosticsContext; +import com.azure.cosmos.implementation.OperationType; +import com.azure.cosmos.implementation.PartitionKeyRange; +import com.azure.cosmos.implementation.PartitionKeyRangeGoneException; +import com.azure.cosmos.implementation.RequestOptions; +import com.azure.cosmos.implementation.ResourceType; +import com.azure.cosmos.implementation.RxDocumentServiceRequest; +import com.azure.cosmos.implementation.Utils; +import com.azure.cosmos.implementation.apachecommons.collections.list.UnmodifiableList; +import com.azure.cosmos.implementation.routing.PartitionKeyInternal; +import com.azure.cosmos.implementation.routing.PartitionKeyInternalUtils; +import com.azure.cosmos.implementation.routing.Range; +import com.azure.cosmos.models.FeedRange; +import com.azure.cosmos.models.PartitionKeyDefinition; +import org.mockito.Mockito; +import org.testng.annotations.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.UUID; + +import static com.azure.cosmos.implementation.TestUtils.mockDiagnosticsClientContext; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.ThrowableAssert.catchThrowableOfType; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyMapOf; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.when; + +public class FeedRangeTest { + + @Test(groups = "unit") + public void feedRangeEPK_Range() { + Range range = new Range<>("AA", "BB", true, false); + FeedRangeEpkImpl feedRangeEPK = new FeedRangeEpkImpl(range); + assertThat(range).isEqualTo(feedRangeEPK.getRange()); + } + + @Test(groups = "unit") + public void feedRangeEPK_RequestVisitor() { + Range range = new Range<>("AA", "BB", true, false); + FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(range); + RxDocumentServiceRequest request = createMockRequest(true); + feedRange.accept( + FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.SINGLETON, request); + assertThat(request.getPropertiesOrThrow()).hasSize(2); + assertThat(request.getPropertiesOrThrow().get(EpkRequestPropertyConstants.START_EPK_STRING)) + .isNotNull() + .isEqualTo("AA"); + assertThat(request.getPropertiesOrThrow().get(EpkRequestPropertyConstants.END_EPK_STRING)) + .isNotNull() + .isEqualTo("BB"); + } + + @Test(groups = "unit") + public void feedRangeEPK_RequestVisitor_ThrowsWhenNoPropertiesAvailable() { + Range range = new Range<>("AA", "BB", true, false); + FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(range); + RxDocumentServiceRequest request = createMockRequest(false); + assertThat(catchThrowableOfType(() -> feedRange.accept( + FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.SINGLETON, request), + IllegalStateException.class)).isNotNull(); + } + + @Test(groups = "unit") + public void feedRangeEPK_getEffectiveRangesAsync() { + Range range = new Range<>("AA", "BB", true, false); + FeedRangeEpkImpl FeedRangeEpk = new FeedRangeEpkImpl(range); + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + StepVerifier + .create( + FeedRangeEpk.getEffectiveRanges( + routingMapProviderMock, + null, + null)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + assertThat(new ArrayList<>(r).get(0)) + .hasSize(1) + .contains(range); + }) + .verifyComplete(); + } + + @Test(groups = "unit") + public void feedRangeEPK_getPartitionKeyRangesAsync() { + Range range = new Range<>("AA", "BB", true, false); + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive(range.getMin()) + .setMaxExclusive(range.getMax()); + + List pkRanges = new ArrayList<>(); + pkRanges.add(partitionKeyRange); + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + when( + routingMapProviderMock.tryGetOverlappingRangesAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(range), + anyBoolean(), + anyMapOf(String.class, Object.class))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(pkRanges))); + + FeedRangeEpkImpl feedRangeEpk = new FeedRangeEpkImpl(range); + StepVerifier + .create( + feedRangeEpk.getPartitionKeyRanges( + routingMapProviderMock, + null, + null)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + UnmodifiableList response = new ArrayList<>(r).get(0); + assertThat(response) + .hasSize(1) + .contains(partitionKeyRange.getId()); + }) + .verifyComplete(); + + Mockito + .verify(routingMapProviderMock, Mockito.times(1)) + .tryGetOverlappingRangesAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(range), + eq(false), + anyMapOf(String.class, Object.class)); + } + + @Test(groups = "unit") + public void feedRangeEPK_toJsonFromJson() { + Range range = new Range<>("AA", "BB", true, false); + FeedRangeEpkImpl feedRange = new FeedRangeEpkImpl(range); + String representation = feedRange.toJson(); + assertThat(FeedRange.fromJsonString(representation)) + .isNotNull() + .isInstanceOf(FeedRangeEpkImpl.class); + FeedRangeEpkImpl feedRangeDeserialized = + (FeedRangeEpkImpl)FeedRange.fromJsonString(representation); + String representationAfterDeserialization = feedRangeDeserialized.toJson(); + assertThat(representationAfterDeserialization).isEqualTo(representation); + assertThat(feedRangeDeserialized.getRange()).isNotNull(); + assertThat(feedRangeDeserialized.getRange().getMin()) + .isNotNull() + .isEqualTo(range.getMin()); + assertThat(feedRangeDeserialized.getRange().getMax()) + .isNotNull() + .isEqualTo(range.getMax()); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_PKRange() { + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(pkRangeId); + assertThat(pkRangeId).isEqualTo(feedRangePartitionKeyRange.getPartitionKeyRangeId()); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_RequestVisitor() { + Range range = new Range<>("AA", "BB", true, false); + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive(range.getMin()) + .setMaxExclusive(range.getMax()); + + FeedRangePartitionKeyRangeImpl feedRangPartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(partitionKeyRange.getId()); + + RxDocumentServiceRequest request = createMockRequest(true); + feedRangPartitionKeyRange.accept( + FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.SINGLETON, request); + assertThat(request.getPartitionKeyRangeIdentity()).isNotNull(); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_getEffectiveRangesAsync() { + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive("AA") + .setMaxExclusive("BB"); + + FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(partitionKeyRange.getId()); + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + when( + routingMapProviderMock.tryGetPartitionKeyRangeByIdAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(partitionKeyRange.getId()), + anyBoolean(), + anyMapOf(String.class, Object.class))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(partitionKeyRange))); + + StepVerifier + .create( + feedRangePartitionKeyRange.getEffectiveRanges( + routingMapProviderMock, null, null)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + UnmodifiableList> ranges = new ArrayList<>(r).get(0); + assertThat(ranges).hasSize(1); + Range range = ranges.get(0); + assertThat(range).isNotNull(); + assertThat(range.getMin()).isEqualTo(partitionKeyRange.getMinInclusive()); + assertThat(range.getMax()).isEqualTo(partitionKeyRange.getMaxExclusive()); + assertThat(range.getMin()).isEqualTo(partitionKeyRange.toRange().getMin()); + assertThat(range.getMax()).isEqualTo(partitionKeyRange.toRange().getMax()); + }) + .verifyComplete(); + + Mockito + .verify(routingMapProviderMock, Mockito.times(1)) + .tryGetPartitionKeyRangeByIdAsync( + null, + null, + partitionKeyRange.getId(), + false, + null); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_getEffectiveRangesAsync_Null() { + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive("AA") + .setMaxExclusive("BB"); + + FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(partitionKeyRange.getId()); + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + when( + routingMapProviderMock.tryGetPartitionKeyRangeByIdAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(partitionKeyRange.getId()), + anyBoolean(), + anyMapOf(String.class, Object.class))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(null))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(null))); + + StepVerifier + .create( + feedRangePartitionKeyRange.getEffectiveRanges( + routingMapProviderMock, null, null)) + .recordWith(ArrayList::new) + .expectErrorSatisfies((e) -> { + assertThat(e).isInstanceOf(PartitionKeyRangeGoneException.class); + PartitionKeyRangeGoneException pkGoneException = (PartitionKeyRangeGoneException)e; + assertThat(pkGoneException.getSubStatusCode()) + .isEqualTo(HttpConstants.SubStatusCodes.PARTITION_KEY_RANGE_GONE); + }) + .verify(); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_getEffectiveRangesAsync_Refresh() { + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive("AA") + .setMaxExclusive("BB"); + + FeedRangePartitionKeyRangeImpl feedRangePartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(partitionKeyRange.getId()); + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + when( + routingMapProviderMock.tryGetPartitionKeyRangeByIdAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(partitionKeyRange.getId()), + anyBoolean(), + anyMapOf(String.class, Object.class))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(null))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(partitionKeyRange))); + + StepVerifier + .create( + feedRangePartitionKeyRange.getEffectiveRanges( + routingMapProviderMock, null, null)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + UnmodifiableList> ranges = new ArrayList<>(r).get(0); + assertThat(ranges).hasSize(1); + Range range = ranges.get(0); + assertThat(range).isNotNull(); + assertThat(range.getMin()).isEqualTo(partitionKeyRange.getMinInclusive()); + assertThat(range.getMax()).isEqualTo(partitionKeyRange.getMaxExclusive()); + assertThat(range.getMin()).isEqualTo(partitionKeyRange.toRange().getMin()); + assertThat(range.getMax()).isEqualTo(partitionKeyRange.toRange().getMax()); + }) + .verifyComplete(); + + Mockito + .verify(routingMapProviderMock, Mockito.times(2)) + .tryGetPartitionKeyRangeByIdAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + eq(partitionKeyRange.getId()), + anyBoolean(), + anyMapOf(String.class, Object.class)); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_getPartitionKeyRangesAsync() { + Range range = new Range<>("AA", "BB", true, false); + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive(range.getMin()) + .setMaxExclusive(range.getMax()); + + FeedRangePartitionKeyRangeImpl feedRangPartitionKeyRange = + new FeedRangePartitionKeyRangeImpl(partitionKeyRange.getId()); + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + StepVerifier + .create( + feedRangPartitionKeyRange.getPartitionKeyRanges( + routingMapProviderMock, + null, + null)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + UnmodifiableList response = new ArrayList<>(r).get(0); + assertThat(response) + .hasSize(1) + .contains(partitionKeyRange.getId()); + }) + .verifyComplete(); + } + + @Test(groups = "unit") + public void feedRangePKRangeId_toJsonFromJson() { + String pkRangeId = UUID.randomUUID().toString(); + FeedRangePartitionKeyRangeImpl feedRange = new FeedRangePartitionKeyRangeImpl(pkRangeId); + String representation = feedRange.toJson(); + assertThat(FeedRange.fromJsonString(representation)) + .isNotNull() + .isInstanceOf(FeedRangePartitionKeyRangeImpl.class); + FeedRangePartitionKeyRangeImpl feedRangeDeserialized = + (FeedRangePartitionKeyRangeImpl)FeedRange.fromJsonString(representation); + String representationAfterDeserialization = feedRangeDeserialized.toJson(); + assertThat(representationAfterDeserialization).isEqualTo(representation); + } + + @Test(groups = "unit") + public void feedRangePK_PK() { + PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal( + "Test"); + FeedRangePartitionKeyImpl feedRangePartitionKey = + new FeedRangePartitionKeyImpl(partitionKey); + assertThat(partitionKey).isEqualTo(feedRangePartitionKey.getPartitionKeyInternal()); + } + + @Test(groups = "unit") + public void feedRangePK_RequestVisitor() { + PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal( + "Test"); + FeedRangePartitionKeyImpl feedRangePartitionKey = + new FeedRangePartitionKeyImpl(partitionKey); + RxDocumentServiceRequest request = createMockRequest(true); + feedRangePartitionKey.accept( + FeedRangeRxDocumentServiceRequestPopulatorVisitorImpl.SINGLETON, request); + assertThat(request.getPartitionKeyInternal()).isNotNull(); + assertThat(request.getPartitionKeyInternal().toJson()) + .isNotNull() + .isEqualTo(request.getHeaders().get(HttpConstants.HttpHeaders.PARTITION_KEY)); + } + + @Test(groups = "unit") + public void feedRangePK_getEffectiveRangesAsync() { + PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); + partitionKeyDefinition.getPaths().add("/id"); + PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal( + "Test"); + FeedRangePartitionKeyImpl feedRangePartitionKey = + new FeedRangePartitionKeyImpl(partitionKey); + Range range = Range.getPointRange( + partitionKey.getEffectivePartitionKeyString(partitionKey, partitionKeyDefinition)); + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + StepVerifier + .create( + feedRangePartitionKey.getEffectiveRanges( + routingMapProviderMock, + null, + partitionKeyDefinition)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + assertThat(new ArrayList<>(r).get(0)) + .hasSize(1) + .contains(range); + }) + .verifyComplete(); + } + + @Test(groups = "unit") + public void feedRangePK_getPartitionKeyRangesAsync() { + PartitionKeyDefinition partitionKeyDefinition = new PartitionKeyDefinition(); + partitionKeyDefinition.getPaths().add("/id"); + PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal( + "Test"); + + Range range = new Range<>("AA", "BB", true, false); + String pkRangeId = UUID.randomUUID().toString(); + PartitionKeyRange partitionKeyRange = new PartitionKeyRange() + .setId(pkRangeId) + .setMinInclusive(range.getMin()) + .setMaxExclusive(range.getMax()); + List pkRanges = new ArrayList<>(); + pkRanges.add(partitionKeyRange); + + IRoutingMapProvider routingMapProviderMock = Mockito.mock(IRoutingMapProvider.class); + when( + routingMapProviderMock.tryGetOverlappingRangesAsync( + any(MetadataDiagnosticsContext.class), + anyString(), + any(), + anyBoolean(), + anyMapOf(String.class, Object.class))) + .thenReturn(Mono.just(Utils.ValueHolder.initialize(pkRanges))); + + FeedRangePartitionKeyImpl feedRangPartitionKey = + new FeedRangePartitionKeyImpl(partitionKey); + StepVerifier + .create( + feedRangPartitionKey.getPartitionKeyRanges( + routingMapProviderMock, + null, + partitionKeyDefinition)) + .recordWith(ArrayList::new) + .expectNextCount(1) + .consumeRecordedWith(r -> { + assertThat(r).hasSize(1); + UnmodifiableList response = new ArrayList<>(r).get(0); + assertThat(response) + .hasSize(1) + .contains(partitionKeyRange.getId()); + }) + .verifyComplete(); + } + + @Test(groups = "unit") + public void feedRangePK_toJsonFromJson() { + PartitionKeyInternal partitionKey = PartitionKeyInternalUtils.createPartitionKeyInternal( + "Test"); + FeedRangePartitionKeyImpl feedRange = new FeedRangePartitionKeyImpl(partitionKey); + String representation = feedRange.toJson(); + assertThat(FeedRange.fromJsonString(representation)) + .isNotNull() + .isInstanceOf(FeedRangePartitionKeyImpl.class); + FeedRangePartitionKeyImpl feedRangeDeserialized = + (FeedRangePartitionKeyImpl)FeedRange.fromJsonString(representation); + String representationAfterDeserialization = feedRangeDeserialized.toJson(); + assertThat(representationAfterDeserialization).isEqualTo(representation); + } + + private static RxDocumentServiceRequest createMockRequest(boolean hasProperties) { + RequestOptions requestOptions = new RequestOptions(); + + if (hasProperties) { + requestOptions.setProperties(new HashMap<>()); + } + + return RxDocumentServiceRequest.create( + mockDiagnosticsClientContext(), + OperationType.Read, + ResourceType.Document, + "/dbs/db/colls/col/docs/docId", + null, + requestOptions); + } + + +}