Skip to content

Commit

Permalink
Fixed resource address in CosmosException. (#17279)
Browse files Browse the repository at this point in the history
* Fixed resource address in CosmosException. Added new API to expose regions contacted on CosmosDiagnostics

* Fixed resource address in GATEWAY mode to have full physical address

* Setting physical resource address in tests
  • Loading branch information
kushagraThapar authored Nov 17, 2020
1 parent fd7f230 commit 7def120
Show file tree
Hide file tree
Showing 17 changed files with 77 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -387,14 +387,8 @@ public static CosmosException createCosmosException(int statusCode, String error
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosException createCosmosException(int statusCode, Exception innerException) {
return new CosmosException(statusCode, null, null, innerException);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
public static CosmosException createCosmosException(int statusCode, CosmosError cosmosErrorResource,
Map<String, String> responseHeaders) {
return new CosmosException(/* resourceAddress */ null, statusCode, cosmosErrorResource, responseHeaders);
public static CosmosException createCosmosException(String resourceAddress, int statusCode, Exception innerException) {
return new CosmosException(resourceAddress, statusCode, null, null, innerException);
}

@Warning(value = INTERNAL_USE_ONLY_WARNING)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.time.Duration;
import java.util.Set;

/**
* This class represents response diagnostic statistics associated with a request to Azure Cosmos DB
Expand Down Expand Up @@ -77,6 +79,14 @@ public Duration getDuration() {
return this.clientSideRequestStatistics.getDuration();
}

/**
* Regions contacted for this request
* @return set of regions contacted for this request
*/
public Set<URI> getRegionsContacted() {
return this.clientSideRequestStatistics.getRegionsContacted();
}

FeedResponseDiagnostics getFeedResponseDiagnostics() {
return feedResponseDiagnostics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ CosmosException setDiagnostics(CosmosDiagnostics cosmosDiagnostics) {
@Override
public String toString() {
return getClass().getSimpleName() + "{" + "userAgent=" + USER_AGENT + ", error=" + cosmosError + ", resourceAddress='"
+ resourceAddress + "', requestUri='" + (requestUri != null ? requestUri.getURIAsString() : null) + "', statusCode=" + statusCode + ", message=" + getMessage()
+ resourceAddress + ", statusCode=" + statusCode + ", message=" + getMessage()
+ ", causeInfo=" + causeInfo() + ", responseHeaders=" + responseHeaders + ", requestHeaders="
+ filterSensitiveData(requestHeaders) + '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DocumentServiceRequestContext implements Cloneable {
public volatile StoreResult quorumSelectedStoreResponse;
public volatile PartitionKeyInternal effectivePartitionKey;
public volatile CosmosDiagnostics cosmosDiagnostics;
public volatile String resourcePhysicalAddress;
public RetryContext retryContext;

public DocumentServiceRequestContext() {
Expand Down Expand Up @@ -145,6 +146,7 @@ public DocumentServiceRequestContext clone() {
context.effectivePartitionKey = this.effectivePartitionKey;
context.performedBackgroundAddressRefresh = this.performedBackgroundAddressRefresh;
context.cosmosDiagnostics = this.cosmosDiagnostics;
context.resourcePhysicalAddress = this.resourcePhysicalAddress;
context.retryContext = new RetryContext(this.retryContext);

return context;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ public Mono<RxDocumentServiceResponse> performRequest(RxDocumentServiceRequest r
}

URI uri = getUri(request);
request.requestContext.resourcePhysicalAddress = uri.toString();

HttpHeaders httpHeaders = this.getHttpRequestHeaders(request.getHeaders());

Expand Down Expand Up @@ -300,7 +301,7 @@ private Mono<RxDocumentServiceResponse> toDocumentServiceResponse(Mono<HttpRespo
if (!(exception instanceof CosmosException)) {
// wrap in CosmosException
logger.error("Network failure", exception);
dce = BridgeInternal.createCosmosException(0, exception);
dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, 0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
} else {
dce = (CosmosException) exception;
Expand Down Expand Up @@ -342,7 +343,7 @@ private void validateOrThrow(RxDocumentServiceRequest request,
String.format("%s, StatusCode: %s", cosmosError.getMessage(), statusCodeString),
cosmosError.getPartitionedQueryExecutionInfo());

CosmosException dce = BridgeInternal.createCosmosException(statusCode, cosmosError, headers.toMap());
CosmosException dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, statusCode, cosmosError, headers.toMap());
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
throw dce;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ private void throwIfTargetChanged(RxDocumentServiceRequest request, PartitionKey
}

request.requestContext.resolvedPartitionKeyRange = null;
throw new InvalidPartitionException(RMResources.InvalidTarget, request.getResourceAddress());
throw new InvalidPartitionException(RMResources.InvalidTarget, request.requestContext.resourcePhysicalAddress);
}
}

Expand All @@ -176,7 +176,7 @@ private static void ensureRoutingMapPresent(
request.getPartitionKeyRangeIdentity().toHeader());
}
InvalidPartitionException invalidPartitionException = new InvalidPartitionException();
BridgeInternal.setResourceAddress(invalidPartitionException, request.getResourceAddress());
BridgeInternal.setResourceAddress(invalidPartitionException, request.requestContext.resourcePhysicalAddress);
throw invalidPartitionException;
}

Expand All @@ -188,7 +188,7 @@ private static void ensureRoutingMapPresent(
}
// Routing map not found although collection was resolved correctly.
NotFoundException e = new NotFoundException();
BridgeInternal.setResourceAddress(e, request.getResourceAddress());
BridgeInternal.setResourceAddress(e, request.requestContext.resourcePhysicalAddress);
throw e;
}
}
Expand Down Expand Up @@ -223,7 +223,7 @@ private Mono<Utils.ValueHolder<ResolutionResult>> tryResolveServerPartitionAsync
request.getResourceType(),
request.getOperationType(),
request.getResourceAddress());
return Mono.error(BridgeInternal.setResourceAddress(new InternalServerErrorException(RMResources.InternalServerError), request.getResourceAddress()));
return Mono.error(BridgeInternal.setResourceAddress(new InternalServerErrorException(RMResources.InternalServerError), request.requestContext.resourcePhysicalAddress));
}

PartitionKeyRange range;
Expand Down Expand Up @@ -293,7 +293,7 @@ private PartitionKeyRange tryResolveSinglePartitionCollection(

logger.debug("tryResolveSinglePartitionCollection: collectionCacheIsUptoDate = {}", collectionCacheIsUptoDate);
if (collectionCacheIsUptoDate) {
throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.getResourceAddress());
throw BridgeInternal.setResourceAddress(new BadRequestException(RMResources.MissingPartitionKeyValue), request.requestContext.resourcePhysicalAddress);
} else {
return null;
}
Expand All @@ -314,7 +314,7 @@ private Mono<ResolutionResult> resolveMasterResourceAddress(RxDocumentServiceReq

// return Observable.getError()
NotFoundException e = new NotFoundException();
BridgeInternal.setResourceAddress(e, request.getResourceAddress());
BridgeInternal.setResourceAddress(e, request.requestContext.resourcePhysicalAddress);
return Mono.error(e);
}

Expand Down Expand Up @@ -507,7 +507,7 @@ private Mono<ResolutionResult> resolveAddressesAndIdentityAsync(
// The only reason we will get here is if collection doesn't exist.
// Case when partition-key-range doesn't exist is handled in the corresponding method.

return Mono.error(BridgeInternal.setResourceAddress(new NotFoundException(), request.getResourceAddress()));
return Mono.error(BridgeInternal.setResourceAddress(new NotFoundException(), request.requestContext.resourcePhysicalAddress));
}

return Mono.just(funcResolutionResult.v);
Expand Down Expand Up @@ -568,7 +568,7 @@ private ResolutionResult handleRangeAddressResolutionFailure(
RMResources.PartitionKeyRangeNotFound,
request.getPartitionKeyRangeIdentity().getPartitionKeyRangeId(),
request.getPartitionKeyRangeIdentity().getCollectionRid());
throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.getResourceAddress());
throw BridgeInternal.setResourceAddress(new PartitionKeyRangeGoneException(errorMessage), request.requestContext.resourcePhysicalAddress);
}
logger.debug("handleRangeAddressResolutionFailure returns null");
return null;
Expand Down Expand Up @@ -649,7 +649,7 @@ private PartitionKeyRange tryResolveServerPartitionByPartitionKey(
} catch (Exception ex) {
throw BridgeInternal.setResourceAddress(new BadRequestException(
String.format(RMResources.InvalidPartitionKey, partitionKeyString),
ex), request.getResourceAddress());
ex), request.requestContext.resourcePhysicalAddress);
}
}
}
Expand All @@ -670,7 +670,8 @@ private PartitionKeyRange tryResolveServerPartitionByPartitionKey(
}

