Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use async cosmos library and upgrade to 2.6.3 #1321

Merged
merged 7 commits into from
Dec 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,17 @@
import com.github.ambry.cloud.CloudStorageException;
import com.github.ambry.commons.BlobId;
import com.github.ambry.config.CloudConfig;
import com.microsoft.azure.documentdb.ConnectionMode;
import com.microsoft.azure.documentdb.ConnectionPolicy;
import com.microsoft.azure.documentdb.ConsistencyLevel;
import com.microsoft.azure.documentdb.Document;
import com.microsoft.azure.documentdb.DocumentClient;
import com.microsoft.azure.documentdb.DocumentClientException;
import com.microsoft.azure.documentdb.ResourceResponse;
import com.microsoft.azure.documentdb.SqlParameter;
import com.microsoft.azure.documentdb.SqlParameterCollection;
import com.microsoft.azure.documentdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.ConnectionMode;
import com.microsoft.azure.cosmosdb.ConnectionPolicy;
import com.microsoft.azure.cosmosdb.ConsistencyLevel;
import com.microsoft.azure.cosmosdb.Document;
import com.microsoft.azure.cosmosdb.DocumentClientException;
import com.microsoft.azure.cosmosdb.ResourceResponse;
import com.microsoft.azure.cosmosdb.RetryOptions;
import com.microsoft.azure.cosmosdb.SqlParameter;
import com.microsoft.azure.cosmosdb.SqlParameterCollection;
import com.microsoft.azure.cosmosdb.SqlQuerySpec;
import com.microsoft.azure.cosmosdb.rx.AsyncDocumentClient;
import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.OperationContext;
Expand All @@ -51,7 +52,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
Expand All @@ -61,7 +61,6 @@
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.http.HttpHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -77,22 +76,22 @@ class AzureCloudDestination implements CloudDestination {
private static final String TIME_SINCE_PARAM = "@timesince";
private static final String BATCH_ID_QUERY_TEMPLATE = "SELECT * FROM c WHERE c.id IN (%s)";
static final int ID_QUERY_BATCH_SIZE = 1000;
static final String DEAD_BLOBS_QUERY_TEMPLATE =
private static final String DEAD_BLOBS_QUERY_TEMPLATE =
"SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE (c." + CloudBlobMetadata.FIELD_DELETION_TIME + " BETWEEN 1 AND "
+ THRESHOLD_PARAM + ")" + " OR (c." + CloudBlobMetadata.FIELD_EXPIRATION_TIME + " BETWEEN 1 AND "
+ THRESHOLD_PARAM + ")" + " ORDER BY c." + CloudBlobMetadata.FIELD_UPLOAD_TIME + " ASC";
// Note: ideally we would like to order by uploadTime and id, but Cosmos doesn't allow that without a composite index.
// It is unlikely (but not impossible) for two blobs in the same partition to have the same uploadTime (there would
// have to be multiple VCRs uploading the same partition). We track the lastBlobId in the CloudFindToken and skip it
// if it is returned in successive queries.
static final String ENTRIES_SINCE_QUERY_TEMPLATE =
private static final String ENTRIES_SINCE_QUERY_TEMPLATE =
"SELECT TOP " + LIMIT_PARAM + " * FROM c WHERE c." + CosmosDataAccessor.COSMOS_LAST_UPDATED_COLUMN + " >= "
+ TIME_SINCE_PARAM + " ORDER BY c." + CosmosDataAccessor.COSMOS_LAST_UPDATED_COLUMN + " ASC";
private static final String SEPARATOR = "-";
private static final int findSinceQueryLimit = 1000;
private final CloudStorageAccount azureAccount;
private final CloudBlobClient azureBlobClient;
private final DocumentClient documentClient;
private final AsyncDocumentClient asyncDocumentClient;
private final CosmosDataAccessor cosmosDataAccessor;
private final OperationContext blobOpContext = new OperationContext();
private final AzureMetrics azureMetrics;
Expand Down Expand Up @@ -123,20 +122,25 @@ class AzureCloudDestination implements CloudDestination {
OperationContext.setDefaultProxy(
new Proxy(Proxy.Type.HTTP, new InetSocketAddress(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort)));
}
// Set up CosmosDB connection, including any proxy setting
// Set up CosmosDB connection, including retry options and any proxy setting
ConnectionPolicy connectionPolicy = new ConnectionPolicy();
RetryOptions retryOptions = new RetryOptions();
retryOptions.setMaxRetryAttemptsOnThrottledRequests(azureCloudConfig.cosmosMaxRetries);
connectionPolicy.setRetryOptions(retryOptions);
if (azureCloudConfig.cosmosDirectHttps) {
logger.info("Using CosmosDB DirectHttps connection mode");
connectionPolicy.setConnectionMode(ConnectionMode.DirectHttps);
connectionPolicy.setConnectionMode(ConnectionMode.Direct);
}
if (cloudConfig.vcrProxyHost != null) {
connectionPolicy.setProxy(new HttpHost(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort));
connectionPolicy.setHandleServiceUnavailableFromProxy(true);
connectionPolicy.setProxy(cloudConfig.vcrProxyHost, cloudConfig.vcrProxyPort);
}
// TODO: test option to set connectionPolicy.setEnableEndpointDiscovery(false);
documentClient = new DocumentClient(azureCloudConfig.cosmosEndpoint, azureCloudConfig.cosmosKey, connectionPolicy,
ConsistencyLevel.Session);
cosmosDataAccessor = new CosmosDataAccessor(documentClient, azureCloudConfig, azureMetrics);
asyncDocumentClient = new AsyncDocumentClient.Builder().withServiceEndpoint(azureCloudConfig.cosmosEndpoint)
.withMasterKeyOrResourceToken(azureCloudConfig.cosmosKey)
.withConnectionPolicy(connectionPolicy)
.withConsistencyLevel(ConsistencyLevel.Session)
.build();
cosmosDataAccessor = new CosmosDataAccessor(asyncDocumentClient, azureCloudConfig, azureMetrics);
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(cloudConfig.cloudDeletedBlobRetentionDays);
this.deadBlobsQueryLimit = cloudConfig.cloudBlobCompactionQueryLimit;
logger.info("Created Azure destination");
Expand All @@ -145,26 +149,23 @@ class AzureCloudDestination implements CloudDestination {
/**
* Test constructor.
* @param azureAccount the {@link CloudStorageAccount} to use.
* @param documentClient the {@link DocumentClient} to use.
* @param asyncDocumentClient the {@link AsyncDocumentClient} to use.
* @param cosmosCollectionLink the CosmosDB collection link to use.
* @param clusterName the name of the Ambry cluster.
* @param azureMetrics the {@link AzureMetrics} to use.
* @throws CloudStorageException if the destination could not be created.
*/
AzureCloudDestination(CloudStorageAccount azureAccount, DocumentClient documentClient, String cosmosCollectionLink,
String clusterName, AzureMetrics azureMetrics) {
AzureCloudDestination(CloudStorageAccount azureAccount, AsyncDocumentClient asyncDocumentClient,
String cosmosCollectionLink, String clusterName, AzureMetrics azureMetrics) {
this.azureAccount = azureAccount;
this.documentClient = documentClient;
this.asyncDocumentClient = asyncDocumentClient;
this.azureMetrics = azureMetrics;
this.clusterName = clusterName;
this.retentionPeriodMs = TimeUnit.DAYS.toMillis(CloudConfig.DEFAULT_RETENTION_DAYS);
this.deadBlobsQueryLimit = CloudConfig.DEFAULT_COMPACTION_QUERY_LIMIT;

// Create a blob client to interact with Blob storage
azureBlobClient = azureAccount.createCloudBlobClient();
cosmosDataAccessor =
new CosmosDataAccessor(documentClient, cosmosCollectionLink, AzureCloudConfig.DEFAULT_COSMOS_MAX_RETRIES,
azureMetrics);
cosmosDataAccessor = new CosmosDataAccessor(asyncDocumentClient, cosmosCollectionLink, azureMetrics);
}

/**
Expand Down Expand Up @@ -309,15 +310,14 @@ public Map<String, CloudBlobMetadata> getBlobMetadata(List<BlobId> blobIds) thro
metadataList = getBlobMetadataChunked(blobIds);
}

return metadataList.stream().collect(Collectors.toMap(m -> m.getId(), Function.identity()));
return metadataList.stream().collect(Collectors.toMap(CloudBlobMetadata::getId, Function.identity()));
}

private List<CloudBlobMetadata> getBlobMetadataChunked(List<BlobId> blobIds) throws CloudStorageException {
if (blobIds.isEmpty() || blobIds.size() > ID_QUERY_BATCH_SIZE) {
throw new IllegalArgumentException("Invalid input list size: " + blobIds.size());
}
String quotedBlobIds =
String.join(",", blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.toList()));
String quotedBlobIds = blobIds.stream().map(s -> '"' + s.getID() + '"').collect(Collectors.joining(","));
String query = String.format(BATCH_ID_QUERY_TEMPLATE, quotedBlobIds);
String partitionPath = blobIds.get(0).getPartition().toPathString();
try {
Expand Down Expand Up @@ -363,6 +363,14 @@ public List<CloudBlobMetadata> findEntriesSince(String partitionPath, CloudFindT
}
}

/**
 * Getter for the {@link AsyncDocumentClient} held by this destination.
 * This is the same client instance that was handed to the {@link CosmosDataAccessor} at construction time.
 * @return the {@link AsyncDocumentClient} object.
 */
AsyncDocumentClient getAsyncDocumentClient() {
return asyncDocumentClient;
}

/**
* Filter out {@link CloudBlobMetadata} objects from lastUpdateTime ordered {@code cloudBlobMetadataList} whose
* lastUpdateTime is {@code lastUpdateTime} and id is in {@code lastReadBlobIds}.
Expand All @@ -374,9 +382,9 @@ private void filterOutLastReadBlobs(List<CloudBlobMetadata> cloudBlobMetadataLis
long lastUpdateTime) {
ListIterator<CloudBlobMetadata> iterator = cloudBlobMetadataList.listIterator();
int numRemovedBlobs = 0;
while(iterator.hasNext()) {
while (iterator.hasNext()) {
CloudBlobMetadata cloudBlobMetadata = iterator.next();
if(numRemovedBlobs == lastReadBlobIds.size() || cloudBlobMetadata.getLastUpdateTime() > lastUpdateTime) {
if (numRemovedBlobs == lastReadBlobIds.size() || cloudBlobMetadata.getLastUpdateTime() > lastUpdateTime) {
break;
}
if (lastReadBlobIds.contains(cloudBlobMetadata.getId())) {
Expand All @@ -392,7 +400,7 @@ private void filterOutLastReadBlobs(List<CloudBlobMetadata> cloudBlobMetadataLis
* @param fieldName The metadata field to modify.
* @param value The new value.
* @return {@code true} if the update succeeded, {@code false} if the metadata record was not found.
* @throws DocumentClientException
* @throws CloudStorageException if the update fails.
*/
private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value) throws CloudStorageException {
Objects.requireNonNull(blobId, "BlobId cannot be null");
Expand Down Expand Up @@ -424,7 +432,6 @@ private boolean updateBlobMetadata(BlobId blobId, String fieldName, Object value
}

ResourceResponse<Document> response = cosmosDataAccessor.readMetadata(blobId);
//CloudBlobMetadata blobMetadata = response.getResource().toObject(CloudBlobMetadata.class);
Document doc = response.getResource();
if (doc == null) {
logger.warn("Blob metadata record not found: {}", blobId.getID());
Expand Down Expand Up @@ -510,7 +517,8 @@ public boolean doesBlobExist(BlobId blobId) throws CloudStorageException {
* @param blobId the {@link BlobId} that needs a container.
* @param autoCreate flag indicating whether to create the container if it does not exist.
* @return the created {@link CloudBlobContainer}.
* @throws Exception
* @throws URISyntaxException
* @throws StorageException
*/
private CloudBlobContainer getContainer(BlobId blobId, boolean autoCreate)
throws URISyntaxException, StorageException {
Expand Down Expand Up @@ -594,7 +602,7 @@ private CloudBlockBlob getAzureBlobReference(BlobId blobId, boolean autoCreateCo
* @return the name of the Azure storage container where blobs in the specified partition are stored.
* @param partitionPath the lexical path of the Ambry partition.
*/
String getAzureContainerName(String partitionPath) {
private String getAzureContainerName(String partitionPath) {
// Include Ambry cluster name in case the same storage account is used to backup multiple clusters.
// Azure requires container names to be all lower case
String rawContainerName = clusterName + SEPARATOR + partitionPath;
Expand All @@ -612,14 +620,6 @@ String getAzureBlobName(BlobId blobId) {
return blobIdStr.substring(blobIdStr.length() - 4) + SEPARATOR + blobIdStr;
}

/**
* Visible for test.
* @return the CosmosDB DocumentClient
*/
DocumentClient getDocumentClient() {
return documentClient;
}

/**
* Visible for test.
* @return the {@link CosmosDataAccessor}
Expand All @@ -628,14 +628,6 @@ CosmosDataAccessor getCosmosDataAccessor() {
return cosmosDataAccessor;
}

/**
* Visible for test.
* @return the blob storage operation context.
*/
OperationContext getBlobOpContext() {
return blobOpContext;
}

/**
* Update the appropriate error metrics corresponding to the thrown exception.
* @param e the exception thrown.
Expand Down
Loading