Skip to content

Commit

Permalink
Support dynamically removing store from storage manager
Browse files Browse the repository at this point in the history
1. Given partition id, remove the corresponding store from storage
manager, disk manager and compaction manager.
2. Support deleting the allocated files of a store and returning swap
segments to the reserve pool if necessary.
  • Loading branch information
jsjtzyy committed Aug 8, 2019
1 parent 73468e0 commit f86a962
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,29 @@ boolean controlCompactionForBlobStore(BlobStore store, boolean enable) {
return compactionExecutor == null || compactionExecutor.controlCompactionForBlobStore(store, enable);
}

/**
 * Remove the given {@link BlobStore} from this compaction manager.
 * When there is no compaction executor the store is dropped from {@code stores} directly. Otherwise the removal is
 * rejected unless the store is in the executor's set of stores with compaction disabled, in which case the removal
 * is delegated to the executor.
 * @param store the {@link BlobStore} to remove.
 * @return {@code true} if there is no compaction executor, {@code false} if compaction is still enabled on the
 *         store, otherwise whatever the executor's {@code removeBlobStore} returns.
 */
boolean removeBlobStore(BlobStore store) {
  if (compactionExecutor == null) {
    stores.remove(store);
    return true;
  }
  if (compactionExecutor.getStoresDisabledCompaction().contains(store)) {
    // The executor takes care of stores.remove(store) itself, since that has to happen under its lock.
    return compactionExecutor.removeBlobStore(store);
  }
  logger.error("Fail to remove store ({}) from compaction manager because compaction on it is still enabled",
      store);
  return false;
}

/**
* Get compaction details for a given {@link BlobStore} if any
* @param blobStore the {@link BlobStore} for which compaction details are requested
Expand Down Expand Up @@ -345,6 +368,25 @@ boolean controlCompactionForBlobStore(BlobStore store, boolean enable) {
}
return true;
}

/**
 * Remove the given {@link BlobStore} from {@code stores}, {@code storesDisabledCompaction} and
 * {@code storesToSkip} while holding {@code lock}.
 * @param store the {@link BlobStore} to remove.
 * @return {@code true} always.
 */
boolean removeBlobStore(BlobStore store) {
  lock.lock();
  try {
    stores.remove(store);
    // Dropping the store from "storesDisabledCompaction" and "storesToSkip" is fine even while the executor thread
    // is walking the stores to check compaction eligibility: the executor first checks whether the store is
    // started, and that is guaranteed to be false before removeBlobStore() is invoked.
    storesDisabledCompaction.remove(store);
    storesToSkip.remove(store);
    return true;
  } finally {
    lock.unlock();
  }
}

/**
 * Expose the stores on which compaction has been disabled.
 * The returned set is a read-only view backed by the executor's own set: it reflects later changes made by the
 * executor, but callers cannot modify it (mutators throw {@link UnsupportedOperationException}). This keeps all
 * modifications inside the executor.
 * @return an unmodifiable view of the {@link BlobStore}s with compaction disabled.
 */
Set<BlobStore> getStoresDisabledCompaction() {
  return java.util.Collections.unmodifiableSet(storesDisabledCompaction);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ void start() throws InterruptedException {
partitionAndStore.getValue().start();
} catch (Exception e) {
numStoreFailures.incrementAndGet();
logger.error("Exception while starting store for the partition" + partitionAndStore.getKey(), e);
logger.error("Exception while starting store for the " + partitionAndStore.getKey(), e);
}
}, false);
thread.start();
Expand Down Expand Up @@ -302,6 +302,32 @@ boolean shutdownBlobStore(PartitionId id) {
}
}

/**
 * Remove the blob store that belongs to the given partition from this disk manager.
 * A partition with no store in this disk manager is treated as already removed. Removal is refused when the disk
 * manager is not running, when the store is still started, or when the compaction manager declines to remove the
 * store.
 * @param id the {@link PartitionId} of the {@link BlobStore} which should be removed.
 * @return {@code true} if the store was removed or was not present. {@code false} if removal was refused.
 */
boolean removeBlobStore(PartitionId id) {
  BlobStore targetStore = stores.get(id);
  if (targetStore == null) {
    logger.info("Store {} is not found in disk manager", id);
    return true;
  }
  // Only a stopped store on a running disk manager may be removed.
  boolean removable = running && !targetStore.isStarted();
  if (!removable) {
    logger.error("Removing store {} failed. Disk running = {}, store running = {}", id, running,
        targetStore.isStarted());
    return false;
  }
  boolean removedFromCompaction = compactionManager.removeBlobStore(targetStore);
  if (!removedFromCompaction) {
    logger.error("Fail to remove store {} from compaction manager.", id);
    return false;
  }
  // The compaction manager has let go of the store; now drop this disk manager's own bookkeeping for it.
  stores.remove(id);
  stoppedReplicas.remove(id.toPathString());
  partitionToReplicaMap.remove(id);
  logger.info("Store {} is successfully removed from disk manager", id);
  return true;
}

