Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,12 @@ public void compactDB() throws IOException {
}
}

@Override
public void flushDB(boolean sync) {
// TODO: Implement flush for level db
// do nothing
}

@Override
public void writeBatch(BatchOperation operation) throws IOException {
List<BatchOperation.SingleOperation> operations =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
*/
void compactDB() throws IOException;

/**
* Flush the outstanding I/O operations of the DB.
* @param sync if true will sync the outstanding I/Os to the disk.
*/
void flushDB(boolean sync) throws IOException;

/**
* Destroy the content of the specified database,
* a destroyed database will not be able to load again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,19 @@ public void compactDB() throws IOException {
}
}

@Override
public void flushDB(boolean sync) throws IOException {
if (db != null) {
try {
// for RocksDB it is sufficient to flush the WAL as entire db can
// be reconstructed using it.
db.flushWal(sync);
} catch (RocksDBException e) {
throw toIOException("Failed to flush db", e);
}
}
}

private void deleteQuietly(File fileOrDir) {
if (fileOrDir != null && fileOrDir.exists()) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ enum Result {
CONTAINER_NOT_OPEN = 39;
CONTAINER_MISSING = 40;
BLOCK_TOKEN_VERIFICATION_FAILED = 41;
ERROR_IN_DB_SYNC = 42;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@
.Result.DISK_OUT_OF_SPACE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.ERROR_IN_COMPACT_DB;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.ERROR_IN_DB_SYNC;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.Result.INVALID_CONTAINER_STATE;
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
Expand Down Expand Up @@ -298,8 +300,14 @@ public void markContainerUnhealthy() throws StorageContainerException {

@Override
public void quasiClose() throws StorageContainerException {
// The DB must be synced during close operation
flushAndSyncDB();

writeLock();
try {
// Second sync should be a very light operation as sync has already
// been done outside the lock.
flushAndSyncDB();
updateContainerData(containerData::quasiCloseContainer);
} finally {
writeUnlock();
Expand All @@ -308,16 +316,18 @@ public void quasiClose() throws StorageContainerException {

@Override
public void close() throws StorageContainerException {
// The DB must be synced during close operation
flushAndSyncDB();

writeLock();
try {
// Second sync should be a very light operation as sync has already
// been done outside the lock.
flushAndSyncDB();
updateContainerData(containerData::closeContainer);
} finally {
writeUnlock();
}

// It is ok if this operation takes a bit of time.
// Close container is not expected to be instantaneous.
compactDB();
}

/**
Expand Down Expand Up @@ -365,6 +375,22 @@ void compactDB() throws StorageContainerException {
}
}

private void flushAndSyncDB() throws StorageContainerException {
try {
try (ReferenceCountedDB db = BlockUtils.getDB(containerData, config)) {
db.getStore().flushDB(true);
LOG.info("Container {} is synced with bcsId {}.",
containerData.getContainerID(),
containerData.getBlockCommitSequenceId());
}
} catch (StorageContainerException ex) {
throw ex;
} catch (IOException ex) {
LOG.error("Error in DB sync while closing container", ex);
throw new StorageContainerException(ex, ERROR_IN_DB_SYNC);
}
}

@Override
public KeyValueContainerData getContainerData() {
return containerData;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,9 @@ public void testMarkClosedContainerAsUnhealthy() throws IOException {
*/
@Test
public void testMarkQuasiClosedContainerAsUnhealthy() throws IOException {
// We need to create the container so the sync-on-quasi-close operation
// does not NPE.
keyValueContainer.create(volumeSet, volumeChoosingPolicy, scmId);
keyValueContainer.quasiClose();
keyValueContainer.markContainerUnhealthy();
assertThat(keyValueContainerData.getState(), is(UNHEALTHY));
Expand Down