feat(shred-network): minimize dependencies #462

Merged · 18 commits · Jan 2, 2025 · showing changes from 15 commits
149 changes: 149 additions & 0 deletions src/adapter.zig
@@ -0,0 +1,149 @@
//! Links dependencies with dependents. Connects components from distant regions of the code.

const std = @import("std");
const sig = @import("sig.zig");

const leader_schedule = sig.core.leader_schedule;

const Allocator = std.mem.Allocator;

const Epoch = sig.core.Epoch;
const EpochContext = sig.core.EpochContext;
const EpochSchedule = sig.core.EpochSchedule;
const Slot = sig.core.Slot;

pub const EpochContextManager = struct {
schedule: sig.core.EpochSchedule,
contexts: ContextWindow,

const ContextWindow = sig.sync.SharedPointerWindow(
sig.core.EpochContext,
sig.core.EpochContext.deinit,
std.mem.Allocator,
);

const Self = @This();

/// all contexts that are `put` into this context manager must be
/// allocated using the same allocator passed here.
pub fn init(allocator: Allocator, schedule: EpochSchedule) Allocator.Error!Self {
return .{
.schedule = schedule,
.contexts = try ContextWindow.init(allocator, 3, 0, allocator),
};
}

pub fn deinit(self: Self) void {
self.contexts.deinit();
}

pub fn put(self: *Self, epoch: Epoch, context: sig.core.EpochContext) !void {
try self.contexts.put(epoch, context);
}

/// call `release` when done with pointer
pub fn get(self: *Self, epoch: Epoch) ?*const sig.core.EpochContext {
return self.contexts.get(@intCast(epoch));
}

pub fn contains(self: *Self, epoch: Epoch) bool {
return self.contexts.contains(@intCast(epoch));
}

pub fn setEpoch(self: *Self, epoch: Epoch) void {
self.contexts.realign(@intCast(epoch));
}

pub fn setSlot(self: *Self, slot: Slot) void {
self.contexts.realign(@intCast(self.schedule.getEpoch(slot)));
}

pub fn release(self: *Self, context: *const sig.core.EpochContext) void {
self.contexts.release(context);
}

pub fn getLeader(self: *Self, slot: Slot) ?sig.core.Pubkey {
const epoch, const slot_index = self.schedule.getEpochAndSlotIndex(slot);
const context = self.contexts.get(epoch) orelse return null;
defer self.contexts.release(context);
return context.leader_schedule[slot_index];
}

pub fn slotLeaders(self: *Self) sig.core.leader_schedule.SlotLeaders {
return sig.core.leader_schedule.SlotLeaders.init(self, getLeader);
}
};

pub const RpcEpochContextService = struct {
allocator: std.mem.Allocator,
logger: sig.trace.ScopedLogger(@typeName(Self)),
rpc_client: sig.rpc.Client,
state: *EpochContextManager,

const Self = @This();

pub fn init(
allocator: Allocator,
logger: sig.trace.Logger,
state: *EpochContextManager,
rpc_client: sig.rpc.Client,
) Self {
return .{
.allocator = allocator,
.logger = logger.withScope(@typeName(Self)),
.rpc_client = rpc_client,
.state = state,
};
}

pub fn run(self: *Self, exit: *std.atomic.Value(bool)) void {
var i: usize = 0;
while (!exit.load(.monotonic)) {
if (i % 1000 == 0) {
self.refresh() catch |e| {
self.logger.err().logf("failed to refresh epoch context via rpc: {}", .{e});
};
}
std.time.sleep(100 * std.time.ns_per_ms);
i += 1;
}
}

fn refresh(self: *Self) !void {
const response = try self.rpc_client.getSlot(self.allocator, .{});
defer response.deinit();
const old_slot = try response.result() - self.state.schedule.slots_per_epoch;
const last_epoch = self.state.schedule.getEpoch(old_slot);

self.state.setEpoch(last_epoch);

const ls1 = try self.getLeaderSchedule(old_slot);
const ctx1 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls1 };
try self.state.put(last_epoch, ctx1);

for (0..3) |epoch_offset| {
const selected_slot = old_slot + epoch_offset * self.state.schedule.slots_per_epoch;
const selected_epoch = last_epoch + epoch_offset;
std.debug.assert(selected_epoch == self.state.schedule.getEpoch(selected_slot));

if (self.state.contains(selected_epoch)) {
continue;
}

if (self.getLeaderSchedule(selected_slot)) |ls2| {
const ctx2 = EpochContext{ .staked_nodes = .{}, .leader_schedule = ls2 };
try self.state.put(selected_epoch, ctx2);
} else |e| if (selected_epoch == last_epoch) {
return e;
}
}
}

fn getLeaderSchedule(self: *Self, slot: sig.core.Slot) ![]const sig.core.Pubkey {
const response = try self.rpc_client.getLeaderSchedule(self.allocator, slot, .{});
defer response.deinit();
const rpc_schedule = try response.result();
const schedule = try leader_schedule.LeaderSchedule.fromMap(self.allocator, rpc_schedule);
return schedule.slot_leaders;
}
};
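
The subtle part of `EpochContextManager` is the `get`/`release` pairing: `get` hands out a shared pointer into the sliding window, and the caller must call `release` when finished, exactly as `getLeader` does internally. A minimal consumer sketch built only from the calls defined above (the function itself is hypothetical, not part of this diff):

const sig = @import("sig.zig");

/// Hypothetical consumer: look up how much stake the leader of `slot` holds.
fn stakeOfSlotLeader(
    manager: *sig.adapter.EpochContextManager,
    slot: sig.core.Slot,
) ?u64 {
    const epoch = manager.schedule.getEpoch(slot);

    // Every successful `get` must be paired with a `release`.
    const context = manager.get(epoch) orelse return null;
    defer manager.release(context);

    const leader = manager.getLeader(slot) orelse return null;
    return context.staked_nodes.get(leader);
}
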
76 changes: 46 additions & 30 deletions src/cmd/cmd.zig
@@ -765,9 +765,6 @@ fn validator() !void {
errdefer schedule.deinit();
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, schedule);
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -817,6 +814,28 @@ fn validator() !void {

// shred networking
const my_contact_info = sig.gossip.data.ThreadSafeContactInfo.fromContactInfo(gossip_service.my_contact_info);

const epoch_schedule = loaded_snapshot.collapsed_manifest.bank_fields.epoch_schedule;
const epoch = loaded_snapshot.collapsed_manifest.bank_fields.epoch;
const staked_nodes = try loaded_snapshot.collapsed_manifest.bank_fields.getStakedNodes(allocator, epoch);

var epoch_context_manager = try sig.adapter.EpochContextManager.init(allocator, epoch_schedule);
try epoch_context_manager.put(epoch, .{
.staked_nodes = try staked_nodes.clone(allocator),
.leader_schedule = try LeaderSchedule
.fromStakedNodes(allocator, epoch, epoch_schedule.slots_per_epoch, staked_nodes),
});
var rpc_client = sig.rpc.Client.init(allocator, loaded_snapshot.genesis_config.cluster_type, .{});
defer rpc_client.deinit();
var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
.init(allocator, app_base.logger.unscoped(), &epoch_context_manager, rpc_client);
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
);

// shred collector
var shred_col_conf = config.current.shred_network;
shred_col_conf.start_slot = shred_col_conf.start_slot orelse loaded_snapshot.collapsed_manifest.bank_fields.slot;
var shred_network_manager = try sig.shred_network.start(
@@ -830,17 +849,16 @@ fn validator() !void {
.exit = &app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.leader_schedule = leader_provider,
.epoch_context_mgr = &epoch_context_manager,
.shred_inserter = shred_inserter,
.my_contact_info = my_contact_info,
.n_retransmit_threads = config.current.turbine.num_retransmit_threads,
.overwrite_turbine_stake_for_testing = config.current.turbine.overwrite_stake_for_testing,
.leader_schedule_cache = &leader_schedule_cache,
.bank_fields = &loaded_snapshot.collapsed_manifest.bank_fields,
},
);
defer shred_network_manager.deinit();

rpc_epoch_ctx_service_thread.join();
gossip_service.service_manager.join();
shred_network_manager.join();
}
@@ -853,6 +871,13 @@ fn shredCollector() !void {
app_base.deinit();
}

const genesis_path = try config.current.genesisFilePath() orelse
return error.GenesisPathNotProvided;
const genesis_config = try GenesisConfig.init(allocator, genesis_path);

var rpc_client = sig.rpc.Client.init(allocator, genesis_config.cluster_type, .{});
defer rpc_client.deinit();

const repair_port: u16 = config.current.shred_network.repair_port;
const turbine_recv_port: u16 = config.current.shred_network.turbine_recv_port;

@@ -866,26 +891,15 @@ fn shredCollector() !void {
allocator.destroy(gossip_service);
}

var loaded_snapshot = try loadSnapshot(allocator, app_base.logger.unscoped(), .{
.gossip_service = gossip_service,
.geyser_writer = null,
.validate_snapshot = true,
.metadata_only = config.current.accounts_db.snapshot_metadata_only,
});
defer loaded_snapshot.deinit();

// leader schedule
var leader_schedule_cache = LeaderScheduleCache.init(allocator, loaded_snapshot.collapsed_manifest.bank_fields.epoch_schedule);
if (try getLeaderScheduleFromCli(allocator)) |leader_schedule| {
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, leader_schedule[1]);
} else {
const schedule = try loaded_snapshot.collapsed_manifest.bank_fields.leaderSchedule(allocator);
errdefer schedule.deinit();
try leader_schedule_cache.put(loaded_snapshot.collapsed_manifest.bank_fields.epoch, schedule);
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
const leader_provider = leader_schedule_cache.slotLeaders();
var epoch_context_manager = try sig.adapter.EpochContextManager
.init(allocator, genesis_config.epoch_schedule);
var rpc_epoch_ctx_service = sig.adapter.RpcEpochContextService
.init(allocator, app_base.logger.unscoped(), &epoch_context_manager, rpc_client);
const rpc_epoch_ctx_service_thread = try std.Thread.spawn(
.{},
sig.adapter.RpcEpochContextService.run,
.{ &rpc_epoch_ctx_service, &app_base.exit },
);

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -936,7 +950,10 @@ fn shredCollector() !void {

// shred networking
var shred_col_conf = config.current.shred_network;
shred_col_conf.start_slot = shred_col_conf.start_slot orelse @panic("No start slot found");
shred_col_conf.start_slot = shred_col_conf.start_slot orelse blk: {
const response = try rpc_client.getSlot(allocator, .{});
break :blk try response.result();
};
var shred_network_manager = try sig.shred_network.start(
shred_col_conf,
.{
@@ -948,17 +965,16 @@ fn shredCollector() !void {
.exit = &app_base.exit,
.gossip_table_rw = &gossip_service.gossip_table_rw,
.my_shred_version = &gossip_service.my_shred_version,
.leader_schedule = leader_provider,
.epoch_context_mgr = &epoch_context_manager,
.shred_inserter = shred_inserter,
.my_contact_info = my_contact_info,
.n_retransmit_threads = config.current.turbine.num_retransmit_threads,
.overwrite_turbine_stake_for_testing = config.current.turbine.overwrite_stake_for_testing,
.leader_schedule_cache = &leader_schedule_cache,
.bank_fields = &loaded_snapshot.collapsed_manifest.bank_fields,
},
);
defer shred_network_manager.deinit();

rpc_epoch_ctx_service_thread.join();
gossip_service.service_manager.join();
shred_network_manager.join();
}
28 changes: 5 additions & 23 deletions src/common/lru.zig
@@ -1,9 +1,13 @@
const std = @import("std");
const sig = @import("../sig.zig");

const Allocator = std.mem.Allocator;
const TailQueue = std.TailQueue;
const testing = std.testing;
const Mutex = std.Thread.Mutex;

const normalizeDeinitFunction = sig.sync.normalizeDeinitFunction;

pub const Kind = enum {
locking,
non_locking,
@@ -30,29 +34,7 @@ pub fn LruCacheCustom(
comptime DeinitContext: type,
comptime deinitFn_: anytype,
) type {
const deinitFn = switch (@TypeOf(deinitFn_)) {
fn (*V, DeinitContext) void => deinitFn_,

fn (V, DeinitContext) void => struct {
fn f(v: *V, ctx: DeinitContext) void {
deinitFn_(v.*, ctx);
}
}.f,

fn (V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v.*);
}
}.f,

fn (*V) void => struct {
fn f(v: *V, _: DeinitContext) void {
V.deinit(v);
}
}.f,

else => @compileError("unsupported deinit function type"),
};
const deinitFn = normalizeDeinitFunction(V, DeinitContext, deinitFn_);
return struct {
mux: if (kind == .locking) Mutex else void,
allocator: Allocator,
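
The switch removed above now lives in `sig.sync.normalizeDeinitFunction`, so other containers that accept flexible deinit callbacks can reuse it. A sketch of what that helper plausibly looks like, reconstructed directly from the removed switch (the actual definition in `src/sync` is not part of this diff and may differ):

/// Adapts any of the supported deinit signatures to `fn (*V, DeinitContext) void`.
/// Reconstructed from the switch it replaces in LruCacheCustom; illustrative only.
pub fn normalizeDeinitFunction(
    comptime V: type,
    comptime DeinitContext: type,
    comptime deinitFn: anytype,
) fn (*V, DeinitContext) void {
    return switch (@TypeOf(deinitFn)) {
        // Already in normalized form: pass it through unchanged.
        fn (*V, DeinitContext) void => deinitFn,

        // Takes the value by copy along with a context: dereference before calling.
        fn (V, DeinitContext) void => struct {
            fn f(v: *V, ctx: DeinitContext) void {
                deinitFn(v.*, ctx);
            }
        }.f,

        // Context-free deinit taking the value by copy.
        fn (V) void => struct {
            fn f(v: *V, _: DeinitContext) void {
                V.deinit(v.*);
            }
        }.f,

        // Context-free deinit taking the value by pointer.
        fn (*V) void => struct {
            fn f(v: *V, _: DeinitContext) void {
                V.deinit(v);
            }
        }.f,

        else => @compileError("unsupported deinit function type"),
    };
}
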
16 changes: 16 additions & 0 deletions src/core/epoch_context.zig
@@ -0,0 +1,16 @@
const std = @import("std");
const core = @import("lib.zig");

/// constant data about a particular epoch.
/// this can be computed before the epoch begins, and does not change during the epoch
pub const EpochContext = struct {
/// the staked nodes for this particular cluster to use for the leader schedule and turbine tree
staked_nodes: std.AutoArrayHashMapUnmanaged(core.Pubkey, u64),
/// the leader schedule for this epoch
leader_schedule: []const core.Pubkey,

pub fn deinit(self: *EpochContext, allocator: std.mem.Allocator) void {
self.staked_nodes.deinit(allocator);
allocator.free(self.leader_schedule);
}
};
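
Since `deinit` frees both fields with whatever allocator it is handed, a context's stake map and leader slice must be allocated with that same allocator, matching the requirement documented on `EpochContextManager.init` above. A minimal construction sketch under that assumption (the helper name and the placeholder `leaders` slice are illustrative, not part of this diff):

const std = @import("std");
const sig = @import("sig.zig");

/// Hypothetical helper: builds an EpochContext that owns copies of its inputs.
fn makeEpochContext(
    allocator: std.mem.Allocator,
    leaders: []const sig.core.Pubkey,
) !sig.core.EpochContext {
    // Start with an empty stake map; a real caller would fill it from snapshot or RPC data.
    var staked_nodes = std.AutoArrayHashMapUnmanaged(sig.core.Pubkey, u64){};
    errdefer staked_nodes.deinit(allocator);

    // Both fields are owned by the context and freed together by deinit.
    return .{
        .staked_nodes = staked_nodes,
        .leader_schedule = try allocator.dupe(sig.core.Pubkey, leaders),
    };
}
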
2 changes: 2 additions & 0 deletions src/core/lib.zig
@@ -1,6 +1,7 @@
pub const account = @import("account.zig");
pub const entry = @import("entry.zig");
pub const epoch_schedule = @import("epoch_schedule.zig");
pub const epoch_context = @import("epoch_context.zig");
pub const hard_forks = @import("hard_forks.zig");
pub const hash = @import("hash.zig");
pub const leader_schedule = @import("leader_schedule.zig");
@@ -13,6 +14,7 @@ pub const transaction = @import("transaction.zig");
pub const Account = account.Account;
pub const Entry = entry.Entry;
pub const EpochSchedule = epoch_schedule.EpochSchedule;
pub const EpochContext = epoch_context.EpochContext;
pub const HardForks = hard_forks.HardForks;
pub const HardFork = hard_forks.HardFork;
pub const Hash = hash.Hash;
2 changes: 1 addition & 1 deletion src/ledger/database/interface.zig
@@ -290,7 +290,7 @@ pub const BytesRef = struct {
switch (self) {
.allocator => |allocator| allocator.free(data),
.rocksdb => |func| func(@ptrCast(@constCast(data))),
.rc_slice => |allocator| sig.sync.RcSlice(u8).deinitPayload(data, allocator),
.rc_slice => |allocator| sig.sync.RcSlice(u8).fromPayload(data).deinit(allocator),
}
}
};