Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(toolkit/lite): optimize DbLite tool for database pruning #5876

Open
wants to merge 2 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion plugins/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ DB lite provides lite database, parameters are compatible with previous `LiteFul
- `-t | --type`: Only used with operate=split: [snapshot,history], default: snapshot.
- `-fn | --fn-data-path`: The database path to be split or merged.
- `-ds | --dataset-path`: When operation is `split`,`dataset-path` is the path that store the `snapshot` or `history`, when
operation is `split`, `dataset-path` is the `history` data path.
operation is `merge`, `dataset-path` is the `history` data path.
- `-h | --help`: Provide the help info.

### Examples:
Expand Down
117 changes: 35 additions & 82 deletions plugins/src/main/java/org/tron/plugins/DbLite.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
import com.google.common.primitives.Longs;
import java.io.File;
import java.io.FileNotFoundException;
Expand Down Expand Up @@ -99,7 +97,7 @@ enum Type { snapshot, history }
required = true,
description = "when operation is `split`,"
+ "`dataset-path` is the path that store the `snapshot` or `history`,"
+ "when operation is `split`,"
+ "when operation is `merge`,"
+ "`dataset-path` is the `history` data path.",
order = 4)
private String datasetPath;
Expand All @@ -117,6 +115,8 @@ public Integer call() {
return 0;
}
try {
// merge checkpoint to fnDataPath first
mergeCheckpoint(fnDataPath);
switch (this.operate) {
case split:
if (Type.snapshot == this.type) {
Expand Down Expand Up @@ -158,7 +158,6 @@ public void generateSnapshot(String sourceDir, String snapshotDir) {
hasEnoughBlock(sourceDir);
List<String> snapshotDbs = getSnapshotDbs(sourceDir);
split(sourceDir, snapshotDir, snapshotDbs);
mergeCheckpoint2Snapshot(sourceDir, snapshotDir);
// write genesisBlock , latest recent blocks and trans
fillSnapshotBlockAndTransDb(sourceDir, snapshotDir);
// save min block to info
Expand Down Expand Up @@ -194,7 +193,6 @@ public void generateHistory(String sourceDir, String historyDir) {
}
hasEnoughBlock(sourceDir);
split(sourceDir, historyDir, archiveDbs);
mergeCheckpoint2History(sourceDir, historyDir);
// save max block to info
generateInfoProperties(Paths.get(historyDir, INFO_FILE_NAME).toString(),
getLatestBlockHeaderNum(sourceDir));
Expand Down Expand Up @@ -263,15 +261,6 @@ private List<String> getSnapshotDbs(String sourceDir) {
return snapshotDbs;
}

private void mergeCheckpoint2Snapshot(String sourceDir, String historyDir) {
List<String> snapshotDbs = getSnapshotDbs(sourceDir);
mergeCheckpoint(sourceDir, historyDir, snapshotDbs);
}

private void mergeCheckpoint2History(String sourceDir, String destDir) {
mergeCheckpoint(sourceDir, destDir, archiveDbs);
}

private void split(String sourceDir, String destDir, List<String> dbs) throws IOException {
logger.info("Begin to split the dbs.");
spec.commandLine().getOut().println("Begin to split the dbs.");
Expand All @@ -286,32 +275,37 @@ private void split(String sourceDir, String destDir, List<String> dbs) throws IO
if (!destPath.mkdirs()) {
throw new RuntimeException(String.format("destDir: %s create failed, please check", destDir));
}
FileUtils.copyDatabases(Paths.get(sourceDir), Paths.get(destDir), dbs);
copyDatabases(Paths.get(sourceDir), Paths.get(destDir), dbs);
}

private void mergeCheckpoint(String sourceDir, String destDir, List<String> destDbs) {
logger.info("Begin to merge checkpoint to dataset.");
spec.commandLine().getOut().println("Begin to merge checkpoint to dataset.");
private void mergeCheckpoint(String sourceDir) {
logger.info("Begin to merge checkpoint to dataset {}.", sourceDir);
spec.commandLine().getOut().format("Begin to merge checkpoint to dataset %s.", sourceDir)
.println();
try {
List<String> cpList = getCheckpointV2List(sourceDir);
if (cpList.size() > 0) {
for (String cp : cpList) {
DBInterface checkpointDb = DbTool.getDB(
sourceDir + "/" + DBUtils.CHECKPOINT_DB_V2, cp);
recover(checkpointDb, destDir, destDbs);
recover(checkpointDb, sourceDir);
}
} else if (Paths.get(sourceDir, CHECKPOINT_DB).toFile().exists()) {
DBInterface tmpDb = DbTool.getDB(sourceDir, CHECKPOINT_DB);
recover(tmpDb, destDir, destDbs);
recover(tmpDb, sourceDir);
}
} catch (IOException | RocksDBException e) {
throw new RuntimeException(e);
}
logger.info("End to merge checkpoint to dataset {}.", sourceDir);
spec.commandLine().getOut().format("End to merge checkpoint to dataset %s.", sourceDir)
.println();
}

private void recover(DBInterface db, String destDir, List<String> destDbs)
private void recover(DBInterface checkpoint, String destDir)
throws IOException, RocksDBException {
try (DBIterator iterator = db.iterator()) {
logger.info("Begin to recover checkpoint {} to dataset {}.", checkpoint.getName(), destDir);
try (DBIterator iterator = checkpoint.iterator()) {
for (iterator.seekToFirst(); iterator.hasNext(); iterator.next()) {
byte[] key = iterator.getKey();
byte[] value = iterator.getValue();
Expand All @@ -323,21 +317,20 @@ private void recover(DBInterface db, String destDir, List<String> destDbs)
byte[] realKey = Arrays.copyOfRange(key, dbName.getBytes().length + 4, key.length);
byte[] realValue =
value.length == 1 ? null : Arrays.copyOfRange(value, 1, value.length);
if (destDbs != null && destDbs.contains(dbName)) {
DBInterface destDb = DbTool.getDB(destDir, dbName);
if (realValue != null) {
destDb.put(realKey, realValue);
DBInterface destDb = DbTool.getDB(destDir, dbName);
if (realValue != null) {
destDb.put(realKey, realValue);
} else {
byte op = value[0];
if (DBUtils.Operator.DELETE.getValue() == op) {
destDb.delete(realKey);
} else {
byte op = value[0];
if (DBUtils.Operator.DELETE.getValue() == op) {
destDb.delete(realKey);
} else {
destDb.put(realKey, new byte[0]);
}
destDb.put(realKey, new byte[0]);
}
}
}
}
logger.info("End to recover checkpoint {} to dataset {}.", checkpoint.getName(), destDir);
}

private void generateInfoProperties(String propertyfile, long num)
Expand All @@ -353,30 +346,14 @@ private void generateInfoProperties(String propertyfile, long num)
}

private long getLatestBlockHeaderNum(String databaseDir) throws IOException, RocksDBException {
// query latest_block_header_number from checkpoint first
final String latestBlockHeaderNumber = "latest_block_header_number";
DBInterface checkpointDb = getCheckpointDb(databaseDir);
Long blockNumber = getLatestBlockHeaderNumFromCP(checkpointDb,
latestBlockHeaderNumber.getBytes());
if (blockNumber != null) {
return blockNumber;
}
// query from propertiesDb if checkpoint not contains latest_block_header_number
DBInterface propertiesDb = DbTool.getDB(databaseDir, PROPERTIES_DB_NAME);
return Optional.ofNullable(propertiesDb.get(ByteArray.fromString(latestBlockHeaderNumber)))
.map(ByteArray::toLong)
.orElseThrow(
() -> new IllegalArgumentException("not found latest block header number"));
}

private Long getLatestBlockHeaderNumFromCP(DBInterface db, byte[] key) {
byte[] value = db.get(Bytes.concat(simpleEncode(PROPERTIES_DB_NAME), key));
if (value != null && value.length > 1) {
return ByteArray.toLong(Arrays.copyOfRange(value, 1, value.length));
}
return null;
}

/**
* recent blocks, trans and genesis block.
*/
Expand Down Expand Up @@ -443,15 +420,6 @@ private byte[] getGenesisBlockHash(String parentDir) throws IOException, RocksDB
return result;
}

private static byte[] simpleEncode(String s) {
byte[] bytes = s.getBytes();
byte[] length = Ints.toByteArray(bytes.length);
byte[] r = new byte[4 + bytes.length];
System.arraycopy(length, 0, r, 0, 4);
System.arraycopy(bytes, 0, r, 4, bytes.length);
return r;
}

private BlockNumInfo checkAndGetBlockNumInfo(String historyDir, String liteDir)
throws IOException, RocksDBException {
logger.info("Check the compatibility of this history.");
Expand Down Expand Up @@ -495,14 +463,21 @@ private void backupArchiveDbs(String databaseDir) throws IOException {
if (!FileUtils.createDirIfNotExists(bakDir.toString())) {
throw new RuntimeException(String.format("create bak dir %s failed", bakDir));
}
FileUtils.copyDatabases(Paths.get(databaseDir), bakDir, archiveDbs);
copyDatabases(Paths.get(databaseDir), bakDir, archiveDbs);
archiveDbs.forEach(db -> FileUtils.deleteDir(new File(databaseDir, db)));
}

private void copyHistory2Database(String historyDir, String databaseDir) throws IOException {
logger.info("Begin to copy history to database.");
spec.commandLine().getOut().println("Begin to copy history to database.");
FileUtils.copyDatabases(Paths.get(historyDir), Paths.get(databaseDir), archiveDbs);
copyDatabases(Paths.get(historyDir), Paths.get(databaseDir), archiveDbs);
}

private static void copyDatabases(Path src, Path dest, List<String> subDirs)
throws IOException {
// Close db first
DbTool.close();
FileUtils.copyDatabases(src, dest, subDirs);
}

private void trimExtraHistory(String liteDir, BlockNumInfo blockNumInfo)
Expand Down Expand Up @@ -586,16 +561,7 @@ private void mergeBak2Database(String liteDir, BlockNumInfo blockNumInfo) throws
private byte[] getDataFromSourceDB(String sourceDir, String dbName, byte[] key)
throws IOException, RocksDBException {
DBInterface sourceDb = DbTool.getDB(sourceDir, dbName);
DBInterface checkpointDb = getCheckpointDb(sourceDir);
// get data from tmp first.
byte[] valueFromTmp = checkpointDb.get(Bytes.concat(simpleEncode(dbName), key));
byte[] value;
if (isEmptyBytes(valueFromTmp)) {
value = sourceDb.get(key);
} else {
value = valueFromTmp.length == 1
? null : Arrays.copyOfRange(valueFromTmp, 1, valueFromTmp.length);
}
byte[] value = sourceDb.get(key);
if (isEmptyBytes(value)) {
throw new RuntimeException(String.format("data not found in store, dbName: %s, key: %s",
dbName, Arrays.toString(key)));
Expand Down Expand Up @@ -664,19 +630,6 @@ private long getSecondBlock(String databaseDir) throws RocksDBException, IOExcep
return num;
}

private DBInterface getCheckpointDb(String sourceDir) throws IOException, RocksDBException {
List<String> cpList = getCheckpointV2List(sourceDir);
DBInterface checkpointDb;
if (cpList.size() > 0) {
String latestCp = cpList.get(cpList.size() - 1);
checkpointDb = DbTool.getDB(
sourceDir + "/" + DBUtils.CHECKPOINT_DB_V2, latestCp);
} else {
checkpointDb = DbTool.getDB(sourceDir, CHECKPOINT_DB);
}
return checkpointDb;
}

@VisibleForTesting
public static void setRecentBlks(long recentBlks) {
RECENT_BLKS = recentBlks;
Expand Down
Loading