diff --git a/build.zig.zon b/build.zig.zon index 328303456..4e263b682 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -32,8 +32,8 @@ .hash = "1220f70ac854b59315a8512861e039648d677feb4f9677bd873d6b9b7074a5906485", }, .rocksdb = .{ - .url = "https://github.com/Syndica/rocksdb-zig/archive/6fa5e95875941236768e87a8e0c5ef45c75c9b56.tar.gz", - .hash = "122067ae3d6a64b0e1401dd40059535cb782d0c1a99c39a83f24c2385391e0b22876", + .url = "https://github.com/Syndica/rocksdb-zig/archive/9c09659a5e41f226b6b8f3fa21149247eb26dfae.tar.gz", + .hash = "1220aeb80b2f8bb48c131ef306fe48ddfcb537210c5f77742e921cbf40fc4c19b41e", }, }, } diff --git a/src/cmd/cmd.zig b/src/cmd/cmd.zig index 2ec120bc6..3a9babab1 100644 --- a/src/cmd/cmd.zig +++ b/src/cmd/cmd.zig @@ -691,7 +691,6 @@ fn validator() !void { ); var cleanup_service_handle = try std.Thread.spawn(.{}, sig.ledger.cleanup_service.run, .{ - allocator, app_base.logger, blockstore_reader, blockstore_writer, @@ -789,7 +788,6 @@ fn shredCollector() !void { ); var cleanup_service_handle = try std.Thread.spawn(.{}, sig.ledger.cleanup_service.run, .{ - allocator, app_base.logger, blockstore_reader, blockstore_writer, diff --git a/src/ledger/cleanup_service.zig b/src/ledger/cleanup_service.zig index a6c404385..acc4179f0 100644 --- a/src/ledger/cleanup_service.zig +++ b/src/ledger/cleanup_service.zig @@ -26,7 +26,6 @@ const DEFAULT_CLEANUP_SLOT_INTERVAL: u64 = 512; const LOOP_LIMITER = Duration.fromMillis(DEFAULT_CLEANUP_SLOT_INTERVAL * DEFAULT_MS_PER_SLOT / 10); pub fn run( - allocator: std.mem.Allocator, logger: sig.trace.Logger, blockstore_reader: *BlockstoreReader, blockstore_writer: *BlockstoreWriter, @@ -38,7 +37,6 @@ pub fn run( logger.info("Starting blockstore cleanup service"); while (!exit.load(.unordered)) { last_purge_slot = try cleanBlockstore( - allocator, logger, blockstore_reader, blockstore_writer, @@ -71,7 +69,6 @@ pub fn run( /// /// Analogous to the [`cleanup_ledger`](https://github.com/anza-xyz/agave/blob/6476d5fac0c30d1f49d13eae118b89be78fb15d2/ledger/src/blockstore_cleanup_service.rs#L198) in agave: pub fn cleanBlockstore( - allocator: std.mem.Allocator, logger: sig.trace.Logger, blockstore_reader: *BlockstoreReader, blockstore_writer: *BlockstoreWriter, @@ -87,12 +84,7 @@ pub fn cleanBlockstore( // NOTE: this will clean everything past the lowest slot in the blockstore const root: Slot = try blockstore_reader.lowestSlot(); - const result = try findSlotsToClean( - allocator, - blockstore_reader, - root, - max_ledger_shreds, - ); + const result = try findSlotsToClean(blockstore_reader, root, max_ledger_shreds); logger.infof("findSlotsToClean result: {any}", .{result}); if (result.should_clean) { @@ -127,7 +119,6 @@ pub fn cleanBlockstore( /// /// Analogous to the [`find_slots_to_clean`](https://github.com/anza-xyz/agave/blob/6476d5fac0c30d1f49d13eae118b89be78fb15d2/ledger/src/blockstore_cleanup_service.rs#L103) fn findSlotsToClean( - allocator: std.mem.Allocator, blockstore_reader: *BlockstoreReader, max_root: Slot, max_ledger_shreds: u64, @@ -136,17 +127,7 @@ fn findSlotsToClean( highest_slot_to_purge: Slot, total_shreds: u64, } { - const data_shred_cf_name = Schema.data_shred.name; - - const live_files = try blockstore_reader.db.db.liveFiles(allocator); - defer live_files.deinit(); - - var num_shreds: u64 = 0; - for (live_files.items) |live_file| { - if (std.mem.eql(u8, live_file.column_family_name, data_shred_cf_name)) { - num_shreds += live_file.num_entries; - } - } + const num_shreds = try blockstore_reader.db.count(Schema.data_shred); // Using the difference between the lowest and highest slot seen will // result in overestimating the number of slots in the blockstore since @@ -229,12 +210,13 @@ test "findSlotsToClean" { { var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(ledger.schema.schema.slot_meta, lowest_slot_meta.slot, lowest_slot_meta); try write_batch.put(ledger.schema.schema.slot_meta, highest_slot_meta.slot, highest_slot_meta); try db.commit(write_batch); } - const r = try findSlotsToClean(allocator, &reader, 0, 100); + const r = try findSlotsToClean(&reader, 0, 100); try std.testing.expectEqual(false, r.should_clean); try std.testing.expectEqual(0, r.total_shreds); try std.testing.expectEqual(0, r.highest_slot_to_purge); diff --git a/src/ledger/database.zig b/src/ledger/database.zig index aa232f1d0..dc37383dc 100644 --- a/src/ledger/database.zig +++ b/src/ledger/database.zig @@ -20,8 +20,19 @@ pub fn assertIsDatabase(comptime Impl: type) void { /// Runs all tests in `tests` pub fn testDatabase(comptime Impl: fn ([]const ColumnFamily) type) !void { assertIsDatabase(Impl(&.{})); + var err: ?anyerror = null; inline for (@typeInfo(tests(Impl)).Struct.decls) |decl| { - try @call(.auto, @field(tests(Impl), decl.name), .{}); + @call(.auto, @field(tests(Impl), decl.name), .{}) catch |e| { + if (err == null) { + std.debug.print("The following tests failed:\n", .{}); + } + err = e; + std.debug.print("\n\u{001B}[35m{s}\u{001B}[0m\n\n", .{decl.name}); + if (@errorReturnTrace()) |trace| std.debug.dumpStackTrace(trace.*); + }; + } + if (err) |e| { + return e; } } @@ -46,6 +57,10 @@ pub fn Database(comptime Impl: type) type { self.impl.deinit(); } + pub fn count(self: *Self, comptime cf: ColumnFamily) anyerror!u64 { + return self.impl.count(cf); + } + pub fn put( self: *Self, comptime cf: ColumnFamily, @@ -55,8 +70,16 @@ pub fn Database(comptime Impl: type) type { return try self.impl.put(cf, key, value); } - pub fn get(self: *Self, comptime cf: ColumnFamily, key: cf.Key) anyerror!?cf.Value { - return try self.impl.get(cf, key); + // TODO: split into two methods: get and getAlloc, where "get" is used for + // types that do not need an allocator to deserialize them. + // this will need some changes to bincode. + pub fn get( + self: *Self, + allocator: Allocator, + comptime cf: ColumnFamily, + key: cf.Key, + ) anyerror!?cf.Value { + return try self.impl.get(allocator, cf, key); } /// Returns a reference to the serialized bytes. @@ -81,12 +104,17 @@ pub fn Database(comptime Impl: type) type { return try self.impl.delete(cf, key); } - pub fn deleteFilesRange(self: *Self, comptime cf: ColumnFamily, start: cf.Key, end: cf.Key) anyerror!void { - return try self.impl.deleteFilesRange(cf, start, end); + pub fn deleteFilesInRange( + self: *Self, + comptime cf: ColumnFamily, + start: cf.Key, + end: cf.Key, + ) anyerror!void { + return try self.impl.deleteFilesInRange(cf, start, end); } pub fn initWriteBatch(self: *Self) anyerror!WriteBatch { - return .{ .impl = self.impl.initBatch() }; + return .{ .impl = try self.impl.initWriteBatch() }; } pub fn commit(self: *Self, batch: WriteBatch) anyerror!void { @@ -106,6 +134,10 @@ pub fn Database(comptime Impl: type) type { pub const WriteBatch = struct { impl: Impl.WriteBatch, + pub fn deinit(self: *WriteBatch) void { + self.impl.deinit(); + } + pub fn put( self: *WriteBatch, comptime cf: ColumnFamily, @@ -244,33 +276,37 @@ pub const BytesRef = struct { fn tests(comptime Impl: fn ([]const ColumnFamily) type) type { @setEvalBranchQuota(10_000); const impl_id = sig.core.Hash.generateSha256Hash(@typeName(Impl(&.{}))).base58String(); - const test_dir = std.fmt.comptimePrint(sig.TEST_DATA_DIR ++ "blockstore/database/{s}", .{impl_id.buffer}); + const test_dir = sig.TEST_DATA_DIR ++ "blockstore/database/" ++ impl_id.buffer ++ "/"; + + const allocator = std.testing.allocator; + + const Value1 = struct { hello: u16 }; + const Value2 = struct { world: u16 }; + const cf1 = ColumnFamily{ + .name = "one", + .Key = u64, + .Value = Value1, + }; + const cf2 = ColumnFamily{ + .name = "two", + .Key = u64, + .Value = Value2, + }; + const DB = Database(Impl(&.{ cf1, cf2 })); + + const logger = sig.trace.TestLogger.default.logger(); return struct { pub fn basic() !void { - const Value = struct { hello: u16 }; - const cf1 = ColumnFamily{ - .name = "one", - .Key = u64, - .Value = Value, - }; - const cf2 = ColumnFamily{ - .name = "two", - .Key = u64, - .Value = Value, - }; - const path = std.fmt.comptimePrint("{s}/basic", .{test_dir}); + const path = test_dir ++ @src().fn_name; try blockstore.tests.freshDir(path); - const allocator = std.testing.allocator; - const logger = Logger.init(std.testing.allocator, Logger.TEST_DEFAULT_LEVEL); - defer logger.deinit(); - var db = try Database(Impl(&.{ cf1, cf2 })).open(allocator, logger, path); + var db = try DB.open(allocator, logger, path); defer db.deinit(); try db.put(cf1, 123, .{ .hello = 345 }); - const got = try db.get(cf1, 123); + const got = try db.get(allocator, cf1, 123); try std.testing.expect(345 == got.?.hello); - const not = try db.get(cf2, 123); + const not = try db.get(allocator, cf2, 123); try std.testing.expect(null == not); const wrong_was_deleted = try db.delete(cf2, 123); _ = wrong_was_deleted; @@ -278,8 +314,237 @@ fn tests(comptime Impl: fn ([]const ColumnFamily) type) type { const was_deleted = try db.delete(cf1, 123); _ = was_deleted; // try std.testing.expect(was_deleted); - const not_now = try db.get(cf1, 123); + const not_now = try db.get(allocator, cf1, 123); try std.testing.expect(null == not_now); } + + pub fn @"write batch"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 0, .{ .hello = 99 }); + + var batch = try db.initWriteBatch(); + defer batch.deinit(); + + try batch.delete(cf1, 0); + try batch.put(cf1, 123, .{ .hello = 100 }); + try batch.put(cf2, 321, .{ .world = 444 }); + try batch.delete(cf2, 321); + try batch.put(cf2, 133, .{ .world = 555 }); + try batch.put(cf2, 133, .{ .world = 666 }); + + try std.testing.expectEqual(Value1{ .hello = 99 }, try db.get(allocator, cf1, 0)); + try std.testing.expectEqual(null, try db.get(allocator, cf1, 123)); + try std.testing.expectEqual(null, try db.get(allocator, cf2, 321)); + try std.testing.expectEqual(null, try db.get(allocator, cf2, 333)); + + try db.commit(batch); + + try std.testing.expectEqual(null, try db.get(allocator, cf1, 0)); + try std.testing.expectEqual(Value1{ .hello = 100 }, try db.get(allocator, cf1, 123)); + try std.testing.expectEqual(null, try db.get(allocator, cf2, 321)); + try std.testing.expectEqual(Value2{ .world = 666 }, try db.get(allocator, cf2, 133)); + } + + pub fn @"iterator forward"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 4, .{ .hello = 44 }); + try db.put(cf1, 1, .{ .hello = 111 }); + try db.put(cf1, 3, .{ .hello = 33 }); + try db.put(cf1, 2, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .forward, null); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(1, next[0]); + try std.testing.expectEqual(Value1{ .hello = 111 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(2, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(3, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(4, next[0]); + try std.testing.expectEqual(Value1{ .hello = 44 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator forward start exact"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 40, .{ .hello = 44 }); + try db.put(cf1, 10, .{ .hello = 111 }); + try db.put(cf1, 30, .{ .hello = 33 }); + try db.put(cf1, 20, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .forward, 20); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(20, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(30, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(40, next[0]); + try std.testing.expectEqual(Value1{ .hello = 44 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator forward start between"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 40, .{ .hello = 44 }); + try db.put(cf1, 10, .{ .hello = 111 }); + try db.put(cf1, 30, .{ .hello = 33 }); + try db.put(cf1, 20, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .forward, 11); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(20, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(30, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(40, next[0]); + try std.testing.expectEqual(Value1{ .hello = 44 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator reverse"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 4, .{ .hello = 44 }); + try db.put(cf1, 1, .{ .hello = 111 }); + try db.put(cf1, 3, .{ .hello = 33 }); + try db.put(cf1, 2, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .reverse, null); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(4, next[0]); + try std.testing.expectEqual(Value1{ .hello = 44 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(3, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(2, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(1, next[0]); + try std.testing.expectEqual(Value1{ .hello = 111 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator reverse start at end"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 4, .{ .hello = 44 }); + try db.put(cf1, 1, .{ .hello = 111 }); + try db.put(cf1, 3, .{ .hello = 33 }); + try db.put(cf1, 2, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .reverse, 4); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(4, next[0]); + try std.testing.expectEqual(Value1{ .hello = 44 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(3, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(2, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(1, next[0]); + try std.testing.expectEqual(Value1{ .hello = 111 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator reverse start exact"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 40, .{ .hello = 44 }); + try db.put(cf1, 10, .{ .hello = 111 }); + try db.put(cf1, 30, .{ .hello = 33 }); + try db.put(cf1, 20, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .reverse, 30); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(30, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(20, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(10, next[0]); + try std.testing.expectEqual(Value1{ .hello = 111 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } + + pub fn @"iterator reverse start between"() !void { + const path = test_dir ++ @src().fn_name; + try blockstore.tests.freshDir(path); + var db = try DB.open(allocator, logger, path); + defer db.deinit(); + + try db.put(cf1, 40, .{ .hello = 44 }); + try db.put(cf1, 10, .{ .hello = 111 }); + try db.put(cf1, 30, .{ .hello = 33 }); + try db.put(cf1, 20, .{ .hello = 222 }); + + var iter = try db.iterator(cf1, .reverse, 39); + defer iter.deinit(); + + var next = (try iter.next()).?; + try std.testing.expectEqual(30, next[0]); + try std.testing.expectEqual(Value1{ .hello = 33 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(20, next[0]); + try std.testing.expectEqual(Value1{ .hello = 222 }, next[1]); + next = (try iter.next()).?; + try std.testing.expectEqual(10, next[0]); + try std.testing.expectEqual(Value1{ .hello = 111 }, next[1]); + + try std.testing.expectEqual(null, try iter.next()); + } }; } diff --git a/src/ledger/hashmap_db.zig b/src/ledger/hashmap_db.zig index 3361d6de5..954428f9c 100644 --- a/src/ledger/hashmap_db.zig +++ b/src/ledger/hashmap_db.zig @@ -46,6 +46,13 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { self.allocator.free(self.maps); } + pub fn count(self: *Self, comptime cf: ColumnFamily) anyerror!u64 { + self.transaction_lock.lockShared(); + defer self.transaction_lock.unlockShared(); + + return self.maps[cf.find(column_families)].count(); + } + pub fn put( self: *Self, comptime cf: ColumnFamily, @@ -66,6 +73,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { pub fn get( self: *Self, + allocator: Allocator, comptime cf: ColumnFamily, key: cf.Key, ) anyerror!?cf.Value { @@ -80,7 +88,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { const val_bytes = map.getPreLocked(key_bytes) orelse return null; - return try value_serializer.deserialize(cf.Value, self.allocator, val_bytes); + return try value_serializer.deserialize(cf.Value, allocator, val_bytes); } pub fn getBytes( @@ -119,7 +127,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { _ = self.maps[cf.find(column_families)].delete(self.allocator, key_bytes); } - pub fn deleteFilesRange( + pub fn deleteFilesInRange( self: *Self, comptime cf: ColumnFamily, start: cf.Key, @@ -130,10 +138,13 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { _ = end; } - pub fn initWriteBatch(self: *Self) error{}!WriteBatch { + pub fn initWriteBatch(self: *Self) Allocator.Error!WriteBatch { + const executed = try self.allocator.create(bool); + executed.* = false; return .{ .allocator = self.allocator, .instructions = .{}, + .executed = executed, }; } @@ -151,14 +162,15 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { }, .delete => |delete_ix| { const cf_index, const key = delete_ix; - self.maps[cf_index].delete(batch.allocator, key); + self.maps[cf_index].delete(self.allocator, key); }, .delete_range => { - // TODO + // TODO: also add to database tests @panic("not implemented"); }, } } + batch.executed.* = true; } /// A write batch is a sequence of operations that execute atomically. @@ -174,6 +186,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { pub const WriteBatch = struct { allocator: Allocator, instructions: std.ArrayListUnmanaged(Instruction), + executed: *bool, const Instruction = union(enum) { put: struct { usize, []const u8, []const u8 }, @@ -181,8 +194,22 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { delete_range: struct { usize, []const u8, []const u8 }, }; - fn deinit(self: WriteBatch) void { + pub fn deinit(self: *WriteBatch) void { + for (self.instructions.items) |ix| switch (ix) { + .put => |data| if (!self.executed.*) { + self.allocator.free(data[1]); + self.allocator.free(data[2]); + }, + .delete => |data| { + self.allocator.free(data[1]); + }, + .delete_range => |data| { + self.allocator.free(data[1]); + self.allocator.free(data[2]); + }, + }; self.instructions.deinit(self.allocator); + self.allocator.destroy(self.executed); } pub fn put( @@ -243,6 +270,8 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { shared_map.lock.lockShared(); defer shared_map.lock.unlockShared(); + self.transaction_lock.lockShared(); + defer self.transaction_lock.unlockShared(); const keys, const vals = if (start) |start_| b: { const search_bytes = try key_serializer.serializeAlloc(self.allocator, start_); @@ -255,18 +284,28 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { std.debug.assert(keys.len == vals.len); // TODO perf: reduce copying, e.g. copy-on-write or reference counting - const copied_keys = self.allocator.alloc([]const u8, keys.len); - const copied_vals = self.allocator.alloc([]const u8, vals.len); + const copied_keys = try self.allocator.alloc([]const u8, keys.len); + errdefer self.allocator.free(copied_keys); + const copied_vals = try self.allocator.alloc([]const u8, vals.len); + errdefer self.allocator.free(copied_vals); for (0..keys.len) |i| { - copied_keys[i] = self.allocator.dupe(u8, keys[i]); - copied_vals[i] = self.allocator.dupe(u8, vals[i]); + errdefer for (0..i) |n| { + self.allocator.free(copied_keys[n]); + self.allocator.free(copied_vals[n]); + }; + copied_keys[i] = try self.allocator.dupe(u8, keys[i]); + errdefer self.allocator.free(copied_keys[i]); + copied_vals[i] = try self.allocator.dupe(u8, vals[i]); } return .{ .allocator = self.allocator, .keys = copied_keys, .vals = copied_vals, - .cursor = 0, + .cursor = switch (direction) { + .forward => 0, + .reverse => keys.len, + }, .size = keys.len, }; } @@ -291,8 +330,8 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { pub fn next(self: *@This()) anyerror!?cf.Entry() { const index = self.nextIndex() orelse return null; return .{ - key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]), - value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]), + try key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]), + try value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]), }; } @@ -354,10 +393,21 @@ const SharedHashMap = struct { self.map.deinit(); } + pub fn count(self: *Self) usize { + self.lock.lockShared(); + defer self.lock.unlockShared(); + return self.map.count(); + } + pub fn put(self: *Self, key: []const u8, value: []const u8) Allocator.Error!void { self.lock.lock(); defer self.lock.unlock(); - try self.map.put(key, value); + const entry = try self.map.getOrPut(key); + if (entry.found_existing) { + self.allocator.free(key); + self.allocator.free(entry.value_ptr.*); + } + entry.value_ptr.* = value; } /// Only call this while holding the lock diff --git a/src/ledger/insert_shred.zig b/src/ledger/insert_shred.zig index 2b06fd93e..bd6c71895 100644 --- a/src/ledger/insert_shred.zig +++ b/src/ledger/insert_shred.zig @@ -152,6 +152,7 @@ pub const ShredInserter = struct { std.debug.assert(shreds.len == is_repaired.len); var total_timer = try Timer.start(); var write_batch = try self.db.initWriteBatch(); + defer write_batch.deinit(); var just_inserted_shreds = AutoHashMap(ShredId, Shred).init(allocator); // TODO capacity = shreds.len var erasure_metas = SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)).init(allocator); @@ -448,7 +449,7 @@ pub const ShredInserter = struct { const erasure_set_id = shred.fields.common.erasureSetId(); // TODO: redundant get or put pattern if (!merkle_root_metas.contains(erasure_set_id)) { - if (try self.db.get(schema.merkle_root_meta, erasure_set_id)) |meta_| { + if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { try merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ }); } } @@ -486,7 +487,7 @@ pub const ShredInserter = struct { // TODO: redundant get or put pattern const erasure_meta_entry = try erasure_metas.getOrPut(erasure_set_id); if (!erasure_meta_entry.found_existing) { - if (try self.db.get(schema.erasure_meta, erasure_set_id)) |meta_| { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { erasure_meta_entry.value_ptr.* = .{ .clean = meta_ }; } else { erasure_meta_entry.value_ptr.* = .{ @@ -650,7 +651,7 @@ pub const ShredInserter = struct { const erasure_set_id = shred.fields.common.erasureSetId(); // TODO: redundant get or put pattern if (!merkle_root_metas.contains(erasure_set_id)) { - if (try self.db.get(schema.merkle_root_meta, erasure_set_id)) |meta_| { + if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { try merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ }); } } @@ -725,7 +726,7 @@ pub const ShredInserter = struct { // TODO: redundant get or put pattern if (!erasure_metas.contains(erasure_set_id)) { - if (try self.db.get(schema.erasure_meta, erasure_set_id)) |meta_| { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { try erasure_metas.put(erasure_set_id, .{ .clean = meta_ }); } } @@ -745,7 +746,7 @@ pub const ShredInserter = struct { var timer = try Timer.start(); const entry = try working_set.getOrPut(slot); if (!entry.found_existing) { - if (try self.db.get(schema.index, slot)) |item| { + if (try self.db.get(self.allocator, schema.index, slot)) |item| { entry.value_ptr.* = .{ .index = item }; } else { entry.value_ptr.* = IndexMetaWorkingSetEntry.init(allocator, slot); @@ -765,7 +766,7 @@ pub const ShredInserter = struct { // TODO: redundant get or put pattern const entry = try working_set.getOrPut(slot); if (!entry.found_existing) { - if (try self.db.get(schema.slot_meta, slot)) |backup| { + if (try self.db.get(self.allocator, schema.slot_meta, slot)) |backup| { var slot_meta: SlotMeta = try backup.clone(self.allocator); // If parent_slot == None, then this is one of the orphans inserted // during the chaining process, see the function find_slot_meta_in_cached_state() @@ -1262,7 +1263,7 @@ pub const ShredInserter = struct { if (entry.found_existing) { return entry.value_ptr; } - entry.value_ptr.* = if (try self.db.get(schema.slot_meta, slot)) |m| + entry.value_ptr.* = if (try self.db.get(self.allocator, schema.slot_meta, slot)) |m| m else SlotMeta.init(self.allocator, slot, null); @@ -1344,7 +1345,7 @@ pub const ShredInserter = struct { const next_erasure_set = ErasureSetId{ .slot = slot, .fec_set_index = next_fec_set_index }; const next_merkle_root_meta = if (merkle_root_metas.get(next_erasure_set)) |nes| nes.asRef().* - else if (try self.db.get(schema.merkle_root_meta, next_erasure_set)) |nes| + else if (try self.db.get(self.allocator, schema.merkle_root_meta, next_erasure_set)) |nes| nes else // No shred from the next fec set has been received @@ -2024,7 +2025,7 @@ test "chaining basic" { // insert slot 1 _ = try state.insertShredBytes(slots[1]); { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 1)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 1)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{}, slot_meta.next_slots.items); try std.testing.expect(!slot_meta.isConnected()); @@ -2035,7 +2036,7 @@ test "chaining basic" { // insert slot 2 _ = try state.insertShredBytes(slots[2]); { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 1)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 1)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{2}, slot_meta.next_slots.items); try std.testing.expect(!slot_meta.isConnected()); // since 0 is not yet inserted @@ -2043,7 +2044,7 @@ test "chaining basic" { try std.testing.expectEqual(shreds_per_slot - 1, slot_meta.last_index); } { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 2)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 2)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{}, slot_meta.next_slots.items); try std.testing.expect(!slot_meta.isConnected()); // since 0 is not yet inserted @@ -2054,7 +2055,7 @@ test "chaining basic" { // insert slot 0 _ = try state.insertShredBytes(slots[0]); { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 0)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 0)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{1}, slot_meta.next_slots.items); try std.testing.expect(slot_meta.isConnected()); @@ -2062,7 +2063,7 @@ test "chaining basic" { try std.testing.expectEqual(shreds_per_slot - 1, slot_meta.last_index); } { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 1)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 1)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{2}, slot_meta.next_slots.items); try std.testing.expect(slot_meta.isConnected()); @@ -2070,7 +2071,7 @@ test "chaining basic" { try std.testing.expectEqual(shreds_per_slot - 1, slot_meta.last_index); } { - var slot_meta: SlotMeta = (try state.db.get(schema.slot_meta, 2)).?; + var slot_meta: SlotMeta = (try state.db.get(state.allocator(), schema.slot_meta, 2)).?; defer slot_meta.deinit(); try std.testing.expectEqualSlices(u64, &.{}, slot_meta.next_slots.items); try std.testing.expect(slot_meta.isConnected()); @@ -2096,6 +2097,7 @@ test "merkle root metas coding" { { // first shred (should succeed) var write_batch = try state.db.initWriteBatch(); + defer write_batch.deinit(); const this_shred = shreds[0]; var merkle_root_metas = AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)).init(allocator); @@ -2135,6 +2137,7 @@ test "merkle root metas coding" { { // second shred (same index as first, should conflict with merkle root) var write_batch = try state.db.initWriteBatch(); + defer write_batch.deinit(); const this_shred = shreds[1]; const succeeded, // @@ -2154,6 +2157,7 @@ test "merkle root metas coding" { const original_erasure_set_id = shreds[0].commonHeader().erasureSetId(); const original_meta_from_map = merkle_root_metas.get(original_erasure_set_id).?.asRef(); const original_meta_from_db = (try state.db.get( + state.allocator(), schema.merkle_root_meta, original_erasure_set_id, )).?; @@ -2171,6 +2175,7 @@ test "merkle root metas coding" { { // third shred (different index, should succeed) var write_batch = try state.db.initWriteBatch(); + defer write_batch.deinit(); const this_shred = shreds[2]; const this_index = start_index + 31; @@ -2187,6 +2192,7 @@ test "merkle root metas coding" { const original_erasure_set_id = shreds[0].commonHeader().erasureSetId(); const original_meta_from_map = merkle_root_metas.get(original_erasure_set_id).?.asRef(); const original_meta_from_db = (try state.db.get( + state.allocator(), schema.merkle_root_meta, original_erasure_set_id, )).?; diff --git a/src/ledger/meta.zig b/src/ledger/meta.zig index 17f035d64..b45834576 100644 --- a/src/ledger/meta.zig +++ b/src/ledger/meta.zig @@ -60,7 +60,7 @@ pub const SlotMeta = struct { }; } - pub fn deinit(self: *Self) void { + pub fn deinit(self: Self) void { self.next_slots.deinit(); self.completed_data_indexes.deinit(); } diff --git a/src/ledger/reader.zig b/src/ledger/reader.zig index 41f10c00d..ddf3fd768 100644 --- a/src/ledger/reader.zig +++ b/src/ledger/reader.zig @@ -83,7 +83,7 @@ pub const BlockstoreReader = struct { /// /// Analogous to [is_full](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L500) pub fn isFull(self: *Self, slot: Slot) !bool { - return if (try self.db.get(schema.slot_meta, slot)) |meta| + return if (try self.db.get(self.allocator, schema.slot_meta, slot)) |meta| meta.isFull() else false; @@ -114,7 +114,11 @@ pub const BlockstoreReader = struct { return true; } - var start_slot_meta = try self.db.get(schema.slot_meta, starting_slot) orelse return false; + var start_slot_meta = try self.db.get( + self.allocator, + schema.slot_meta, + starting_slot, + ) orelse return false; defer start_slot_meta.deinit(); // need a reference so the start_slot_meta.deinit works correctly var next_slots: *ArrayList(Slot) = &start_slot_meta.next_slots; @@ -127,7 +131,7 @@ pub const BlockstoreReader = struct { var last_slot = starting_slot; while (i < next_slots.items.len) : (i += 1) { const slot = next_slots.items[i]; - if (try self.db.get(schema.slot_meta, slot)) |_slot_meta| { + if (try self.db.get(self.allocator, schema.slot_meta, slot)) |_slot_meta| { var slot_meta = _slot_meta; defer slot_meta.deinit(); @@ -310,7 +314,8 @@ pub const BlockstoreReader = struct { defer lock.unlock(); if (try self.isRoot(slot)) { - return try self.db.get(schema.blocktime, slot) orelse error.SlotUnavailable; + return try self.db.get(self.allocator, schema.blocktime, slot) orelse + error.SlotUnavailable; } return error.SlotNotRooted; } @@ -320,7 +325,7 @@ pub const BlockstoreReader = struct { self.rpc_api_metrics.num_get_block_height.inc(); var lock = try self.checkLowestCleanupSlot(slot); defer lock.unlock(); - return try self.db.get(schema.block_height, slot); + return try self.db.get(self.allocator, schema.block_height, slot); } /// Acquires the `lowest_cleanup_slot` lock and returns a tuple of the held lock @@ -445,7 +450,7 @@ pub const BlockstoreReader = struct { populate_entries: bool, allow_dead_slots: bool, ) !VersionedConfirmedBlockWithEntries { - var slot_meta: SlotMeta = try self.db.get(schema.slot_meta, slot) orelse { + var slot_meta: SlotMeta = try self.db.get(self.allocator, schema.slot_meta, slot) orelse { self.logger.debugf("getCompleteBlockWithEntries failed for slot {} (missing SlotMeta)", .{slot}); return error.SlotUnavailable; }; @@ -515,7 +520,11 @@ pub const BlockstoreReader = struct { const signature = transaction.signatures[0]; txns_with_statuses.appendAssumeCapacity(.{ .transaction = transaction, - .meta = try self.db.get(schema.transaction_status, .{ signature, slot }) orelse + .meta = try self.db.get( + self.allocator, + schema.transaction_status, + .{ signature, slot }, + ) orelse return error.MissingTransactionMetadata, }); num_moved_slot_transactions += 1; @@ -541,16 +550,14 @@ pub const BlockstoreReader = struct { else Hash.default(); - const rewards = try self.db.get(schema.rewards, slot) orelse schema.rewards.Value{ - .rewards = &.{}, - .num_partitions = null, - }; + const rewards = try self.db.get(self.allocator, schema.rewards, slot) orelse + schema.rewards.Value{ .rewards = &.{}, .num_partitions = null }; // The Blocktime and BlockHeight column families are updated asynchronously; they // may not be written by the time the complete slot entries are available. In this // case, these fields will be null. - const block_time = try self.db.get(schema.blocktime, slot); - const block_height = try self.db.get(schema.block_height, slot); + const block_time = try self.db.get(self.allocator, schema.blocktime, slot); + const block_height = try self.db.get(self.allocator, schema.block_height, slot); return VersionedConfirmedBlockWithEntries{ .block = VersionedConfirmedBlock{ @@ -625,7 +632,8 @@ pub const BlockstoreReader = struct { continue; } // TODO get from iterator - const status = try self.db.get(schema.transaction_status, key) orelse return error.Unwrap; + const status = try self.db.get(self.allocator, schema.transaction_status, key) orelse + return error.Unwrap; return .{ .{ slot, status }, counter }; } @@ -691,7 +699,7 @@ pub const BlockstoreReader = struct { fn getBlockTime(self: *Self, slot: Slot) !?UnixTimestamp { var lock = try self.checkLowestCleanupSlot(slot); defer lock.unlock(); - return self.db.get(schema.blocktime, slot); + return self.db.get(self.allocator, schema.blocktime, slot); } /// Analogous to [find_transaction_in_slot](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3115) @@ -1026,7 +1034,7 @@ pub const BlockstoreReader = struct { slot: Slot, start_index: u64, ) !struct { CompletedRanges, ?SlotMeta } { - const maybe_slot_meta = try self.db.get(schema.slot_meta, slot); + const maybe_slot_meta = try self.db.get(self.allocator, schema.slot_meta, slot); if (maybe_slot_meta == null) { return .{ CompletedRanges.init(self.allocator), null }; } @@ -1195,7 +1203,7 @@ pub const BlockstoreReader = struct { map.deinit(); } for (slots) |slot| { - if (try self.db.get(schema.slot_meta, slot)) |meta| { + if (try self.db.get(self.allocator, schema.slot_meta, slot)) |meta| { errdefer meta.next_slots.deinit(); var cdi = meta.completed_data_indexes; cdi.deinit(); @@ -1210,7 +1218,7 @@ pub const BlockstoreReader = struct { /// agave handles DB errors with placeholder values, which seems like a mistake. /// this implementation instead returns errors. pub fn isRoot(self: *Self, slot: Slot) !bool { - return try self.db.get(schema.roots, slot) orelse false; + return try self.db.get(self.allocator, schema.roots, slot) orelse false; } /// Returns true if a slot is between the rooted slot bounds of the ledger, but has not itself @@ -1225,7 +1233,7 @@ pub const BlockstoreReader = struct { var iterator = try self.db.iterator(schema.roots, .forward, 0); defer iterator.deinit(); const lowest_root = try iterator.nextKey() orelse 0; - return if (try self.db.get(schema.roots, slot)) |_| + return if (try self.db.get(self.allocator, schema.roots, slot)) |_| false else slot < self.max_root.load(.monotonic) and slot > lowest_root; @@ -1233,7 +1241,7 @@ pub const BlockstoreReader = struct { /// Analogous to [get_bank_hash](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3873) pub fn getBankHash(self: *Self, slot: Slot) !?Hash { - return if (try self.db.get(schema.bank_hash, slot)) |versioned| + return if (try self.db.get(self.allocator, schema.bank_hash, slot)) |versioned| versioned.frozenHash() else null; @@ -1241,7 +1249,7 @@ pub const BlockstoreReader = struct { /// Analogous to [is_duplicate_confirmed](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3880) pub fn isDuplicateConfirmed(self: *Self, slot: Slot) !bool { - return if (try self.db.get(schema.bank_hash, slot)) |versioned| + return if (try self.db.get(self.allocator, schema.bank_hash, slot)) |versioned| versioned.isDuplicateConfirmed() else false; @@ -1251,7 +1259,8 @@ pub const BlockstoreReader = struct { /// /// Analogous to [get_optimistic_slot](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3899) pub fn getOptimisticSlot(self: *Self, slot: Slot) !?struct { Hash, UnixTimestamp } { - const meta = try self.db.get(schema.optimistic_slots, slot) orelse return null; + const meta = try self.db.get(self.allocator, schema.optimistic_slots, slot) orelse + return null; return .{ meta.V0.hash, meta.V0.timestamp }; } @@ -1282,7 +1291,7 @@ pub const BlockstoreReader = struct { /// Analogous to [is_dead](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3962) pub fn isDead(self: *Self, slot: Slot) !bool { - return try self.db.get(schema.dead_slots, slot) orelse false; + return try self.db.get(self.allocator, schema.dead_slots, slot) orelse false; } /// Analogous to [get_first_duplicate_proof](https://github.com/anza-xyz/agave/blob/15dbe7fb0fc07e11aaad89de1576016412c7eb9e/ledger/src/blockstore.rs#L3983) @@ -1506,7 +1515,8 @@ pub const AncestorIterator = struct { if (self.next_slot) |slot| { if (slot == 0) { self.next_slot = null; - } else if (try self.db.get(schema.slot_meta, slot)) |slot_meta| { + } else if (try self.db.get(self.db.allocator, schema.slot_meta, slot)) |slot_meta| { + defer slot_meta.deinit(); self.next_slot = slot_meta.parent_slot; } else { self.next_slot = null; @@ -1551,6 +1561,7 @@ test "getLatestOptimisticSlots" { { var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); const hash = Hash{ .data = .{1} ** 32 }; try write_batch.put(schema.optimistic_slots, 1, .{ .V0 = .{ @@ -1575,6 +1586,7 @@ test "getLatestOptimisticSlots" { { var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); const hash = Hash{ .data = .{10} ** 32 }; try write_batch.put(schema.optimistic_slots, 10, .{ .V0 = .{ @@ -1626,6 +1638,7 @@ test "getFirstDuplicateProof" { .shred2 = test_shreds.mainnet_shreds[1], }; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.duplicate_slots, 19, proof); try db.commit(write_batch); @@ -1659,6 +1672,7 @@ test "isDead" { { var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.dead_slots, 19, true); try db.commit(write_batch); } @@ -1666,6 +1680,7 @@ test "isDead" { { var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.dead_slots, 19, false); try db.commit(write_batch); } @@ -1692,6 +1707,7 @@ test "getBlockHeight" { ); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.block_height, 19, 19); try db.commit(write_batch); @@ -1720,6 +1736,7 @@ test "getRootedBlockTime" { ); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.blocktime, 19, 19); try db.commit(write_batch); @@ -1729,6 +1746,7 @@ test "getRootedBlockTime" { // root it var write_batch2 = try db.initWriteBatch(); + defer write_batch2.deinit(); try write_batch2.put(schema.roots, 19, true); try db.commit(write_batch2); @@ -1765,6 +1783,7 @@ test "slotMetaIterator" { } var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); // 1 -> 2 -> 3 const roots: [3]Slot = .{ 1, 2, 3 }; var parent_slot: ?Slot = null; @@ -1818,6 +1837,7 @@ test "rootedSlotIterator" { ); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); const roots: [3]Slot = .{ 2, 3, 4 }; for (roots) |slot| { try write_batch.put(schema.roots, slot, true); @@ -1853,6 +1873,7 @@ test "slotRangeConnected" { ); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); const roots: [3]Slot = .{ 1, 2, 3 }; // 1 -> 2 -> 3 @@ -1917,6 +1938,7 @@ test "highestSlot" { slot_meta.received = 1; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.slot_meta, shred_slot, @@ -1935,6 +1957,7 @@ test "highestSlot" { slot_meta2.received = 1; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.slot_meta, slot_meta2.slot, @@ -1981,6 +2004,7 @@ test "lowestSlot" { slot_meta.received = 1; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.slot_meta, shred_slot, @@ -2029,6 +2053,7 @@ test "isShredDuplicate" { // insert a shred var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.data_shred, .{ shred_slot, shred_index }, @@ -2084,6 +2109,7 @@ test "findMissingDataIndexes" { slot_meta.last_index = 4; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.data_shred, .{ shred_slot, shred_index }, @@ -2150,6 +2176,7 @@ test "getCodeShred" { const shred_index = shred.commonHeader().index; var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.code_shred, .{ shred_slot, shred_index }, @@ -2218,6 +2245,7 @@ test "getDataShred" { defer shred.deinit(); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put( schema.data_shred, .{ shred_slot, shred_index }, diff --git a/src/ledger/rocksdb.zig b/src/ledger/rocksdb.zig index 0e4393159..b3763e157 100644 --- a/src/ledger/rocksdb.zig +++ b/src/ledger/rocksdb.zig @@ -42,7 +42,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { const cfs: []const rocks.ColumnFamily // = try callRocks( logger, - rocks.DB.openCf, + rocks.DB.open, .{ allocator, path, @@ -76,6 +76,20 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { self.allocator.free(self.path); } + pub fn count(self: *Self, comptime cf: ColumnFamily) Allocator.Error!u64 { + const live_files = try self.db.liveFiles(self.allocator); + defer live_files.deinit(); + + var sum: u64 = 0; + for (live_files.items) |live_file| { + if (std.mem.eql(u8, live_file.column_family_name, cf.name)) { + sum += live_file.num_entries; + } + } + + return sum; + } + pub fn put( self: *Self, comptime cf: ColumnFamily, @@ -98,10 +112,15 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { ); } - pub fn get(self: *Self, comptime cf: ColumnFamily, key: cf.Key) anyerror!?cf.Value { + pub fn get( + self: *Self, + allocator: Allocator, + comptime cf: ColumnFamily, + key: cf.Key, + ) anyerror!?cf.Value { const val_bytes = try self.getBytes(cf, key) orelse return null; defer val_bytes.deinit(); - return try value_serializer.deserialize(cf.Value, self.allocator, val_bytes.data); + return try value_serializer.deserialize(cf.Value, allocator, val_bytes.data); } pub fn getBytes(self: *Self, comptime cf: ColumnFamily, key: cf.Key) anyerror!?BytesRef { @@ -128,7 +147,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { ); } - pub fn deleteFilesRange( + pub fn deleteFilesInRange( self: *Self, comptime cf: ColumnFamily, start: cf.Key, @@ -142,7 +161,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { return try callRocks( self.logger, - rocks.DB.deleteFileInRange, + rocks.DB.deleteFilesInRange, .{ &self.db, self.cf_handles[cf.find(column_families)], start_bytes.data, end_bytes.data }, ); } @@ -174,6 +193,10 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { inner: rocks.WriteBatch, cf_handles: []const rocks.ColumnFamilyHandle, + pub fn deinit(self: *WriteBatch) void { + self.inner.deinit(); + } + pub fn put( self: *WriteBatch, comptime cf: ColumnFamily, @@ -297,7 +320,7 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { RocksDBPut, RocksDBGet, RocksDBDelete, - RocksDBDeleteFileInRange, + RocksDBDeleteFilesInRange, RocksDBIterator, RocksDBWrite, } || Allocator.Error; diff --git a/src/ledger/tests.zig b/src/ledger/tests.zig index a01201267..9f5a17ba0 100644 --- a/src/ledger/tests.zig +++ b/src/ledger/tests.zig @@ -40,7 +40,11 @@ test "put/get data consistency for merkle root" { .first_received_shred_type = .data, }, ); - const output: sig.ledger.meta.MerkleRootMeta = (try db.get(schema.merkle_root_meta, id)).?; + const output: sig.ledger.meta.MerkleRootMeta = (try db.get( + std.testing.allocator, + schema.merkle_root_meta, + id, + )).?; try std.testing.expectEqualSlices(u8, &root.data, &output.merkle_root.?.data); } diff --git a/src/ledger/writer.zig b/src/ledger/writer.zig index 1a17d8c64..2fbd18e5c 100644 --- a/src/ledger/writer.zig +++ b/src/ledger/writer.zig @@ -92,7 +92,7 @@ pub const BlockstoreWriter = struct { frozen_hash: Hash, is_duplicate_confirmed: bool, ) !void { - if (try self.db.get(schema.bank_hash, slot)) |prev_value| { + if (try self.db.get(self.allocator, schema.bank_hash, slot)) |prev_value| { if (frozen_hash.eql(prev_value.frozenHash()) and prev_value.isDuplicateConfirmed()) { // Don't overwrite is_duplicate_confirmed == true with is_duplicate_confirmed == false, // which may happen on startup when procesing from blockstore processor because the @@ -127,6 +127,7 @@ pub const BlockstoreWriter = struct { /// agave: set_roots pub fn setRoots(self: *Self, rooted_slots: []const Slot) !void { var write_batch = try self.db.initWriteBatch(); + defer write_batch.deinit(); var max_new_rooted_slot: Slot = 0; for (rooted_slots) |slot| { max_new_rooted_slot = @max(max_new_rooted_slot, slot); @@ -257,7 +258,7 @@ pub const BlockstoreWriter = struct { /// have their slots considered connected. /// agave: set_and_chain_connected_on_root_and_next_slots pub fn setAndChainConnectedOnRootAndNextSlots(self: *Self, root: Slot) !void { - var root_slot_meta: SlotMeta = try self.db.get(schema.slot_meta, root) orelse + var root_slot_meta: SlotMeta = try self.db.get(self.allocator, schema.slot_meta, root) orelse SlotMeta.init(self.allocator, root, null); defer root_slot_meta.deinit(); @@ -268,6 +269,7 @@ pub const BlockstoreWriter = struct { } self.logger.infof("Marking slot {} and any full children slots as connected", .{root}); var write_batch = try self.db.initWriteBatch(); + defer write_batch.deinit(); // Mark both connected bits on the root slot so that the flags for this // slot match the flags of slots that become connected the typical way. @@ -284,7 +286,7 @@ pub const BlockstoreWriter = struct { var i: usize = 0; while (i < next_slots.items.len) : (i += 1) { const slot = next_slots.items[i]; - var slot_meta: SlotMeta = try self.db.get(schema.slot_meta, slot) orelse { + var slot_meta: SlotMeta = try self.db.get(self.allocator, schema.slot_meta, slot) orelse { self.logger.errf("Slot {} is a child but has no SlotMeta in blockstore", .{slot}); return error.CorruptedBlockstore; }; @@ -310,6 +312,7 @@ pub const BlockstoreWriter = struct { /// analog to [`run_purge_with_stats`](https://github.com/anza-xyz/agave/blob/26692e666454d340a6691e2483194934e6a8ddfc/ledger/src/blockstore/blockstore_purge.rs#L202) pub fn purgeSlots(self: *Self, from_slot: Slot, to_slot: Slot) !bool { var write_batch = try self.db.initWriteBatch(); + defer write_batch.deinit(); // the methods used below are exclusive [from_slot, to_slot), so we add 1 to purge inclusive const purge_to_slot = to_slot + 1; @@ -425,12 +428,12 @@ pub const BlockstoreWriter = struct { to_key: cf.Key, count: *u32, ) !void { - try db.deleteFilesRange(cf, from_key, to_key); + try db.deleteFilesInRange(cf, from_key, to_key); count.* += 1; } fn isRoot(self: *Self, slot: Slot) !bool { - return try self.db.get(schema.roots, slot) orelse false; + return try self.db.get(self.allocator, schema.roots, slot) orelse false; } }; @@ -498,6 +501,7 @@ test "purgeSlots" { // write another type var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); for (0..roots.len + 1) |i| { const merkle_root_meta = sig.ledger.shred.ErasureSetId{ .fec_set_index = i, @@ -518,12 +522,20 @@ test "purgeSlots" { try std.testing.expectEqual(true, did_purge2); for (0..5 + 1) |i| { - const r = try db.get(schema.merkle_root_meta, .{ .slot = i, .fec_set_index = i }); + const r = try db.get( + std.testing.allocator, + schema.merkle_root_meta, + .{ .slot = i, .fec_set_index = i }, + ); try std.testing.expectEqual(null, r); } for (6..10 + 1) |i| { - const r = try db.get(schema.merkle_root_meta, .{ .slot = i, .fec_set_index = i }); + const r = try db.get( + std.testing.allocator, + schema.merkle_root_meta, + .{ .slot = i, .fec_set_index = i }, + ); try std.testing.expect(r != null); } } @@ -584,6 +596,7 @@ test "scanAndFixRoots" { const slot_meta_3 = SlotMeta.init(allocator, 3, 2); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.slot_meta, slot_meta_1.slot, slot_meta_1); try write_batch.put(schema.slot_meta, slot_meta_2.slot, slot_meta_2); try write_batch.put(schema.slot_meta, slot_meta_3.slot, slot_meta_3); @@ -621,6 +634,7 @@ test "setAndChainConnectedOnRootAndNextSlots" { try writer.setRoots(&roots); const slot_meta_1 = SlotMeta.init(allocator, 1, null); var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); try write_batch.put(schema.slot_meta, slot_meta_1.slot, slot_meta_1); try db.commit(write_batch); @@ -629,13 +643,15 @@ test "setAndChainConnectedOnRootAndNextSlots" { try writer.setAndChainConnectedOnRootAndNextSlots(1); // should be connected - const db_slot_meta_1 = (try db.get(schema.slot_meta, 1)) orelse return error.MissingSlotMeta; + const db_slot_meta_1 = (try db.get(allocator, schema.slot_meta, 1)) orelse + return error.MissingSlotMeta; try std.testing.expectEqual(true, db_slot_meta_1.isConnected()); // write some roots past 1 const other_roots: [3]Slot = .{ 2, 3, 4 }; var parent_slot: ?Slot = 1; var write_batch2 = try db.initWriteBatch(); + defer write_batch2.deinit(); try writer.setRoots(&other_roots); for (other_roots, 0..) |slot, i| { @@ -659,7 +675,8 @@ test "setAndChainConnectedOnRootAndNextSlots" { try writer.setAndChainConnectedOnRootAndNextSlots(other_roots[0]); for (other_roots) |slot| { - var db_slot_meta = (try db.get(schema.slot_meta, slot)) orelse return error.MissingSlotMeta; + var db_slot_meta = (try db.get(allocator, schema.slot_meta, slot)) orelse + return error.MissingSlotMeta; defer db_slot_meta.deinit(); try std.testing.expectEqual(true, db_slot_meta.isConnected()); } @@ -686,6 +703,7 @@ test "setAndChainConnectedOnRootAndNextSlots: disconnected" { // 1 is a root and full var write_batch = try db.initWriteBatch(); + defer write_batch.deinit(); const roots: [3]Slot = .{ 1, 2, 3 }; try writer.setRoots(&roots); @@ -716,16 +734,19 @@ test "setAndChainConnectedOnRootAndNextSlots: disconnected" { try writer.setAndChainConnectedOnRootAndNextSlots(1); // should be connected - var db_slot_meta_1 = (try db.get(schema.slot_meta, 1)) orelse return error.MissingSlotMeta; + var db_slot_meta_1 = (try db.get(allocator, schema.slot_meta, 1)) orelse + return error.MissingSlotMeta; defer db_slot_meta_1.deinit(); try std.testing.expectEqual(true, db_slot_meta_1.isConnected()); - var db_slot_meta_2: SlotMeta = (try db.get(schema.slot_meta, 2)) orelse return error.MissingSlotMeta; + var db_slot_meta_2: SlotMeta = (try db.get(allocator, schema.slot_meta, 2)) orelse + return error.MissingSlotMeta; defer db_slot_meta_2.deinit(); try std.testing.expectEqual(true, db_slot_meta_2.isParentConnected()); try std.testing.expectEqual(false, db_slot_meta_2.isConnected()); - var db_slot_meta_3: SlotMeta = (try db.get(schema.slot_meta, 3)) orelse return error.MissingSlotMeta; + var db_slot_meta_3: SlotMeta = (try db.get(allocator, schema.slot_meta, 3)) orelse + return error.MissingSlotMeta; defer db_slot_meta_3.deinit(); try std.testing.expectEqual(false, db_slot_meta_3.isParentConnected()); try std.testing.expectEqual(false, db_slot_meta_3.isConnected()); diff --git a/src/utils/collections.zig b/src/utils/collections.zig index 3ea481f0a..84b5f6f22 100644 --- a/src/utils/collections.zig +++ b/src/utils/collections.zig @@ -83,7 +83,7 @@ pub fn SortedSetCustom(comptime T: type, comptime config: SortedMapConfig(T)) ty return .{ .map = SortedMapCustom(T, void, config).init(allocator) }; } - pub fn deinit(self: *Self) void { + pub fn deinit(self: Self) void { self.map.deinit(); } @@ -119,6 +119,11 @@ pub fn SortedSetCustom(comptime T: type, comptime config: SortedMapConfig(T)) ty pub fn range(self: *Self, start: ?T, end: ?T) []const T { return self.map.range(start, end)[0]; } + + /// subslice of items ranging from start (inclusive) to end (exclusive) + pub fn rangeCustom(self: *Self, start: ?Bound(T), end: ?Bound(T)) []const T { + return self.map.rangeCustom(start, end)[0]; + } }; } @@ -154,8 +159,9 @@ pub fn SortedMapCustom( }; } - pub fn deinit(self: *Self) void { - self.inner.deinit(); + pub fn deinit(self: Self) void { + var self_mut = self; + self_mut.inner.deinit(); } pub fn clone(self: Self) !Self { @@ -282,7 +288,7 @@ pub fn SortedMapCustom( if (end) |end_| { // .any instead of .last because uniqueness is guaranteed const end_index = switch (binarySearch(K, keys_, end_, .any, order)) { - .found => |index| if (excl_end) index else if (index == 0) 0 else index - 1, + .found => |index| if (excl_end) index else index + 1, .after => |index| index + 1, .less => return .{ &.{}, &.{} }, .greater => keys_.len, @@ -360,6 +366,19 @@ pub fn order(a: anytype, b: anytype) std.math.Order { @compileError(std.fmt.comptimePrint("`order` not supported for {}", .{T})); } +pub const BinarySearchResult = union(enum) { + /// item was found at this index + found: usize, + /// not found, but it's between this and the next index + after: usize, + /// the search term is less than all items in the slice + less, + /// the search term is greater than all items in the slice + greater, + /// the input slice is empty + empty, +}; + /// binary search that is very specific about the outcome. /// only works with numbers pub fn binarySearch( @@ -375,18 +394,7 @@ pub fn binarySearch( /// - fn(a: T, b: T) std.math.Order /// - fn(a: anytype, b: anytype) std.math.Order comptime orderFn: anytype, -) union(enum) { - /// item was found at this index - found: usize, - /// not found, but it's between this and the next index - after: usize, - /// the search term is less than all items in the slice - less, - /// the search term is greater than all items in the slice - greater, - /// the input slice is empty - empty, -} { +) BinarySearchResult { if (items.len == 0) return .empty; // binary search for the item @@ -559,3 +567,64 @@ test "order slices" { try expectEqual(orderSlices(u8, std.math.order, &b, &e), .gt); try expectEqual(orderSlices(u8, std.math.order, &e, &b), .lt); } + +test "sorted set slice range" { + var set = SortedSet([]const u8).init(std.testing.allocator); + defer set.deinit(); + try set.put(&.{ 0, 0, 10 }); + try set.put(&.{ 0, 0, 20 }); + try set.put(&.{ 0, 0, 30 }); + try set.put(&.{ 0, 0, 40 }); + + const range = set.rangeCustom(null, .{ .inclusive = &.{ 0, 0, 40 } }); + + try std.testing.expectEqual(4, range.len); + try std.testing.expectEqualSlices(u8, &.{ 0, 0, 10 }, range[0]); + try std.testing.expectEqualSlices(u8, &.{ 0, 0, 20 }, range[1]); + try std.testing.expectEqualSlices(u8, &.{ 0, 0, 30 }, range[2]); + try std.testing.expectEqualSlices(u8, &.{ 0, 0, 40 }, range[3]); +} + +test "binarySearch slice of slices" { + const slices = [4][]const u8{ + &.{ 0, 0, 10 }, + &.{ 0, 0, 20 }, + &.{ 0, 0, 30 }, + &.{ 0, 0, 40 }, + }; + + try std.testing.expectEqual( + BinarySearchResult{ .found = 3 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 40 }, .any, order), + ); + try std.testing.expectEqual( + BinarySearchResult{ .after = 2 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 39 }, .any, order), + ); + try std.testing.expectEqual( + BinarySearchResult.greater, + binarySearch([]const u8, &slices, &.{ 0, 0, 41 }, .any, order), + ); + + try std.testing.expectEqual( + BinarySearchResult{ .found = 0 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 10 }, .any, order), + ); + try std.testing.expectEqual( + BinarySearchResult{ .after = 0 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 11 }, .any, order), + ); + try std.testing.expectEqual( + BinarySearchResult.less, + binarySearch([]const u8, &slices, &.{ 0, 0, 9 }, .any, order), + ); + + try std.testing.expectEqual( + BinarySearchResult{ .found = 1 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 20 }, .any, order), + ); + try std.testing.expectEqual( + BinarySearchResult{ .after = 1 }, + binarySearch([]const u8, &slices, &.{ 0, 0, 21 }, .any, order), + ); +}