Skip to content

Commit

Permalink
fix(snapshots,accountsdb): Re-design the collapse method (#442)
Browse files Browse the repository at this point in the history
  • Loading branch information
InKryption authored Dec 17, 2024
1 parent 7f4165b commit e04e336
Show file tree
Hide file tree
Showing 8 changed files with 327 additions and 251 deletions.
216 changes: 120 additions & 96 deletions src/accountsdb/db.zig

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions src/accountsdb/fuzz.zig
Original file line number Diff line number Diff line change
Expand Up @@ -303,13 +303,13 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
maybe_incremental_file_info,
);

var snapshot_fields = try sig.accounts_db.FullAndIncrementalManifest.fromFiles(
const combined_manifest = try sig.accounts_db.FullAndIncrementalManifest.fromFiles(
allocator,
logger,
alternative_snapshot_dir,
snapshot_files,
);
defer snapshot_fields.deinit(allocator);
defer combined_manifest.deinit(allocator);

var alt_accounts_db = try AccountsDB.init(.{
.allocator = allocator,
Expand All @@ -323,17 +323,21 @@ pub fn run(seed: u64, args: *std.process.ArgIterator) !void {
});
defer alt_accounts_db.deinit();

_ = try alt_accounts_db.loadWithDefaults(
(try alt_accounts_db.loadWithDefaults(
allocator,
&snapshot_fields,
combined_manifest,
1,
true,
N_ACCOUNTS_PER_SLOT,
false,
false,
);
)).deinit(allocator);

const maybe_inc_slot = if (snapshot_info.inc) |inc| inc.slot else null;
logger.info().logf("loaded and validated snapshot at slot: {} (and inc snapshot @ slot {any})", .{ full_snapshot_info.slot, maybe_inc_slot });
logger.info().logf(
"loaded and validated snapshot at slot: {} (and inc snapshot @ slot {any})",
.{ full_snapshot_info.slot, maybe_inc_slot },
);
}
}

