Skip to content

Commit

Permalink
Use Azure Bulk Deletes in Azure Repository (elastic#53919)
Browse files Browse the repository at this point in the history
Now that we upgraded the Azure SDK to 8.6.2 in elastic#53865 we can make use of
bulk deletes.
  • Loading branch information
original-brownbear committed Mar 23, 2020
1 parent 69a3515 commit ace1a1b
Show file tree
Hide file tree
Showing 10 changed files with 112 additions and 132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,5 @@ testClusters.integTest {
// in a hacky way to change the protocol and endpoint. We must fix that.
setting 'azure.client.integration_test.endpoint_suffix',
{ "ignored;DefaultEndpointsProtocol=http;BlobEndpoint=${-> azureAddress()}" }, IGNORE_VALUE
String firstPartOfSeed = BuildParams.testSeed.tokenize(':').get(0)
setting 'thread_pool.repository_azure.max', (Math.abs(Long.parseUnsignedLong(firstPartOfSeed, 16) % 10) + 1).toString(), System.getProperty('ignore.tests.seed') == null ? DEFAULT : IGNORE_VALUE
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,12 @@
import com.microsoft.azure.storage.StorageException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRunnable;
import org.elasticsearch.action.support.GroupedActionListener;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.blobstore.BlobContainer;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.common.blobstore.support.AbstractBlobContainer;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -42,20 +37,18 @@
import java.nio.file.NoSuchFileException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

public class AzureBlobContainer extends AbstractBlobContainer {

private final Logger logger = LogManager.getLogger(AzureBlobContainer.class);
private final AzureBlobStore blobStore;
private final ThreadPool threadPool;
private final String keyPath;

AzureBlobContainer(BlobPath path, AzureBlobStore blobStore, ThreadPool threadPool) {
AzureBlobContainer(BlobPath path, AzureBlobStore blobStore) {
super(path);
this.blobStore = blobStore;
this.keyPath = path.buildAsString();
this.threadPool = threadPool;
}

private boolean blobExists(String blobName) {
Expand Down Expand Up @@ -112,41 +105,17 @@ public void writeBlobAtomic(String blobName, InputStream inputStream, long blobS
@Override
public DeleteResult delete() throws IOException {
try {
return blobStore.deleteBlobDirectory(keyPath, threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME));
return blobStore.deleteBlobDirectory(keyPath);
} catch (URISyntaxException | StorageException e) {
throw new IOException(e);
}
}

@Override
public void deleteBlobsIgnoringIfNotExists(List<String> blobNames) throws IOException {
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
if (blobNames.isEmpty()) {
result.onResponse(null);
} else {
final GroupedActionListener<Void> listener =
new GroupedActionListener<>(ActionListener.map(result, v -> null), blobNames.size());
final ExecutorService executor = threadPool.executor(AzureRepositoryPlugin.REPOSITORY_THREAD_POOL_NAME);
// Executing deletes in parallel since Azure SDK 8 is using blocking IO while Azure does not provide a bulk delete API endpoint
// TODO: Upgrade to newer non-blocking Azure SDK 11 and execute delete requests in parallel that way.
for (String blobName : blobNames) {
executor.execute(ActionRunnable.run(listener, () -> {
logger.trace("deleteBlob({})", blobName);
try {
blobStore.deleteBlob(buildKey(blobName));
} catch (StorageException e) {
if (e.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
throw new IOException(e);
}
} catch (URISyntaxException e) {
throw new IOException(e);
}
}));
}
}
try {
result.actionGet();
} catch (Exception e) {
blobStore.deleteBlobsIgnoringIfNotExists(blobNames.stream().map(this::buildKey).collect(Collectors.toList()));
} catch (URISyntaxException | StorageException e) {
throw new IOException("Exception during bulk delete", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,13 @@
import org.elasticsearch.common.blobstore.BlobStore;
import org.elasticsearch.common.blobstore.DeleteResult;
import org.elasticsearch.repositories.azure.AzureRepository.Repository;
import org.elasticsearch.threadpool.ThreadPool;

import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand All @@ -44,17 +43,15 @@
public class AzureBlobStore implements BlobStore {

private final AzureStorageService service;
private final ThreadPool threadPool;

private final String clientName;
private final String container;
private final LocationMode locationMode;

public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service, ThreadPool threadPool) {
public AzureBlobStore(RepositoryMetaData metadata, AzureStorageService service) {
this.container = Repository.CONTAINER_SETTING.get(metadata.settings());
this.clientName = Repository.CLIENT_NAME.get(metadata.settings());
this.service = service;
this.threadPool = threadPool;
// locationMode is set per repository, not per client
this.locationMode = Repository.LOCATION_MODE_SETTING.get(metadata.settings());
final Map<String, AzureStorageSettings> prevSettings = this.service.refreshAndClearCache(emptyMap());
Expand All @@ -80,7 +77,7 @@ public LocationMode getLocationMode() {

@Override
public BlobContainer blobContainer(BlobPath path) {
return new AzureBlobContainer(path, this, threadPool);
return new AzureBlobContainer(path, this);
}

@Override
Expand All @@ -91,13 +88,12 @@ public boolean blobExists(String blob) throws URISyntaxException, StorageExcepti
return service.blobExists(clientName, container, blob);
}

public void deleteBlob(String blob) throws URISyntaxException, StorageException, IOException {
service.deleteBlob(clientName, container, blob);
public void deleteBlobsIgnoringIfNotExists(Collection<String> blobs) throws URISyntaxException, StorageException {
service.deleteBlobsIgnoringIfNotExists(clientName, container, blobs);
}

public DeleteResult deleteBlobDirectory(String path, Executor executor)
throws URISyntaxException, StorageException, IOException {
return service.deleteBlobDirectory(clientName, container, path, executor);
public DeleteResult deleteBlobDirectory(String path) throws URISyntaxException, StorageException, IOException {
return service.deleteBlobDirectory(clientName, container, path);
}

public InputStream getInputStream(String blob) throws URISyntaxException, StorageException, IOException {
Expand All @@ -111,7 +107,7 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String keyPath, String prefix

public Map<String, BlobContainer> children(BlobPath path) throws URISyntaxException, StorageException, IOException {
return Collections.unmodifiableMap(service.children(clientName, container, path).stream().collect(
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this, threadPool))));
Collectors.toMap(Function.identity(), name -> new AzureBlobContainer(path.add(name), this))));
}

public void writeBlob(String blobName, InputStream inputStream, long blobSize, boolean failIfAlreadyExists)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected BlobStore getBlobStore() {

@Override
protected AzureBlobStore createBlobStore() {
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService, threadPool);
final AzureBlobStore blobStore = new AzureBlobStore(metadata, storageService);

logger.debug(() -> new ParameterizedMessage(
"using container [{}], chunk_size [{}], compress [{}], base_path [{}]",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.env.Environment;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.ReloadablePlugin;
import org.elasticsearch.plugins.RepositoryPlugin;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.threadpool.ExecutorBuilder;
import org.elasticsearch.threadpool.ScalingExecutorBuilder;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
Expand All @@ -43,8 +39,6 @@
*/
public class AzureRepositoryPlugin extends Plugin implements RepositoryPlugin, ReloadablePlugin {

public static final String REPOSITORY_THREAD_POOL_NAME = "repository_azure";

// protected for testing
final AzureStorageService azureStoreService;

Expand Down Expand Up @@ -80,15 +74,6 @@ public List<Setting<?>> getSettings() {
);
}

@Override
public List<ExecutorBuilder<?>> getExecutorBuilders(Settings settings) {
return Collections.singletonList(executorBuilder());
}

public static ExecutorBuilder<?> executorBuilder() {
return new ScalingExecutorBuilder(REPOSITORY_THREAD_POOL_NAME, 0, 32, TimeValue.timeValueSeconds(30L));
}

@Override
public void reload(Settings settings) {
// secure settings should be readable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
package org.elasticsearch.repositories.azure;

import com.microsoft.azure.storage.AccessCondition;
import com.microsoft.azure.storage.BatchException;
import com.microsoft.azure.storage.CloudStorageAccount;
import com.microsoft.azure.storage.Constants;
import com.microsoft.azure.storage.OperationContext;
import com.microsoft.azure.storage.RetryExponentialRetry;
import com.microsoft.azure.storage.RetryPolicy;
import com.microsoft.azure.storage.RetryPolicyFactory;
import com.microsoft.azure.storage.StorageErrorCodeStrings;
import com.microsoft.azure.storage.StorageException;
import com.microsoft.azure.storage.blob.BlobDeleteBatchOperation;
import com.microsoft.azure.storage.blob.BlobInputStream;
import com.microsoft.azure.storage.blob.BlobListingDetails;
import com.microsoft.azure.storage.blob.BlobProperties;
Expand All @@ -42,7 +45,6 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.common.blobstore.BlobMetaData;
import org.elasticsearch.common.blobstore.BlobPath;
import org.elasticsearch.common.blobstore.DeleteResult;
Expand All @@ -53,7 +55,6 @@
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;

import java.io.IOException;
import java.io.InputStream;
Expand All @@ -67,9 +68,10 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand Down Expand Up @@ -188,72 +190,61 @@ public boolean blobExists(String account, String container, String blob) throws
});
}

public void deleteBlob(String account, String container, String blob) throws URISyntaxException, StorageException {
public void deleteBlobsIgnoringIfNotExists(String account, String container, Collection<String> blobs)
throws URISyntaxException, StorageException {
logger.trace(() -> new ParameterizedMessage("delete blobs for container [{}], blob [{}]", container, blobs));
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
// Container name must be lower case.
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
logger.trace(() -> new ParameterizedMessage("delete blob for container [{}], blob [{}]", container, blob));
SocketAccess.doPrivilegedVoidException(() -> {
final CloudBlockBlob azureBlob = blobContainer.getBlockBlobReference(blob);
logger.trace(() -> new ParameterizedMessage("container [{}]: blob [{}] found. removing.", container, blob));
azureBlob.delete(DeleteSnapshotsOption.NONE, null, null, client.v2().get());
});
final Iterator<String> blobIterator = blobs.iterator();
int currentBatchSize = 0;
while (blobIterator.hasNext()) {
final BlobDeleteBatchOperation batchDeleteOp = new BlobDeleteBatchOperation();
do {
batchDeleteOp.addSubOperation(blobContainer.getBlockBlobReference(blobIterator.next()),
DeleteSnapshotsOption.NONE, null, null);
++currentBatchSize;
} while (blobIterator.hasNext() && currentBatchSize < Constants.BATCH_MAX_REQUESTS);
currentBatchSize = 0;
try {
SocketAccess.doPrivilegedVoidException(() -> blobContainer.getServiceClient().executeBatch(batchDeleteOp));
} catch (BatchException e) {
for (StorageException ex : e.getExceptions().values()) {
if (ex.getHttpStatusCode() != HttpURLConnection.HTTP_NOT_FOUND) {
logger.error("Batch exceptions [{}]", e.getExceptions());
throw e;
}
}
}
}
}

DeleteResult deleteBlobDirectory(String account, String container, String path, Executor executor)
DeleteResult deleteBlobDirectory(String account, String container, String path)
throws URISyntaxException, StorageException, IOException {
final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account);
final CloudBlobContainer blobContainer = client.v1().getContainerReference(container);
final Collection<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
final AtomicLong outstanding = new AtomicLong(1L);
final PlainActionFuture<Void> result = PlainActionFuture.newFuture();
final AtomicLong blobsDeleted = new AtomicLong();
final AtomicLong bytesDeleted = new AtomicLong();
final List<String> blobsToDelete = new ArrayList<>();
SocketAccess.doPrivilegedVoidException(() -> {
for (final ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
for (ListBlobItem blobItem : blobContainer.listBlobs(path, true)) {
// uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/
// this requires 1 + container.length() + 1, with each 1 corresponding to one of the /
final String blobPath = blobItem.getUri().getPath().substring(1 + container.length() + 1);
outstanding.incrementAndGet();
executor.execute(new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
deleteBlob(account, container, blobPath);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}

@Override
public void onFailure(Exception e) {
exceptions.add(e);
}

@Override
public void onAfter() {
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
}
});
final long len;
if (blobItem instanceof CloudBlob) {
len = ((CloudBlob) blobItem).getProperties().getLength();
} else {
len = -1L;
}
blobsToDelete.add(blobPath);
blobsDeleted.incrementAndGet();
if (len >= 0) {
bytesDeleted.addAndGet(len);
}
}
});
if (outstanding.decrementAndGet() == 0) {
result.onResponse(null);
}
result.actionGet();
if (exceptions.isEmpty() == false) {
final IOException ex = new IOException("Deleting directory [" + path + "] failed");
exceptions.forEach(ex::addSuppressed);
throw ex;
}
deleteBlobsIgnoringIfNotExists(account, container, blobsToDelete);
return new DeleteResult(blobsDeleted.get(), bytesDeleted.get());
}

Expand Down
Loading

0 comments on commit ace1a1b

Please sign in to comment.