if (collectionCacheUptoDate) {
BadRequestException badRequestException = BridgeInternal.setResourceAddress(new BadRequestException(RMResources.PartitionKeyMismatch), request.getResourceAddress());
BadRequestException badRequestException = BridgeInternal.setResourceAddress(new BadRequestException(RMResources.PartitionKeyMismatch),
request.requestContext.resourcePhysicalAddress);
badRequestException.getResponseHeaders().put(WFConstants.BackendHeaders.SUB_STATUS, Integer.toString(HttpConstants.SubStatusCodes.PARTITION_KEY_MISMATCH));

throw badRequestException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds())));
}

Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(clientContext, httpResponseMono, httpRequest);
Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(request, clientContext, httpResponseMono, httpRequest);
return dsrObs.map(
dsr -> {
MetadataDiagnosticsContext metadataDiagnosticsContext =
Expand Down Expand Up @@ -386,7 +386,7 @@ public Mono<List<Address>> getServerAddressesViaGatewayAsync(
if (!(exception instanceof CosmosException)) {
// wrap in CosmosException
logger.error("Network failure", exception);
dce = BridgeInternal.createCosmosException(0, exception);
dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, 0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
} else {
dce = (CosmosException) exception;
Expand Down Expand Up @@ -588,7 +588,7 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
Duration.ofSeconds(Configs.getAddressRefreshResponseTimeoutInSeconds())));
}

Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(this.clientContext, httpResponseMono, httpRequest);
Mono<RxDocumentServiceResponse> dsrObs = HttpClientUtils.parseResponseAsync(request, this.clientContext, httpResponseMono, httpRequest);

return dsrObs.map(
dsr -> {
Expand Down Expand Up @@ -618,7 +618,7 @@ public Mono<List<Address>> getMasterAddressesViaGatewayAsync(
if (!(exception instanceof CosmosException)) {
// wrap in CosmosException
logger.error("Network failure", exception);
dce = BridgeInternal.createCosmosException(0, exception);
dce = BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, 0, exception);
BridgeInternal.setRequestHeaders(dce, request.getHeaders());
} else {
dce = (CosmosException) exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.azure.cosmos.implementation.CosmosError;
import com.azure.cosmos.implementation.DiagnosticsClientContext;
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.RxDocumentServiceRequest;
import com.azure.cosmos.implementation.RxDocumentServiceResponse;
import com.azure.cosmos.implementation.apachecommons.lang.StringUtils;
import com.azure.cosmos.implementation.http.HttpRequest;
Expand All @@ -16,7 +17,10 @@

public class HttpClientUtils {

static Mono<RxDocumentServiceResponse> parseResponseAsync(DiagnosticsClientContext diagnosticsClientContext, Mono<HttpResponse> httpResponse, HttpRequest httpRequest) {
static Mono<RxDocumentServiceResponse> parseResponseAsync(RxDocumentServiceRequest request,
DiagnosticsClientContext diagnosticsClientContext,
Mono<HttpResponse> httpResponse,
HttpRequest httpRequest) {
return httpResponse.flatMap(response -> {
if (response.statusCode() < HttpConstants.StatusCodes.MINIMUM_STATUSCODE_AS_ERROR_GATEWAY) {

Expand All @@ -27,20 +31,19 @@ static Mono<RxDocumentServiceResponse> parseResponseAsync(DiagnosticsClientConte

} else {
return HttpClientUtils
.createDocumentClientException(response).flatMap(Mono::error);
.createDocumentClientException(request, response).flatMap(Mono::error);
}
});
}

private static Mono<CosmosException> createDocumentClientException(HttpResponse httpResponse) {
private static Mono<CosmosException> createDocumentClientException(RxDocumentServiceRequest request, HttpResponse httpResponse) {
Mono<String> readStream = httpResponse.bodyAsString().switchIfEmpty(Mono.just(StringUtils.EMPTY));

return readStream.map(body -> {
CosmosError cosmosError = new CosmosError(body);

// TODO: we should set resource address in the Document Client Exception
return BridgeInternal.createCosmosException(httpResponse.statusCode(), cosmosError,
httpResponse.headers().toMap());
return BridgeInternal.createCosmosException(request.requestContext.resourcePhysicalAddress, httpResponse.statusCode(),
cosmosError, httpResponse.headers().toMap());
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ public Mono<StoreResponse> invokeStoreAsync(
null);
});

return httpResponseMono.flatMap(rsp -> processHttpResponse(request.getResourceAddress(),
return httpResponseMono.flatMap(rsp -> processHttpResponse(request.requestContext.resourcePhysicalAddress,
httpRequest, activityId, rsp, physicalAddress));

} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,9 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes

// ..Create CosmosException based on status and sub-status codes

final String resourceAddress = requestRecord.args().physicalAddress() != null ?
requestRecord.args().physicalAddress().toString() : null;

switch (status.code()) {

case StatusCodes.BADREQUEST:
Expand Down Expand Up @@ -825,9 +828,6 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes

case StatusCodes.REQUEST_TIMEOUT:
Exception inner = new RequestTimeoutException(error, lsn, partitionKeyRangeId, responseHeaders);
String resourceAddress = requestRecord.args().physicalAddress() != null ?
requestRecord.args().physicalAddress().toString() : null;

cause = new GoneException(resourceAddress, error, lsn, partitionKeyRangeId, responseHeaders, inner);
break;

Expand All @@ -848,9 +848,10 @@ private void messageReceived(final ChannelHandlerContext context, final RntbdRes
break;

default:
cause = BridgeInternal.createCosmosException(status.code(), error, responseHeaders);
cause = BridgeInternal.createCosmosException(resourceAddress, status.code(), error, responseHeaders);
break;
}
BridgeInternal.setResourceAddress(cause, resourceAddress);

requestRecord.completeExceptionally(cause);
}
Expand Down
Loading

0 comments on commit 7def120

Please sign in to comment.