diff --git a/build.zig.zon b/build.zig.zon index 37fcafe6f..b18ea5a57 100644 --- a/build.zig.zon +++ b/build.zig.zon @@ -20,8 +20,8 @@ .hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88", }, .rocksdb = .{ - .url = "https://github.com/Syndica/rocksdb-zig/archive/cf966c0c6bb6205a08bb2006f399719542e4e770.tar.gz", - .hash = "1220d25e3ef18526cb6c980b441f05dc96330bedd88a512dcb3aed775a981ce3707d", + .url = "https://github.com/Syndica/rocksdb-zig/archive/cd747a88e66539e37585cccacb5ae311be2ad527.tar.gz", + .hash = "1220d917ac77a71426253d31e6c2b6db40494aa4e18c7284dfca9dc1eed5c7864274", }, .lsquic = .{ .url = "https://github.com/Syndica/lsquic/archive/ed6ced0cbc6447f7135a32db491e398debdf8af7.tar.gz", diff --git a/src/config.zig b/src/config.zig index 72e03362f..07fb1c2fb 100644 --- a/src/config.zig +++ b/src/config.zig @@ -15,7 +15,7 @@ pub const Cmd = struct { test_transaction_sender: TestTransactionSender = .{}, - max_shreds: u64 = 1_000, + max_shreds: u64 = 5_000_000, leader_schedule_path: ?[]const u8 = null, genesis_file_path: ?[]const u8 = null, // general config diff --git a/src/ledger/cleanup_service.zig b/src/ledger/cleanup_service.zig index 0bee26c6c..1f37852a6 100644 --- a/src/ledger/cleanup_service.zig +++ b/src/ledger/cleanup_service.zig @@ -89,14 +89,18 @@ pub fn cleanBlockstore( purge_interval: u64, ) !Slot { const logger = logger_.withScope(LOG_SCOPE); + // // TODO: add back when max_root is implemented with consensus // const root = blockstore_reader.max_root.load(.acquire); - // if (root - last_purge_slot <= purge_interval) return last_purge_slot; - _ = last_purge_slot; - _ = purge_interval; - // NOTE: this will clean everything past the lowest slot in the blockstore - const root: Slot = try blockstore_reader.lowestSlot(); + // hack to get a conservative estimate of a recent slot that is almost definitely rooted + const root = if (try blockstore_reader.highestSlot()) |highest| + highest -| 100 + else + try 
blockstore_reader.lowestSlot(); + + if (root - last_purge_slot <= purge_interval) return last_purge_slot; + const result = try findSlotsToClean(blockstore_reader, root, max_ledger_shreds); logger.info().logf("findSlotsToClean result: {any}", .{result}); @@ -399,6 +403,45 @@ fn purgeFileRangeWithCount( const Blockstore = ledger.BlockstoreDB; const TestDB = ledger.tests.TestDB; +test cleanBlockstore { + // test setup + const allocator = std.testing.allocator; + const logger = sig.trace.DirectPrintLogger.init(allocator, .warn).logger(); + const registry = sig.prometheus.globalRegistry(); + var db = try TestDB.init(@src()); + defer db.deinit(); + var lowest_cleanup_slot = sig.sync.RwMux(Slot).init(0); + var max_root = std.atomic.Value(Slot).init(0); + var reader = try BlockstoreReader + .init(allocator, logger, db, registry, &lowest_cleanup_slot, &max_root); + + // insert data + var batch = try db.initWriteBatch(); + defer batch.deinit(); + for (0..1_000) |i| { + for (0..10) |j| try batch.put(ledger.schema.schema.data_shred, .{ i, j }, &.{}); + try batch.put(ledger.schema.schema.slot_meta, i, undefined); + } + try db.commit(&batch); + try db.flush(ledger.schema.schema.data_shred); + + // run test subject + const slot = try cleanBlockstore(logger, &reader, &db, &lowest_cleanup_slot, 100, 0, 0); + try std.testing.expectEqual(899, slot); + + // verify correct data was purged + var shred_iter = try db.iterator(ledger.schema.schema.data_shred, .forward, .{ 0, 0 }); + defer shred_iter.deinit(); + var meta_iter = try db.iterator(ledger.schema.schema.slot_meta, .forward, 0); + defer meta_iter.deinit(); + for (900..1_000) |i| { + for (0..10) |j| try std.testing.expectEqual(.{ i, j }, (try shred_iter.nextKey()).?); + try std.testing.expectEqual(i, (try meta_iter.nextKey()).?); + } + try std.testing.expectEqual(null, shred_iter.nextKey()); + try std.testing.expectEqual(null, meta_iter.nextKey()); +} + test "findSlotsToClean" { const allocator = std.testing.allocator; var registry 
= sig.prometheus.Registry(.{}).init(allocator); diff --git a/src/ledger/database/hashmap.zig b/src/ledger/database/hashmap.zig index d14e4f3e6..142ddec73 100644 --- a/src/ledger/database/hashmap.zig +++ b/src/ledger/database/hashmap.zig @@ -445,6 +445,8 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type { } }; } + + pub fn flush(_: *Self, comptime _: ColumnFamily) anyerror!void {} }; } diff --git a/src/ledger/database/interface.zig b/src/ledger/database/interface.zig index 29fd69542..978bd3451 100644 --- a/src/ledger/database/interface.zig +++ b/src/ledger/database/interface.zig @@ -180,6 +180,10 @@ pub fn Database(comptime Impl: type) type { } }; } + + pub fn flush(self: *Self, comptime cf: ColumnFamily) anyerror!void { + return self.impl.flush(cf); + } }; } diff --git a/src/ledger/database/rocksdb.zig b/src/ledger/database/rocksdb.zig index 8f6f10ff8..94495d8ad 100644 --- a/src/ledger/database/rocksdb.zig +++ b/src/ledger/database/rocksdb.zig @@ -329,6 +329,14 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type { }; } + pub fn flush(self: *Self, comptime cf: ColumnFamily) error{RocksDBFlush}!void { + try callRocks( + self.logger, + rocks.DB.flush, + .{ &self.db, self.cf_handles[cf.find(column_families)] }, + ); + } + const Error = error{ RocksDBOpen, RocksDBPut, @@ -337,6 +345,7 @@ RocksDBDeleteFilesInRange, RocksDBIterator, RocksDBWrite, + RocksDBFlush, } || Allocator.Error; }; } diff --git a/src/utils/collections.zig b/src/utils/collections.zig index b84c437c2..6f68445aa 100644 --- a/src/utils/collections.zig +++ b/src/utils/collections.zig @@ -296,8 +296,8 @@ pub fn SortedSetCustom(comptime T: type, comptime config: SortedMapConfig(T)) ty try self.map.put(item, {}); } - pub fn remove(self: *Self, item: T) bool { - return self.map.remove(item); + pub fn orderedRemove(self: *Self, item: T) bool { + return self.map.orderedRemove(item); } pub fn 
contains(self: Self, item: T) bool { @@ -394,7 +394,9 @@ pub fn SortedMapCustom( pub fn fetchSwapRemove(self: *Self, key: K) ?Inner.KV { const item = self.inner.fetchSwapRemove(key); - self.resetMaxOnRemove(key); + if (item != null and !self.resetMaxOnRemove(key)) { + self.is_sorted = false; + } return item; } @@ -417,21 +419,27 @@ pub fn SortedMapCustom( } } - pub fn remove(self: *Self, key: K) bool { - const item = self.inner.orderedRemove(key); - self.resetMaxOnRemove(key); - return item; + pub fn orderedRemove(self: *Self, key: K) bool { + const was_removed = self.inner.orderedRemove(key); + if (was_removed) _ = self.resetMaxOnRemove(key); + return was_removed; } - fn resetMaxOnRemove(self: *Self, removed_key: K) void { - if (self.max) |max| { - if (self.count() == 0) { - self.max = null; - } else if (order(removed_key, max) == .eq) { - self.sort(); + /// - returns whether the key was the prior max. + /// - don't call this unless an item was definitely removed. + fn resetMaxOnRemove(self: *Self, removed_key: K) bool { + std.debug.assert(self.max != null); + if (self.count() == 0) { + self.max = null; + return true; + } else switch (order(removed_key, self.max.?)) { + .eq => { const sorted_keys = self.keys(); self.max = sorted_keys[sorted_keys.len - 1]; - } + return true; + }, + .gt => unreachable, + .lt => return false, } } @@ -854,9 +862,9 @@ test SortedSet { try set.put(5); // remove - try expect(set.remove(5)); + try expect(set.orderedRemove(5)); try expect(!set.contains(5)); - try expect(!set.remove(5)); + try expect(!set.orderedRemove(5)); try set.put(5); try expect(set.contains(5)); diff --git a/src/utils/interface.zig b/src/utils/interface.zig index 772185311..aa8cdb31c 100644 --- a/src/utils/interface.zig +++ b/src/utils/interface.zig @@ -298,7 +298,7 @@ const FunctionSignature = struct { if (@typeInfo(sub).ErrorSet) |sub_set| if (@typeInfo(super).ErrorSet) |super_set| { sub: for (sub_set) |sub_err| { for (super_set) |super_err| { - if 
(std.mem.eql(sub_err.name, super_err.name)) { + if (std.mem.eql(u8, sub_err.name, super_err.name)) { continue :sub; } }