fix(ledger): running out of disk space due to cleanup service not cleaning anything (#591)

## Fix

The cleanup service is supposed to delete data for very old slots from the ledger. It also refuses to delete data from unrooted slots, since we definitely still need those slots.

Since we don't have consensus, we have no way to know which slots are rooted. So the cleanup service just assumes no slots are rooted, and therefore never cleans up anything. That means we will eventually run out of storage on any server.

I added the assumption that anything over 100 slots old is rooted. It's not a perfectly safe assumption, because you really need consensus to be sure of this, so we'll fix it properly once consensus is done. But for now we need some kind of basic assumption here so the validator can function without eventually crashing the entire server.

A slot typically becomes rooted after 32 slots pass. Going 100 slots without a slot being rooted is extremely unlikely and would only happen in the event of a massive consensus failure. This is a reasonable assumption to live with for the time being, until we have consensus.
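
To make that concrete, here is a minimal sketch of the estimate (the helper name is invented for illustration; the real change is the few lines added to `src/ledger/cleanup_service.zig` below):

```zig
// Illustrative only; mirrors the logic added to cleanBlockstore below.
// `-|` is Zig's saturating subtraction, so a ledger younger than 100 slots
// estimates its root near slot 0 instead of underflowing.
fn estimateRootedSlot(highest_slot: ?u64, lowest_slot: u64) u64 {
    return if (highest_slot) |highest|
        highest -| 100 // assume anything at least 100 slots old is rooted
    else
        lowest_slot; // no highest slot known: fall back to the lowest slot
}
```

In the unit test added below, slots 0-999 are inserted, so the estimated root is 899, and with the shred count well over the limit everything at or below that slot gets purged.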

I also increased the number of shreds to keep before cleaning them up. Previously the limit was 1,000, which may represent only a few slots, so in practice that setting meant deleting every rooted slot. I bumped it up to 5 million shreds. With that limit the ledger will use about 10-20 GB of storage once it reaches the cap, and it should hold shreds going back roughly an hour. Later we'll likely need to make this larger so replay can catch up from snapshots more reliably, or configurable so RPC can serve arbitrarily old data. For now, while the shreds are not actually used for anything, I'm keeping it conservative to minimize hardware requirements.
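
For a rough sense of how those numbers hang together, here is a back-of-the-envelope check. Everything except `max_shreds` is an assumption for illustration, not a value taken from this PR:

```zig
const std = @import("std");

// Assumed figures; only max_shreds comes from this PR (src/config.zig).
const max_shreds: u64 = 5_000_000;
const bytes_per_shred_low: u64 = 2_000; // assumed on-disk cost per retained shred, low end
const bytes_per_shred_high: u64 = 4_000; // assumed on-disk cost per retained shred, high end
const shreds_per_slot: u64 = 600; // assumed average shreds per slot
const slot_time_ms: u64 = 400; // nominal slot time

test "rough ledger retention estimate" {
    const gb_low = max_shreds * bytes_per_shred_low / 1_000_000_000; // 10 GB
    const gb_high = max_shreds * bytes_per_shred_high / 1_000_000_000; // 20 GB
    const minutes = max_shreds / shreds_per_slot * slot_time_ms / std.time.ms_per_min; // ~55 min
    try std.testing.expect(gb_low == 10 and gb_high == 20 and minutes == 55);
}
```

Under those assumptions the cap works out to roughly 10-20 GB and a bit under an hour of history, consistent with the estimate above.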

## Test and RocksDB upgrade

To satisfy test coverage requirements I needed to add a unit test for cleanBlockstore. The test wouldn't work without flushing the data_shred column family, presumably because the purge path deletes whole on-disk files and can't see data still sitting in RocksDB's in-memory write buffer. That required changes to rocksdb-zig to add `flush`, so those upgrades are also included in this PR.
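
For context on the plumbing, the new `flush` follows the same forwarding shape the database interface already uses: the generic wrapper forwards to the backing implementation, and the in-memory backend treats it as a no-op. A toy sketch of that shape (the names here are invented; only the structure matches the interface.zig and hashmap.zig changes below):

```zig
// Toy model of the forwarding pattern; not the real sig database interface.
fn Database(comptime Impl: type) type {
    return struct {
        impl: Impl,
        const Self = @This();

        pub fn flush(self: *Self, comptime cf: []const u8) anyerror!void {
            return self.impl.flush(cf); // forward to the backing implementation
        }
    };
}

// The in-memory backend has nothing buffered on disk, so flush is a no-op,
// mirroring the one-line implementation added to hashmap.zig below.
const NoopBackend = struct {
    pub fn flush(_: *NoopBackend, comptime _: []const u8) anyerror!void {}
};

test "flush forwards to the backend" {
    var db = Database(NoopBackend){ .impl = .{} };
    try db.flush("data_shred");
}
```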
dnut authored Mar 4, 2025
1 parent 6d0b413 commit 267e61e
Showing 8 changed files with 91 additions and 25 deletions.
4 changes: 2 additions & 2 deletions build.zig.zon
@@ -20,8 +20,8 @@
.hash = "122030ebe280b73693963a67ed656226a67b7f00a0a05665155da00c9fcdee90de88",
},
.rocksdb = .{
.url = "https://github.com/Syndica/rocksdb-zig/archive/cf966c0c6bb6205a08bb2006f399719542e4e770.tar.gz",
.hash = "1220d25e3ef18526cb6c980b441f05dc96330bedd88a512dcb3aed775a981ce3707d",
.url = "https://github.com/Syndica/rocksdb-zig/archive/cd747a88e66539e37585cccacb5ae311be2ad527.tar.gz",
.hash = "1220d917ac77a71426253d31e6c2b6db40494aa4e18c7284dfca9dc1eed5c7864274",
},
.lsquic = .{
.url = "https://github.com/Syndica/lsquic/archive/ed6ced0cbc6447f7135a32db491e398debdf8af7.tar.gz",
2 changes: 1 addition & 1 deletion src/config.zig
@@ -15,7 +15,7 @@ pub const Cmd = struct {

test_transaction_sender: TestTransactionSender = .{},

max_shreds: u64 = 1_000,
max_shreds: u64 = 5_000_000,
leader_schedule_path: ?[]const u8 = null,
genesis_file_path: ?[]const u8 = null,
// general config
53 changes: 48 additions & 5 deletions src/ledger/cleanup_service.zig
@@ -89,14 +89,18 @@ pub fn cleanBlockstore(
purge_interval: u64,
) !Slot {
const logger = logger_.withScope(LOG_SCOPE);

// // TODO: add back when max_root is implemented with consensus
// const root = blockstore_reader.max_root.load(.acquire);
// if (root - last_purge_slot <= purge_interval) return last_purge_slot;
_ = last_purge_slot;
_ = purge_interval;

// NOTE: this will clean everything past the lowest slot in the blockstore
const root: Slot = try blockstore_reader.lowestSlot();
// hack to get a conservative estimate of a recent slot that is almost definitely rooted
const root = if (try blockstore_reader.highestSlot()) |highest|
highest -| 100
else
try blockstore_reader.lowestSlot();

if (root - last_purge_slot <= purge_interval) return last_purge_slot;

const result = try findSlotsToClean(blockstore_reader, root, max_ledger_shreds);
logger.info().logf("findSlotsToClean result: {any}", .{result});

@@ -399,6 +403,45 @@ fn purgeFileRangeWithCount(
const Blockstore = ledger.BlockstoreDB;
const TestDB = ledger.tests.TestDB;

test cleanBlockstore {
// test setup
const allocator = std.testing.allocator;
const logger = sig.trace.DirectPrintLogger.init(allocator, .warn).logger();
const registry = sig.prometheus.globalRegistry();
var db = try TestDB.init(@src());
defer db.deinit();
var lowest_cleanup_slot = sig.sync.RwMux(Slot).init(0);
var max_root = std.atomic.Value(Slot).init(0);
var reader = try BlockstoreReader
.init(allocator, logger, db, registry, &lowest_cleanup_slot, &max_root);

// insert data
var batch = try db.initWriteBatch();
defer batch.deinit();
for (0..1_000) |i| {
for (0..10) |j| try batch.put(ledger.schema.schema.data_shred, .{ i, j }, &.{});
try batch.put(ledger.schema.schema.slot_meta, i, undefined);
}
try db.commit(&batch);
try db.flush(ledger.schema.schema.data_shred);

// run test subject
const slot = try cleanBlockstore(logger, &reader, &db, &lowest_cleanup_slot, 100, 0, 0);
try std.testing.expectEqual(899, slot);

// verify correct data was purged
var shred_iter = try db.iterator(ledger.schema.schema.data_shred, .forward, .{ 0, 0 });
defer shred_iter.deinit();
var meta_iter = try db.iterator(ledger.schema.schema.slot_meta, .forward, 0);
defer meta_iter.deinit();
for (900..1_000) |i| {
for (0..10) |j| try std.testing.expectEqual(.{ i, j }, (try shred_iter.nextKey()).?);
try std.testing.expectEqual(i, (try meta_iter.nextKey()).?);
}
try std.testing.expectEqual(null, shred_iter.nextKey());
try std.testing.expectEqual(null, meta_iter.nextKey());
}

test "findSlotsToClean" {
const allocator = std.testing.allocator;
var registry = sig.prometheus.Registry(.{}).init(allocator);
2 changes: 2 additions & 0 deletions src/ledger/database/hashmap.zig
@@ -445,6 +445,8 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
}
};
}

pub fn flush(_: *Self, comptime _: ColumnFamily) anyerror!void {}
};
}

4 changes: 4 additions & 0 deletions src/ledger/database/interface.zig
@@ -180,6 +180,10 @@ pub fn Database(comptime Impl: type) type {
}
};
}

pub fn flush(self: *Self, comptime cf: ColumnFamily) anyerror!void {
return self.impl.flush(cf);
}
};
}

9 changes: 9 additions & 0 deletions src/ledger/database/rocksdb.zig
@@ -329,6 +329,14 @@ pub fn RocksDB(comptime column_families: []const ColumnFamily) type {
};
}

pub fn flush(self: *Self, comptime cf: ColumnFamily) error{RocksDBFlush}!void {
try callRocks(
self.logger,
rocks.DB.flush,
.{ &self.db, self.cf_handles[cf.find(column_families)] },
);
}

const Error = error{
RocksDBOpen,
RocksDBPut,
@@ -337,6 +345,7 @@
RocksDBDeleteFilesInRange,
RocksDBIterator,
RocksDBWrite,
RocksDBFlush,
} || Allocator.Error;
};
}
40 changes: 24 additions & 16 deletions src/utils/collections.zig
@@ -296,8 +296,8 @@ pub fn SortedSetCustom(comptime T: type, comptime config: SortedMapConfig(T)) ty
try self.map.put(item, {});
}

pub fn remove(self: *Self, item: T) bool {
return self.map.remove(item);
pub fn orderedRemove(self: *Self, item: T) bool {
return self.map.orderedRemove(item);
}

pub fn contains(self: Self, item: T) bool {
@@ -394,7 +394,9 @@ pub fn SortedMapCustom(

pub fn fetchSwapRemove(self: *Self, key: K) ?Inner.KV {
const item = self.inner.fetchSwapRemove(key);
self.resetMaxOnRemove(key);
if (item != null and !self.resetMaxOnRemove(key)) {
self.is_sorted = false;
}
return item;
}

@@ -417,21 +419,27 @@
}
}

pub fn remove(self: *Self, key: K) bool {
const item = self.inner.orderedRemove(key);
self.resetMaxOnRemove(key);
return item;
pub fn orderedRemove(self: *Self, key: K) bool {
const was_removed = self.inner.orderedRemove(key);
if (was_removed) _ = self.resetMaxOnRemove(key);
return was_removed;
}

fn resetMaxOnRemove(self: *Self, removed_key: K) void {
if (self.max) |max| {
if (self.count() == 0) {
self.max = null;
} else if (order(removed_key, max) == .eq) {
self.sort();
/// - returns whether the key was the prior max.
/// - don't call this unless an item was definitely removed.
fn resetMaxOnRemove(self: *Self, removed_key: K) bool {
std.debug.assert(self.max != null);
if (self.count() == 0) {
self.max = null;
return true;
} else switch (order(removed_key, self.max.?)) {
.eq => {
const sorted_keys = self.keys();
self.max = sorted_keys[sorted_keys.len - 1];
}
return true;
},
.gt => unreachable,
.lt => return false,
}
}

@@ -854,9 +862,9 @@ test SortedSet {
try set.put(5);

// remove
try expect(set.remove(5));
try expect(set.orderedRemove(5));
try expect(!set.contains(5));
try expect(!set.remove(5));
try expect(!set.orderedRemove(5));
try set.put(5);
try expect(set.contains(5));

2 changes: 1 addition & 1 deletion src/utils/interface.zig
@@ -298,7 +298,7 @@ const FunctionSignature = struct {
if (@typeInfo(sub).ErrorSet) |sub_set| if (@typeInfo(super).ErrorSet) |super_set| {
sub: for (sub_set) |sub_err| {
for (super_set) |super_err| {
if (std.mem.eql(sub_err.name, super_err.name)) {
if (std.mem.eql(u8, sub_err.name, super_err.name)) {
continue :sub;
}
}
