feat(ledger): user-selectable db, hashmapdb fixes, add tests, safer memory management #372

Merged: 5 commits, Nov 12, 2024
Changes from 3 commits
4 changes: 3 additions & 1 deletion .github/workflows/check.yml
@@ -54,7 +54,9 @@ jobs:
version: 0.13.0

- name: test
run: zig build test -Denable-tsan=true
run: |
zig build test -Denable-tsan=true
zig build test -Denable-tsan=true -Dblockstore-db=hashmap -Dfilter=ledger

kcov_test:
strategy:
34 changes: 30 additions & 4 deletions build.zig
@@ -10,6 +10,11 @@ pub fn build(b: *Build) void {
const filters = b.option([]const []const u8, "filter", "List of filters, used for example to filter unit tests by name"); // specified as a series like `-Dfilter="filter1" -Dfilter="filter2"`
const enable_tsan = b.option(bool, "enable-tsan", "Enable TSan for the test suite");
const no_run = b.option(bool, "no-run", "Do not run the selected step and install it") orelse false;
const blockstore_db = b.option(BlockstoreDB, "blockstore-db", "Blockstore database backend") orelse .rocksdb;

// Build options
const build_options = b.addOptions();
build_options.addOption(BlockstoreDB, "blockstore_db", blockstore_db);

// CLI build steps
const sig_step = b.step("run", "Run the sig executable");
@@ -55,7 +60,11 @@ pub fn build(b: *Build) void {
sig_mod.addImport("httpz", httpz_mod);
sig_mod.addImport("zstd", zstd_mod);
sig_mod.addImport("curl", curl_mod);
sig_mod.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => sig_mod.addImport("rocksdb", rocksdb_mod),
.hashmap => {},
}
sig_mod.addOptions("build-options", build_options);

// main executable
const sig_exe = b.addExecutable(.{
@@ -72,7 +81,11 @@ pub fn build(b: *Build) void {
sig_exe.root_module.addImport("zig-cli", zig_cli_module);
sig_exe.root_module.addImport("zig-network", zig_network_module);
sig_exe.root_module.addImport("zstd", zstd_mod);
sig_exe.root_module.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => sig_exe.root_module.addImport("rocksdb", rocksdb_mod),
.hashmap => {},
}
sig_exe.root_module.addOptions("build-options", build_options);
sig_exe.linkLibC();

const main_exe_run = b.addRunArtifact(sig_exe);
@@ -110,7 +123,11 @@ pub fn build(b: *Build) void {
unit_tests_exe.root_module.addImport("httpz", httpz_mod);
unit_tests_exe.root_module.addImport("zig-network", zig_network_module);
unit_tests_exe.root_module.addImport("zstd", zstd_mod);
unit_tests_exe.root_module.addImport("rocksdb", rocksdb_mod);
switch (blockstore_db) {
.rocksdb => unit_tests_exe.root_module.addImport("rocksdb", rocksdb_mod),
.hashmap => {},
}
unit_tests_exe.root_module.addOptions("build-options", build_options);
unit_tests_exe.linkLibC();

const unit_tests_exe_run = b.addRunArtifact(unit_tests_exe);
@@ -150,8 +167,12 @@ pub fn build(b: *Build) void {
benchmark_exe.root_module.addImport("zig-network", zig_network_module);
benchmark_exe.root_module.addImport("httpz", httpz_mod);
benchmark_exe.root_module.addImport("zstd", zstd_mod);
benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod);
benchmark_exe.root_module.addImport("prettytable", pretty_table_mod);
switch (blockstore_db) {
.rocksdb => benchmark_exe.root_module.addImport("rocksdb", rocksdb_mod),
.hashmap => {},
}
benchmark_exe.root_module.addOptions("build-options", build_options);
benchmark_exe.linkLibC();

const benchmark_exe_run = b.addRunArtifact(benchmark_exe);
@@ -195,3 +216,8 @@ fn makeZlsNotInstallAnythingDuringBuildOnSave(b: *Build) void {
artifact.generated_bin = null;
}
}

const BlockstoreDB = enum {
rocksdb,
hashmap,
};
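
The enum option declared here reaches Zig source through the module registered as "build-options" by the addOptions calls above. A minimal sketch of the consuming side (illustrative only; the real consumer is src/ledger/blockstore.zig below):

const build_options = @import("build-options");

comptime {
    // blockstore_db holds the backend chosen with -Dblockstore-db=... (default: .rocksdb).
    _ = build_options.blockstore_db;
}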
4 changes: 2 additions & 2 deletions build.zig.zon
@@ -32,8 +32,8 @@
.hash = "1220f70ac854b59315a8512861e039648d677feb4f9677bd873d6b9b7074a5906485",
},
.rocksdb = .{
.url = "https://github.com/Syndica/rocksdb-zig/archive/9c09659a5e41f226b6b8f3fa21149247eb26dfae.tar.gz",
.hash = "1220aeb80b2f8bb48c131ef306fe48ddfcb537210c5f77742e921cbf40fc4c19b41e",
.url = "https://github.com/Syndica/rocksdb-zig/archive/5b4f3e3bb120f8e002705ee4a34b3027b8d1989d.tar.gz",
.hash = "1220fc8f92afabc4100fe431ebb0a34eb4c7cae80a216d42fd7342f87a9cf7cbfd58",
},
.prettytable = .{
.url = "https://github.com/dying-will-bullet/prettytable-zig/archive/46b6ad9b5970def35fa43c9613cd244f28862fa9.tar.gz",
2 changes: 1 addition & 1 deletion src/ledger/benchmarks.zig
@@ -348,7 +348,7 @@ pub const BenchmarkLedgerSlow = struct {
// connect the chain
parent_slot = slot;
}
try db.commit(write_batch);
try db.commit(&write_batch);

var timer = try sig.time.Timer.start();
const is_connected = try reader.slotRangeConnected(1, slot_per_epoch);
6 changes: 5 additions & 1 deletion src/ledger/blockstore.zig
@@ -1,6 +1,10 @@
const build_options = @import("build-options");
const ledger = @import("lib.zig");

pub const BlockstoreDB = ledger.database.RocksDB(&ledger.schema.list);
pub const BlockstoreDB = switch (build_options.blockstore_db) {
.rocksdb => ledger.database.RocksDB(&ledger.schema.list),
.hashmap => ledger.database.SharedHashMapDB(&ledger.schema.list),
};

test BlockstoreDB {
ledger.database.assertIsDatabase(BlockstoreDB);
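
blockstore.zig resolves BlockstoreDB at comptime from the build option, so downstream ledger code keeps using a single type name regardless of backend. A self-contained sketch of the comptime-selection pattern with toy types (not sig's API), runnable with zig test:

const std = @import("std");

const Backend = enum { rocksdb, hashmap };

// Stand-ins for the two database implementations.
fn RocksImpl(comptime T: type) type {
    return struct { value: T };
}
fn MapImpl(comptime T: type) type {
    return struct { value: T };
}

// Comptime selection: callers only ever name `Db`.
const selected: Backend = .hashmap;
const Db = switch (selected) {
    .rocksdb => RocksImpl(u64),
    .hashmap => MapImpl(u64),
};

test "comptime-selected backend" {
    const db: Db = .{ .value = 42 };
    try std.testing.expectEqual(@as(u64, 42), db.value);
}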
8 changes: 5 additions & 3 deletions src/ledger/cleanup_service.zig
@@ -204,6 +204,7 @@ fn findSlotsToClean(
/// analog to [`run_purge_with_stats`](https://github.com/anza-xyz/agave/blob/26692e666454d340a6691e2483194934e6a8ddfc/ledger/src/blockstore/blockstore_purge.rs#L202)
pub fn purgeSlots(db: *BlockstoreDB, from_slot: Slot, to_slot: Slot) !bool {
var write_batch = try db.initWriteBatch();
defer write_batch.deinit();

// the methods used below are exclusive [from_slot, to_slot), so we add 1 to purge inclusive
const purge_to_slot = to_slot + 1;
@@ -212,7 +213,7 @@ pub fn purgeSlots(db: *BlockstoreDB, from_slot: Slot, to_slot: Slot) !bool {
writePurgeRange(&write_batch, from_slot, purge_to_slot) catch {
did_purge = false;
};
try db.commit(write_batch);
try db.commit(&write_batch);

if (did_purge and from_slot == 0) {
try purgeFilesInRange(db, from_slot, purge_to_slot);
@@ -372,7 +373,7 @@ test "findSlotsToClean" {
defer write_batch.deinit();
try write_batch.put(ledger.schema.schema.slot_meta, lowest_slot_meta.slot, lowest_slot_meta);
try write_batch.put(ledger.schema.schema.slot_meta, highest_slot_meta.slot, highest_slot_meta);
try db.commit(write_batch);
try db.commit(&write_batch);
}

const r = try findSlotsToClean(&reader, 0, 100);
@@ -421,6 +422,7 @@ test "purgeSlots" {

// write another type
var write_batch = try db.initWriteBatch();
defer write_batch.deinit();
for (0..roots.len + 1) |i| {
const merkle_root_meta = sig.ledger.shred.ErasureSetId{
.erasure_set_index = i,
@@ -434,7 +436,7 @@

try write_batch.put(schema.merkle_root_meta, merkle_root_meta, merkle_meta);
}
try db.commit(write_batch);
try db.commit(&write_batch);

// purge the range [0, 5]
const did_purge2 = try purgeSlots(&db, 0, 5);
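
Alongside the new defer write_batch.deinit() lines, commit now takes the batch by pointer. A minimal sketch of the resulting lifecycle, using only calls that appear in this diff (slot_meta stands for whatever value the caller writes; error handling elided):

var write_batch = try db.initWriteBatch();
defer write_batch.deinit(); // released whether or not the commit succeeds

try write_batch.put(ledger.schema.schema.slot_meta, slot_meta.slot, slot_meta); // slot_meta: caller's value
try db.commit(&write_batch); // commit takes *WriteBatch after this change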
51 changes: 34 additions & 17 deletions src/ledger/database/hashmap.zig
@@ -1,9 +1,10 @@
const std = @import("std");
const sig = @import("../../sig.zig");
const database = @import("lib.zig");
const build_options = @import("build-options");

const Allocator = std.mem.Allocator;
const DefaultRwLock = std.Thread.RwLock.DefaultRwLock;
const RwLock = std.Thread.RwLock;

const BytesRef = database.interface.BytesRef;
const ColumnFamily = database.interface.ColumnFamily;
@@ -20,31 +21,36 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
maps: []SharedHashMap,
/// shared lock is required to call locking map methods.
/// exclusive lock is required to call non-locking map methods.
transaction_lock: DefaultRwLock = .{},
/// to avoid deadlocks, always acquire the shared lock *before* acquiring the map lock.
transaction_lock: *RwLock,

const Self = @This();

pub fn open(
allocator: Allocator,
_: Logger,
logger: Logger,
_: []const u8,
) Allocator.Error!Self {
logger.info().log("Initializing SharedHashMapDB");
var maps = try allocator.alloc(SharedHashMap, column_families.len);
const lock = try allocator.create(RwLock);
lock.* = .{};
errdefer {
for (maps) |*m| m.deinit();
allocator.free(maps);
}
inline for (0..column_families.len) |i| {
maps[i] = try SharedHashMap.init(allocator);
}
return .{ .allocator = allocator, .maps = maps };
return .{ .allocator = allocator, .maps = maps, .transaction_lock = lock };
}

pub fn deinit(self: *Self) void {
for (self.maps) |*map| {
map.deinit();
}
self.allocator.free(self.maps);
self.allocator.destroy(self.transaction_lock);
}

pub fn count(self: *Self, comptime cf: ColumnFamily) anyerror!u64 {
@@ -111,7 +117,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
const ret = try self.allocator.alloc(u8, val_bytes.len);
@memcpy(ret, val_bytes);
return .{
.allocator = self.allocator,
.deinitializer = .{ .allocator = self.allocator },
.data = ret,
};
}
@@ -164,7 +170,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {

/// Atomicity may be violated if there is insufficient
/// memory to complete a PUT.
pub fn commit(self: *Self, batch: WriteBatch) Allocator.Error!void {
pub fn commit(self: *Self, batch: *WriteBatch) Allocator.Error!void {
self.transaction_lock.lock();
defer self.transaction_lock.unlock();

@@ -178,9 +184,17 @@
const cf_index, const key = delete_ix;
self.maps[cf_index].delete(self.allocator, key);
},
.delete_range => {
// TODO: also add to database tests
@panic("not implemented");
.delete_range => |delete_range_ix| {
const cf_index, const start, const end = delete_range_ix;
const keys, _ = self.maps[cf_index].map.range(start, end);
const to_delete = try batch.allocator.alloc([]const u8, keys.len);
defer batch.allocator.free(to_delete);
for (keys, 0..) |key, i| {
to_delete[i] = key;
}
for (to_delete) |delete_key| {
self.maps[cf_index].delete(self.allocator, delete_key);
}
},
}
}
@@ -232,6 +246,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
key: cf.Key,
value: cf.Value,
) anyerror!void {
std.debug.assert(!self.executed.*);
const k_bytes = try key_serializer.serializeAlloc(self.allocator, key);
errdefer self.allocator.free(k_bytes);
const v_bytes = try value_serializer.serializeAlloc(self.allocator, value);
@@ -247,6 +262,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
comptime cf: ColumnFamily,
key: cf.Key,
) anyerror!void {
std.debug.assert(!self.executed.*);
const k_bytes = try key_serializer.serializeAlloc(self.allocator, key);
errdefer self.allocator.free(k_bytes);
return try self.instructions.append(
@@ -261,12 +277,13 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
start: cf.Key,
end: cf.Key,
) anyerror!void {
std.debug.assert(!self.executed.*);
const start_bytes = try key_serializer.serializeAlloc(self.allocator, start);
errdefer self.allocator.free(start_bytes);
const end_bytes = try key_serializer.serializeAlloc(self.allocator, end);
errdefer self.allocator.free(end_bytes);
const cf_index = cf.find(column_families);
self.instructions.append(
try self.instructions.append(
self.allocator,
.{ .delete_range = .{ cf_index, start_bytes, end_bytes } },
);
@@ -282,10 +299,10 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
const shared_map = &self.maps[cf.find(column_families)];
const map = &shared_map.map;

shared_map.lock.lockShared();
defer shared_map.lock.unlockShared();
self.transaction_lock.lockShared();
defer self.transaction_lock.unlockShared();
shared_map.lock.lockShared();
defer shared_map.lock.unlockShared();

const keys, const vals = if (start) |start_| b: {
const search_bytes = try key_serializer.serializeAlloc(self.allocator, start_);
@@ -351,19 +368,19 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {

pub fn nextKey(self: *@This()) anyerror!?cf.Key {
const index = self.nextIndex() orelse return null;
return key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]);
return try key_serializer.deserialize(cf.Key, self.allocator, self.keys[index]);
}

pub fn nextValue(self: *@This()) anyerror!?cf.Value {
const index = self.nextIndex() orelse return null;
return value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]);
return try value_serializer.deserialize(cf.Value, self.allocator, self.vals[index]);
}

pub fn nextBytes(self: *@This()) error{}!?[2]BytesRef {
const index = self.nextIndex() orelse return null;
return .{
.{ .allocator = null, .data = self.keys[index] },
.{ .allocator = null, .data = self.vals[index] },
.{ .deinitializer = null, .data = self.keys[index] },
.{ .deinitializer = null, .data = self.vals[index] },
};
}

@@ -387,7 +404,7 @@ pub fn SharedHashMapDB(comptime column_families: []const ColumnFamily) type {
const SharedHashMap = struct {
allocator: Allocator,
map: SortedMap([]const u8, []const u8),
lock: DefaultRwLock = .{},
lock: RwLock = .{},

const Self = @This();

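
The pointer-backed transaction_lock comes with an explicit ordering rule: take the shared transaction lock before any per-map lock, which is why the iterator above now acquires transaction_lock first. A self-contained sketch of why a fixed order prevents deadlock, using only std.Thread.RwLock rather than sig's types:

const std = @import("std");

var transaction_lock: std.Thread.RwLock = .{}; // outer lock: always taken first
var map_lock: std.Thread.RwLock = .{}; // inner lock: always taken second

// If one thread acquired map_lock -> transaction_lock while another acquired
// transaction_lock -> map_lock, each could block forever waiting on the other.
// Taking the locks in one fixed order makes that cycle impossible.
fn readPath() void {
    transaction_lock.lockShared();
    defer transaction_lock.unlockShared();
    map_lock.lockShared();
    defer map_lock.unlockShared();
    // ... read the map ...
}

fn commitPath() void {
    transaction_lock.lock(); // exclusive, same order
    defer transaction_lock.unlock();
    map_lock.lock();
    defer map_lock.unlock();
    // ... mutate the map ...
}

test "lock ordering sketch" {
    readPath();
    commitPath();
}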