/**
* Set the BlobStore stopped state with given {@link PartitionId} {@code id}.
* @param partitionIds a list of {@link PartitionId} of the {@link BlobStore} whose stopped state should be set.
Expand Down Expand Up @@ -384,6 +410,8 @@ boolean areAllStoresDown() {

/**
* Reports any unrecognized directories on disk
* TODO: go to each unrecognized dir and check whether it contains a remove-store event log. If it does, this method
* should clean up that store dir and return swap segments to the reserve pool if needed.
*/
private void reportUnrecognizedDirs() {
File[] dirs = new File(disk.getMountPath()).listFiles(File::isDirectory);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,21 @@ public boolean shutdownBlobStore(PartitionId id) {
return diskManager != null && diskManager.shutdownBlobStore(id);
}

/**
 * Remove the blob store that belongs to the given partition from this storage manager.
 * A partition with no associated {@link DiskManager} is treated as already removed. Otherwise the removal is
 * delegated to the disk manager, and the partition is dropped from {@code partitionToDiskManager} only if the disk
 * manager reports success.
 * @param id the {@link PartitionId} of the store which should be removed.
 * @return {@code true} if the store was removed or was not present. {@code false} if the disk manager failed to
 *         remove it.
 */
public boolean removeBlobStore(PartitionId id) {
  DiskManager owningDiskManager = partitionToDiskManager.get(id);
  if (owningDiskManager == null) {
    logger.info("Store {} is not found in storage manager", id);
    return true;
  }
  boolean removedFromDisk = owningDiskManager.removeBlobStore(id);
  if (removedFromDisk) {
    partitionToDiskManager.remove(id);
    logger.info("Store {} is successfully removed from storage manager", id);
    return true;
  }
  logger.error("Fail to remove store {} from disk manager", id);
  return false;
}

/**
* Set BlobStore Stopped state with given {@link PartitionId} {@code id}.
* @param partitionIds a list {@link PartitionId} of the {@link Store} whose stopped state should be set.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,49 @@ public void shutdownBlobStoreTest() throws Exception {
shutdownAndAssertStoresInaccessible(storageManager, replicas);
}

/**
 * Test removing blob stores with given {@link PartitionId}: the success path, each rejection case (compaction still
 * enabled, store still started, disk manager not running) and the case where the store is not found.
 * @throws Exception
 */
@Test
public void removeBlobStoreTest() throws Exception {
  MockDataNodeId dataNode = clusterMap.getDataNodes().get(0);
  List<ReplicaId> replicas = clusterMap.getReplicaIds(dataNode);
  List<MockDataNodeId> dataNodes = new ArrayList<>();
  dataNodes.add(dataNode);
  MockPartitionId invalidPartition =
      new MockPartitionId(Long.MAX_VALUE, MockClusterMap.DEFAULT_PARTITION_CLASS, dataNodes, 0);
  StorageManager storageManager = createStorageManager(replicas, metricRegistry, null);
  storageManager.start();
  // Replicas from index 3 onwards go through the full success path: disable compaction, shut down, then remove.
  // Replica[0] is kept to test removing a store whose disk manager is not running, replica[1] to test removing a
  // store that is still started, and replica[2] to test removing a store that still has compaction enabled.
  for (int i = 3; i < replicas.size(); i++) {
    ReplicaId replica = replicas.get(i);
    PartitionId id = replica.getPartitionId();
    assertTrue("Disable compaction should succeed", storageManager.controlCompactionForBlobStore(id, false));
    assertTrue("Shutdown should succeed on given store", storageManager.shutdownBlobStore(id));
    assertTrue("Removing store should succeed", storageManager.removeBlobStore(id));
    assertNull("The store should not exist", storageManager.getStore(id));
  }
  // test removing a store on which compaction is still enabled, even though the store is shut down
  PartitionId id = replicas.get(2).getPartitionId();
  assertTrue("Shutdown should succeed on given store", storageManager.shutdownBlobStore(id));
  assertFalse("Removing store should fail because compaction is enabled on this store",
      storageManager.removeBlobStore(id));
  // test removing a store that is still started
  id = replicas.get(1).getPartitionId();
  assertFalse("Removing store should fail because store is still started", storageManager.removeBlobStore(id));
  // test removing a store whose disk manager is not running
  id = replicas.get(0).getPartitionId();
  storageManager.getDiskManager(id).shutdown();
  assertFalse("Removing store should fail because disk manager is not running", storageManager.removeBlobStore(id));
  // test removing a store that doesn't exist
  assertTrue("Removing not-found store should be considered success",
      storageManager.removeBlobStore(invalidPartition));
  shutdownAndAssertStoresInaccessible(storageManager, replicas);
}

/**
* Test set stopped state of blobstore with given list of {@link PartitionId} in failure cases.
*/
Expand Down

0 comments on commit f86a962

Please sign in to comment.