Expand Down
10 changes: 5 additions & 5 deletions src/accountsdb/fuzz_snapshot.zig
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,13 @@ pub fn run(args: *std.process.ArgIterator) !void {
while (timer.read() < MAX_FUZZ_TIME_NS) : (i += 1) {
bytes_buffer.clearRetainingCapacity();

const snapshot_original: SnapshotManifest = try randomSnapshotFields(allocator, random);
defer snapshot_original.deinit(allocator);
const manifest_original: SnapshotManifest = try randomSnapshotManifest(allocator, random);
defer manifest_original.deinit(allocator);

try bytes_buffer.ensureUnusedCapacity(bincode.sizeOf(snapshot_original, .{}) * 2);
try bytes_buffer.ensureUnusedCapacity(bincode.sizeOf(manifest_original, .{}) * 2);

const original_bytes_start = bytes_buffer.items.len;
try bincode.write(bytes_buffer.writer(), snapshot_original, .{});
try bincode.write(bytes_buffer.writer(), manifest_original, .{});
const original_bytes_end = bytes_buffer.items.len;

const snapshot_deserialized = try bincode.readFromSlice(allocator, SnapshotManifest, bytes_buffer.items[original_bytes_start..original_bytes_end], .{});
Expand All @@ -69,7 +69,7 @@ pub fn run(args: *std.process.ArgIterator) !void {

const max_list_entries = 1 << 8;

fn randomSnapshotFields(
fn randomSnapshotManifest(
allocator: std.mem.Allocator,
/// Should be a PRNG, not a true RNG. See the documentation on `std.Random.uintLessThan`
/// for commentary on the runtime of this function.
Expand Down
2 changes: 1 addition & 1 deletion src/accountsdb/readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,7 @@ The core logic for generating a snapshot lives in `accounts_db.db.writeSnapshotT
The procedure consists of writing the version file, the status cache (`snapshots/status_cache`) file, the snapshot manifest (`snapshots/{SLOT}/{SLOT}`),
and the account files (`accounts/{SLOT}.{FILE_ID}`). This is all written to a stream in the TAR archive format.

The snapshot manifest file content is comprised of the bincoded (bincode-encoded) data structure `SnapshotFields`, which is an aggregate of:
The snapshot manifest file content is comprised of the bincoded (bincode-encoded) data structure `Manifest`, which is an aggregate of:
* implicit state: data derived from the current state of AccountsDB, like the file map for all the accounts which exist at that snapshot, or which have
changed relative to a full snapshot in an incremental one
* configuration state: data that is used to communicate details about the snapshot, like the full slot to which an incremental snapshot is relative.
Expand Down
153 changes: 89 additions & 64 deletions src/accountsdb/snapshots.zig
Original file line number Diff line number Diff line change
Expand Up @@ -2599,7 +2599,6 @@ pub const SnapshotFiles = struct {
pub const FullAndIncrementalManifest = struct {
full: Manifest,
incremental: ?Manifest,
was_collapsed: bool = false, // used for deinit()

pub fn fromFiles(
allocator: std.mem.Allocator,
Expand Down Expand Up @@ -2644,78 +2643,104 @@ pub const FullAndIncrementalManifest = struct {
};
}

/// collapse all full and incremental snapshots into one.
/// note: this works by stack copying the full snapshot and combining
/// the accounts-db account file map.
/// this will 1) modify the incremental snapshot account map
/// and 2) the returned snapshot heap fields will still point to the incremental snapshot
/// (so be sure not to deinit it while still using the returned snapshot)
pub const CollapseError = error{
/// There are storages for the same slot in both the full and incremental snapshot.
SnapshotSlotOverlap,
};

/// Like `collapseIfNecessary`, but returns a clone of the full snapshot
/// manifest if there is no incremental update to apply.
/// The caller is responsible for `.deinit`ing the result with `allocator`.
pub fn collapse(
self: *FullAndIncrementalManifest,
/// Should be the same allocator passed to `fromFiles`, or otherwise to allocate `Self`.
self: FullAndIncrementalManifest,
allocator: std.mem.Allocator,
) !Manifest {
// nothing to collapse
if (self.incremental == null)
return self.full;
self.was_collapsed = true;

// collapse bank fields into the
// incremental =pushed into=> full
var snapshot = self.incremental.?; // stack copy
const full_slot = self.full.bank_fields.slot;

// collapse accounts-db fields
const storages_map = &self.incremental.?.accounts_db_fields.file_map;

// TODO: use a better allocator
var slots_to_remove = std.ArrayList(Slot).init(allocator);
defer slots_to_remove.deinit();

// make sure theres no overlap in slots between full and incremental and combine
var storages_entry_iter = storages_map.iterator();
while (storages_entry_iter.next()) |incremental_entry| {
const slot = incremental_entry.key_ptr.*;

// only keep slots > full snapshot slot
if (!(slot > full_slot)) {
try slots_to_remove.append(slot);
continue;
}
) (std.mem.Allocator.Error || CollapseError)!Manifest {
const maybe_collapsed = try self.collapseIfNecessary(allocator);
return maybe_collapsed orelse try self.full.clone(allocator);
}

/// Returns null if there is no incremental snapshot manifest; otherwise
/// returns the result of overlaying the updates of the incremental
/// onto the full snapshot manifest.
/// The caller is responsible for `.deinit`ing the result with `allocator`
/// if it is non-null.
pub fn collapseIfNecessary(
self: FullAndIncrementalManifest,
allocator: std.mem.Allocator,
) (std.mem.Allocator.Error || CollapseError)!?Manifest {
const full = self.full;
const incremental = self.incremental orelse return null;

const slot_entry = try self.full.accounts_db_fields.file_map.getOrPut(allocator, slot);
if (slot_entry.found_existing) {
std.debug.panic("invalid incremental snapshot: slot {d} is in both full and incremental snapshots\n", .{slot});
} else {
slot_entry.value_ptr.* = incremental_entry.value_ptr.*;
}
}
// make a heap clone of the incremental manifest's more up-to-date
// data, except with the file map of the full manifest, which is
// likely to contain a larger amount of entries; can then overlay
// the relevant entries from the incremental manifest onto the
// clone of the full manifest.

for (slots_to_remove.items) |slot| {
_ = storages_map.swapRemove(slot);
}
var collapsed = incremental;
collapsed.accounts_db_fields.file_map = full.accounts_db_fields.file_map;

collapsed = try collapsed.clone(allocator);
errdefer collapsed.deinit(allocator);

const collapsed_file_map = &collapsed.accounts_db_fields.file_map;
try collapsed_file_map.ensureUnusedCapacity(
allocator,
incremental.accounts_db_fields.file_map.count(),
);

snapshot.accounts_db_fields = self.full.accounts_db_fields;
const inc_file_map = &incremental.accounts_db_fields.file_map;
for (inc_file_map.keys(), inc_file_map.values()) |slot, account_file_info| {
if (slot <= full.accounts_db_fields.slot) continue;
const gop = collapsed_file_map.getOrPutAssumeCapacity(slot);
if (gop.found_existing) return error.SnapshotSlotOverlap;
gop.value_ptr.* = account_file_info;
}

return snapshot;
return collapsed;
}

pub fn deinit(self: *FullAndIncrementalManifest, allocator: std.mem.Allocator) void {
pub fn deinit(self: FullAndIncrementalManifest, allocator: std.mem.Allocator) void {
self.full.deinit(allocator);
if (self.incremental) |*inc| {
if (!self.was_collapsed) {
inc.deinit(allocator);
} else {
inc.accounts_db_fields.file_map.deinit(allocator);
inc.bank_fields.deinit(allocator);
allocator.free(inc.accounts_db_fields.rooted_slots);
allocator.free(inc.accounts_db_fields.rooted_slot_hashes);
inc.bank_extra.deinit(allocator);
}
if (self.incremental) |inc| {
inc.deinit(allocator);
}
}
};

// Runs the load -> collapse -> deinit sequence for a combined (full +
// optional incremental) manifest through
// `std.testing.checkAllAllocationFailures`, using `std.testing.allocator`.
test "checkAllAllocationFailures FullAndIncrementalManifest" {
const local = struct {
// Function handed to `checkAllAllocationFailures`: it receives the
// allocator as its first parameter, followed by the extra arguments
// supplied in the tuple at the call site below.
fn parseFiles(
allocator: std.mem.Allocator,
snapdir: std.fs.Dir,
snapshot_files: SnapshotFiles,
) !void {
// Read the manifest(s) described by `snapshot_files` from `snapdir`.
const combined_manifest = try FullAndIncrementalManifest.fromFiles(
allocator,
.noop,
snapdir,
snapshot_files,
);
defer combined_manifest.deinit(allocator);

// `collapse` returns a manifest the caller must deinit separately;
// defers run in reverse order, so it is released before
// `combined_manifest`.
const collapsed_manifest = try combined_manifest.collapse(allocator);
defer collapsed_manifest.deinit(allocator);
}
};

// Temporary directory that receives the unpacked snapshot files;
// removed again when the test exits.
var tmp_dir_root = std.testing.tmpDir(.{});
defer tmp_dir_root.cleanup();
const snapdir = tmp_dir_root.dir;

// NOTE(review): the meaning of the leading `1` argument is not visible
// from here -- confirm against `findAndUnpackTestSnapshots`.
const snapshot_files = try sig.accounts_db.db.findAndUnpackTestSnapshots(1, snapdir);

// Invoke `local.parseFiles` with `snapdir` and `snapshot_files` as the
// arguments following the allocator.
try std.testing.checkAllAllocationFailures(
std.testing.allocator,
local.parseFiles,
.{ snapdir, snapshot_files },
);
}

pub const generate = struct {
/// Writes the version, status cache, and manifest files.
/// Should call this first to begin generating the snapshot archive.
Expand Down Expand Up @@ -2874,8 +2899,8 @@ test "parse snapshot fields" {
const full_manifest_file = try snapdir.openFile(full_manifest_path, .{});
defer full_manifest_file.close();

const snapshot_fields_full = try Manifest.readFromFile(allocator, full_manifest_file);
defer snapshot_fields_full.deinit(allocator);
const full_manifest = try Manifest.readFromFile(allocator, full_manifest_file);
defer full_manifest.deinit(allocator);

if (snapshot_files.incremental_info) |inc| {
const inc_slot = inc.slot;
Expand All @@ -2885,7 +2910,7 @@ test "parse snapshot fields" {
const inc_manifest_file = try snapdir.openFile(inc_manifest_path, .{});
defer inc_manifest_file.close();

const snapshot_fields_inc = try Manifest.readFromFile(allocator, inc_manifest_file);
defer snapshot_fields_inc.deinit(allocator);
const inc_manifest = try Manifest.readFromFile(allocator, inc_manifest_file);
defer inc_manifest.deinit(allocator);
}
}
5 changes: 2 additions & 3 deletions src/bincode/hashmap.zig
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,10 @@ pub fn readCtx(
const key = try ctx_impl.readKey(allocator, reader, params);
errdefer ctx_impl.freeKey(allocator, key);

const gop = hash_map.getOrPutAssumeCapacity(key);
if (gop.found_existing) return error.DuplicateFileMapEntry;
if (hash_map.contains(key)) return error.DuplicateFileMapEntry;

const value = try ctx_impl.readValue(allocator, reader, params);
gop.value_ptr.* = value;
hash_map.putAssumeCapacityNoClobber(key, value);
}

return switch (hm_info.management) {
Expand Down
Loading

0 comments on commit e04e336

Please sign in to comment.