Skip to content

Commit

Permalink
feat(ci): support snapshot download/unpack (#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xNineteen authored Dec 17, 2024
1 parent e04e336 commit 2cf44b6
Show file tree
Hide file tree
Showing 12 changed files with 438 additions and 257 deletions.
1 change: 1 addition & 0 deletions build.zig
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ pub fn build(b: *Build) void {
benchmark_exe.root_module.addImport("zig-network", zig_network_module);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("curl", curl_mod);
benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
switch (blockstore_db) {
.rocksdb => benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod),
Expand Down
107 changes: 50 additions & 57 deletions src/accountsdb/db.zig
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ const Slot = sig.core.Slot;

const NestedHashTree = sig.utils.merkle_tree.NestedHashTree;
const Logger = sig.trace.log.Logger;

const GeyserWriter = sig.geyser.GeyserWriter;

const Counter = sig.prometheus.counter.Counter;
Expand All @@ -60,6 +59,9 @@ const spawnThreadTasks = sig.utils.thread.spawnThreadTasks;
const printTimeEstimate = sig.time.estimate.printTimeEstimate;
const globalRegistry = sig.prometheus.registry.globalRegistry;

const LOG_SCOPE = "accounts_db";
const ScopedLogger = sig.trace.log.ScopedLogger(LOG_SCOPE);

pub const DB_LOG_RATE = sig.time.Duration.fromSecs(5);
pub const DB_MANAGER_LOOP_MIN = sig.time.Duration.fromSecs(5);

Expand All @@ -74,10 +76,10 @@ pub const DELETE_ACCOUNT_FILES_MIN = 100;
pub const AccountsDB = struct {

// injected dependencies

allocator: std.mem.Allocator,
metrics: AccountsDBMetrics,
logger: Logger,
logger: ScopedLogger,

/// Not closed by the `AccountsDB`, but must live at least as long as it.
snapshot_dir: std.fs.Dir,
geyser_writer: ?*GeyserWriter,
Expand Down Expand Up @@ -204,7 +206,7 @@ pub const AccountsDB = struct {
return .{
.allocator = params.allocator,
.metrics = metrics,
.logger = params.logger,
.logger = params.logger.withScope(LOG_SCOPE),
.snapshot_dir = params.snapshot_dir,
.geyser_writer = params.geyser_writer,
.gossip_view = params.gossip_view,
Expand Down Expand Up @@ -307,8 +309,7 @@ pub const AccountsDB = struct {
var fastload_dir = try self.snapshot_dir.makeOpenPath("fastload_state", .{});
defer fastload_dir.close();

self.logger.info().log("saving account index state to disk...");
try self.account_index.saveToDisk(fastload_dir, self.logger);
try self.account_index.saveToDisk(fastload_dir);
}

if (validate) {
Expand Down Expand Up @@ -383,7 +384,7 @@ pub const AccountsDB = struct {

// NOTE: index loading was the most expensive part which we fastload here
self.logger.info().log("loading account index");
try self.account_index.loadFromDisk(dir, self.logger);
try self.account_index.loadFromDisk(dir);
}

/// loads the account files and generates the account index from a snapshot
Expand Down Expand Up @@ -710,7 +711,7 @@ pub const AccountsDB = struct {

if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) {
printTimeEstimate(
self.logger.withScope(@typeName(Self)),
self.logger,
&timer,
n_account_files,
file_count,
Expand Down Expand Up @@ -748,7 +749,7 @@ pub const AccountsDB = struct {

if (print_progress and progress_timer.read().asNanos() > DB_LOG_RATE.asNanos()) {
printTimeEstimate(
self.logger.withScope(@typeName(Self)),
self.logger,
&timer,
n_accounts_total,
ref_count,
Expand All @@ -767,14 +768,14 @@ pub const AccountsDB = struct {
thread_dbs: []AccountsDB,
n_threads: usize,
) !void {
var combine_indexes_wg: std.Thread.WaitGroup = .{};
defer combine_indexes_wg.wait();
try spawnThreadTasks(combineThreadIndexesMultiThread, .{
.wg = &combine_indexes_wg,
var merge_indexes_wg: std.Thread.WaitGroup = .{};
defer merge_indexes_wg.wait();
try spawnThreadTasks(mergeThreadIndexesMultiThread, .{
.wg = &merge_indexes_wg,
.data_len = self.account_index.pubkey_ref_map.numberOfShards(),
.max_threads = n_threads,
.params = .{
self.logger.unscoped(),
self.logger,
&self.account_index,
thread_dbs,
},
Expand Down Expand Up @@ -836,13 +837,12 @@ pub const AccountsDB = struct {

/// merges multiple thread indexes into the given index.
/// each bin is also sorted by pubkey.
pub fn combineThreadIndexesMultiThread(
logger_: Logger,
pub fn mergeThreadIndexesMultiThread(
logger: ScopedLogger,
index: *AccountIndex,
thread_dbs: []const AccountsDB,
task: sig.utils.thread.TaskParams,
) !void {
const logger = logger_.withScope(@typeName(Self));
const shard_start_index = task.start_index;
const shard_end_index = task.end_index;

Expand Down Expand Up @@ -886,7 +886,7 @@ pub const AccountsDB = struct {
&timer,
total_shards,
iteration_count,
"combining thread indexes",
"merging thread indexes",
"thread0",
);
progress_timer.reset();
Expand Down Expand Up @@ -1245,7 +1245,7 @@ pub const AccountsDB = struct {

if (print_progress and progress_timer.read() > DB_LOG_RATE.asNanos()) {
printTimeEstimate(
self.logger.withScope(@typeName(Self)),
self.logger,
&timer,
shards.len,
count,
Expand Down Expand Up @@ -4390,43 +4390,53 @@ test "generate snapshot & update gossip snapshot hashes" {
}
}

pub fn getAccountPerFileEstimateFromCluster(
cluster: sig.core.Cluster,
) error{NotImplementedYet}!u64 {
return switch (cluster) {
.testnet => 500,
else => error.NotImplementedYet,
};
}

pub const BenchmarkAccountsDBSnapshotLoad = struct {
pub const min_iterations = 1;
pub const max_iterations = 1;

pub const SNAPSHOT_DIR_PATH = sig.TEST_DATA_DIR ++ "bench_snapshot/";

pub const BenchArgs = struct {
use_disk: bool,
n_threads: u32,
name: []const u8,
cluster: sig.core.Cluster,
// TODO: support fastloading checks
};

pub const args = [_]BenchArgs{
BenchArgs{
.name = "RAM index (2 threads)",
.name = "testnet - ram index - 4 threads",
.use_disk = false,
.n_threads = 2,
.n_threads = 4,
.cluster = .testnet,
},
// BenchArgs{
// .use_disk = true,
// .n_threads = 2,
// .name = "DISK index (2 threads)",
// },
};

pub fn loadAndVerifySnapshot(units: BenchTimeUnit, bench_args: BenchArgs) !struct {
load_time: u64,
validate_time: u64,
} {
const allocator = std.heap.c_allocator;
const logger = .noop;
var print_logger = sig.trace.DirectPrintLogger.init(allocator, .debug);
const logger = print_logger.logger();

// unpack the snapshot
// NOTE: usually this will be an incremental snapshot
// renamed as a full snapshot (mv {inc-snap-fmt}.tar.zstd {full-snap-fmt}.tar.zstd)
// (because test snapshots are too small and full snapshots are too big)
const dir_path = sig.TEST_DATA_DIR ++ "bench_snapshot/";
var snapshot_dir = std.fs.cwd().openDir(dir_path, .{ .iterate = true }) catch {
std.debug.print("need to setup a snapshot in {s} for this benchmark...\n", .{dir_path});
var snapshot_dir = std.fs.cwd().openDir(SNAPSHOT_DIR_PATH, .{ .iterate = true }) catch {
// no snapshot -> early exit
std.debug.print(
"need to setup a snapshot in {s} for this benchmark...\n",
.{SNAPSHOT_DIR_PATH},
);
const zero_duration = sig.time.Duration.fromNanos(0);
return .{
.load_time = zero_duration.asNanos(),
Expand All @@ -4436,29 +4446,12 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
defer snapshot_dir.close();

const snapshot_files = try SnapshotFiles.find(allocator, snapshot_dir);

var accounts_dir = inline for (0..2) |attempt| {
if (snapshot_dir.openDir("accounts", .{ .iterate = true })) |accounts_dir|
break accounts_dir
else |err| switch (err) {
else => |e| return e,
error.FileNotFound => if (attempt == 0) {
const archive_file = try snapshot_dir.openFile(snapshot_files.full.snapshotArchiveName().constSlice(), .{});
defer archive_file.close();
try parallelUnpackZstdTarBall(
allocator,
logger,
archive_file,
snapshot_dir,
try std.Thread.getCpuCount() / 2,
true,
);
},
}
} else return error.SnapshotMissingAccountsDir;
defer accounts_dir.close();

const full_inc_manifest = try FullAndIncrementalManifest.fromFiles(allocator, logger, snapshot_dir, snapshot_files);
const full_inc_manifest = try FullAndIncrementalManifest.fromFiles(
allocator,
logger,
snapshot_dir,
snapshot_files,
);
defer full_inc_manifest.deinit(allocator);
const collapsed_manifest = try full_inc_manifest.collapse(allocator);

Expand All @@ -4478,7 +4471,7 @@ pub const BenchmarkAccountsDBSnapshotLoad = struct {
collapsed_manifest.accounts_db_fields,
bench_args.n_threads,
allocator,
500,
try getAccountPerFileEstimateFromCluster(bench_args.cluster),
);

const full_snapshot = full_inc_manifest.full;
Expand Down
Loading

0 comments on commit 2cf44b6

Please sign in to comment.