refactor(leader_schedule): SlotLeaderProvider from PointerClosure to direct SlotLeaders struct #457

Merged
merged 3 commits on Dec 19, 2024
4 changes: 2 additions & 2 deletions src/cmd/cmd.zig
@@ -767,7 +767,7 @@ fn validator() !void {
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
-const leader_provider = leader_schedule_cache.slotLeaderProvider();
+const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
@@ -885,7 +885,7 @@ fn shredCollector() !void {
}
// This provider will fail at epoch boundary unless another thread updated the leader schedule cache
// i.e. called leader_schedule_cache.getSlotLeaderMaybeCompute(slot, bank_fields);
-const leader_provider = leader_schedule_cache.slotLeaderProvider();
+const leader_provider = leader_schedule_cache.slotLeaders();

// blockstore
var blockstore_db = try sig.ledger.BlockstoreDB.open(
32 changes: 27 additions & 5 deletions src/core/leader_schedule.zig
@@ -14,18 +14,40 @@ const RwMux = sig.sync.RwMux;
pub const NUM_CONSECUTIVE_LEADER_SLOTS: u64 = 4;
pub const MAX_CACHED_LEADER_SCHEDULES: usize = 10;

-pub const SlotLeaderProvider = sig.utils.closure.PointerClosure(Slot, ?Pubkey);
+/// interface to express a dependency on slot leaders
+pub const SlotLeaders = struct {
+    state: *anyopaque,
+    getFn: *const fn (*anyopaque, Slot) ?Pubkey,
+
+    pub fn init(
+        state: anytype,
+        getSlotLeader: fn (@TypeOf(state), Slot) ?Pubkey,
+    ) SlotLeaders {
+        return .{
+            .state = state,
+            .getFn = struct {
+                fn genericFn(generic_state: *anyopaque, slot: Slot) ?Pubkey {
+                    return getSlotLeader(@alignCast(@ptrCast(generic_state)), slot);
+                }
+            }.genericFn,
+        };
+    }
+
+    pub fn get(self: SlotLeaders, slot: Slot) ?Pubkey {
+        return self.getFn(self.state, slot);
+    }
+};

/// LeaderScheduleCache is a cache of leader schedules for each epoch.
/// Leader schedules are expensive to compute, so this cache is used to avoid
/// recomputing leader schedules for the same epoch.
/// LeaderScheduleCache also keeps a copy of the epoch_schedule so that it can
/// compute epoch and slot index from a slot.
/// NOTE: This struct is not really a 'cache', we should consider renaming it
-/// to a SlotLeaderProvider and maybe even moving it outside of the core module.
+/// to a SlotLeaders and maybe even moving it outside of the core module.
/// This more accurately describes the purpose of this struct as caching is a means
/// to an end, not the end itself. It may then follow that we could remove the
-/// above pointer closure in favor of passing the SlotLeaderProvider directly.
+/// above pointer closure in favor of passing the SlotLeaders directly.
pub const LeaderScheduleCache = struct {
epoch_schedule: EpochSchedule,
leader_schedules: RwMux(std.AutoArrayHashMap(Epoch, LeaderSchedule)),
@@ -41,8 +63,8 @@ pub const LeaderScheduleCache = struct {
};
}

-pub fn slotLeaderProvider(self: *Self) SlotLeaderProvider {
-return SlotLeaderProvider.init(self, LeaderScheduleCache.slotLeader);
+pub fn slotLeaders(self: *Self) SlotLeaders {
+return SlotLeaders.init(self, LeaderScheduleCache.slotLeader);
}

pub fn put(self: *Self, epoch: Epoch, leader_schedule: LeaderSchedule) !void {
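Note: the new struct type-erases its state behind `*anyopaque` and generates the matching wrapper function at comptime, so any pointer type can satisfy the interface. Below is a minimal sketch of a consumer that depends only on `SlotLeaders`; the `@import("sig")` path and the `isLeaderKnown` helper are illustrative assumptions, not part of this PR.

```zig
const sig = @import("sig"); // assumed import name; adjust to the local module layout
const Slot = sig.core.Slot;
const SlotLeaders = sig.core.leader_schedule.SlotLeaders;

/// Hypothetical consumer: depends only on the interface, never on LeaderScheduleCache.
fn isLeaderKnown(slot_leaders: SlotLeaders, slot: Slot) bool {
    return slot_leaders.get(slot) != null;
}
```

At a call site this pairs with the src/cmd/cmd.zig change above: `leader_schedule_cache.slotLeaders()` produces the `SlotLeaders` value that such a consumer receives.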
30 changes: 15 additions & 15 deletions src/ledger/shred_inserter/shred_inserter.zig
@@ -23,7 +23,7 @@ const CodeShred = ledger.shred.CodeShred;
const DataShred = ledger.shred.DataShred;
const ReedSolomonCache = lib.recovery.ReedSolomonCache;
const ShredId = ledger.shred.ShredId;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const SortedSet = sig.utils.collections.SortedSet;
const SortedMap = sig.utils.collections.SortedMap;
const Timer = sig.time.Timer;
@@ -145,7 +145,7 @@ pub const ShredInserter = struct {
self: *Self,
shreds: []const Shred,
is_repaired: []const bool,
-leader_schedule: ?SlotLeaderProvider,
+maybe_slot_leaders: ?SlotLeaders,
is_trusted: bool,
retransmit_sender: ?PointerClosure([]const []const u8, void),
) !InsertShredsResult {
@@ -195,7 +195,7 @@ pub const ShredInserter = struct {
merkle_root_validator,
write_batch,
is_trusted,
-leader_schedule,
+maybe_slot_leaders,
shred_source,
)) |completed_data_sets| {
if (is_repair) {
@@ -239,7 +239,7 @@ pub const ShredInserter = struct {
var shred_recovery_timer = try Timer.start();
var valid_recovered_shreds = ArrayList([]const u8).init(allocator);
defer valid_recovered_shreds.deinit();
-if (leader_schedule) |slot_leader_provider| {
+if (maybe_slot_leaders) |slot_leaders| {
var reed_solomon_cache = try ReedSolomonCache.init(allocator);
defer reed_solomon_cache.deinit();
const recovered_shreds = try self.tryShredRecovery(
@@ -259,7 +259,7 @@
if (shred == .data) {
self.metrics.num_recovered.inc();
}
-const leader = slot_leader_provider.call(shred.commonHeader().slot);
+const leader = slot_leaders.get(shred.commonHeader().slot);
if (leader == null) {
continue;
}
@@ -280,7 +280,7 @@
merkle_root_validator,
write_batch,
is_trusted,
-leader_schedule,
+maybe_slot_leaders,
.recovered,
)) |completed_data_sets| {
defer completed_data_sets.deinit();
@@ -590,7 +590,7 @@ pub const ShredInserter = struct {
merkle_root_validator: MerkleRootValidator,
write_batch: *WriteBatch,
is_trusted: bool,
-leader_schedule: ?SlotLeaderProvider,
+leader_schedule: ?SlotLeaders,
shred_source: ShredSource,
) !ArrayList(CompletedDataSetInfo) {
const slot = shred.common.slot;
@@ -708,7 +708,7 @@ pub const ShredInserter = struct {
slot_meta: *const SlotMeta,
shred_store: ShredWorkingStore,
max_root: Slot,
-leader_schedule: ?SlotLeaderProvider,
+leader_schedule: ?SlotLeaders,
shred_source: ShredSource,
duplicate_shreds: *ArrayList(PossibleDuplicateShred),
) !bool {
@@ -975,8 +975,8 @@ fn verifyShredSlots(slot: Slot, parent: Slot, root: Slot) bool {
return root <= parent and parent < slot;
}

-fn slotLeader(provider: ?SlotLeaderProvider, slot: Slot) ?Pubkey {
-return if (provider) |p| if (p.call(slot)) |l| l else null else null;
+fn slotLeader(provider: ?SlotLeaders, slot: Slot) ?Pubkey {
+return if (provider) |p| if (p.get(slot)) |l| l else null else null;
}

/// update_slot_meta
@@ -1486,7 +1486,7 @@ test "recovery" {
const data_shreds = shreds[0..34];
const code_shreds = shreds[34..68];

-var leader_schedule = OneSlotLeaderProvider{
+var leader_schedule = OneSlotLeaders{
.leader = try Pubkey.fromString("2iWGQbhdWWAA15KTBJuqvAxCdKmEvY26BoFRBU4419Sn"),
};

@@ -1512,15 +1512,15 @@
// TODO: verify index integrity
}

-const OneSlotLeaderProvider = struct {
+const OneSlotLeaders = struct {
leader: Pubkey,

-fn getLeader(self: *OneSlotLeaderProvider, _: Slot) ?Pubkey {
+fn getLeader(self: *OneSlotLeaders, _: Slot) ?Pubkey {
return self.leader;
}

-fn provider(self: *OneSlotLeaderProvider) SlotLeaderProvider {
-return SlotLeaderProvider.init(self, OneSlotLeaderProvider.getLeader);
+fn provider(self: *OneSlotLeaders) SlotLeaders {
+return SlotLeaders.init(self, OneSlotLeaders.getLeader);
}
};

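The test above adapts a single fixed leader to the interface via `SlotLeaders.init`. A provider keyed by slot could follow the same pattern; the sketch below is hedged (the `MapSlotLeaders` type, its field names, and the `sig` import path are hypothetical, not part of this PR).

```zig
const std = @import("std");
const sig = @import("sig"); // assumed import name; adjust to the local module layout
const Slot = sig.core.Slot;
const Pubkey = sig.core.Pubkey;
const SlotLeaders = sig.core.leader_schedule.SlotLeaders;

/// Hypothetical map-backed provider, e.g. for tests that need different leaders per slot.
const MapSlotLeaders = struct {
    leaders: std.AutoHashMap(Slot, Pubkey),

    fn getLeader(self: *MapSlotLeaders, slot: Slot) ?Pubkey {
        return self.leaders.get(slot);
    }

    fn slotLeaders(self: *MapSlotLeaders) SlotLeaders {
        return SlotLeaders.init(self, MapSlotLeaders.getLeader);
    }
};
```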
4 changes: 2 additions & 2 deletions src/shred_network/service.zig
@@ -19,7 +19,7 @@ const RwMux = sig.sync.RwMux;
const Registry = sig.prometheus.Registry;
const ServiceManager = sig.utils.service_manager.ServiceManager;
const Slot = sig.core.Slot;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const LeaderScheduleCache = sig.core.leader_schedule.LeaderScheduleCache;

const BasicShredTracker = shred_network.shred_tracker.BasicShredTracker;
@@ -52,7 +52,7 @@ pub const ShredCollectorDependencies = struct {
/// Shared state that is read from gossip
my_shred_version: *const Atomic(u16),
my_contact_info: ThreadSafeContactInfo,
-leader_schedule: SlotLeaderProvider,
+leader_schedule: SlotLeaders,
shred_inserter: sig.ledger.ShredInserter,
n_retransmit_threads: ?usize,
overwrite_turbine_stake_for_testing: bool,
2 changes: 1 addition & 1 deletion src/shred_network/shred_processor.zig
@@ -33,7 +33,7 @@ pub fn runShredProcessor(
verified_shred_receiver: *Channel(Packet),
tracker: *BasicShredTracker,
shred_inserter_: ShredInserter,
-leader_schedule: sig.core.leader_schedule.SlotLeaderProvider,
+leader_schedule: sig.core.leader_schedule.SlotLeaders,
) !void {
const logger = logger_.withScope(LOG_SCOPE);
var shred_inserter = shred_inserter_;
8 changes: 4 additions & 4 deletions src/shred_network/shred_verifier.zig
@@ -10,7 +10,7 @@ const Counter = sig.prometheus.Counter;
const Histogram = sig.prometheus.Histogram;
const Packet = sig.net.Packet;
const Registry = sig.prometheus.Registry;
-const SlotLeaderProvider = sig.core.leader_schedule.SlotLeaderProvider;
+const SlotLeaders = sig.core.leader_schedule.SlotLeaders;
const VariantCounter = sig.prometheus.VariantCounter;

const VerifiedMerkleRoots = sig.common.lru.LruCache(.non_locking, sig.core.Hash, void);
@@ -25,7 +25,7 @@ pub fn runShredVerifier(
verified_shred_sender: *Channel(Packet),
/// me --> retransmit service
maybe_retransmit_shred_sender: ?*Channel(Packet),
-leader_schedule: SlotLeaderProvider,
+leader_schedule: SlotLeaders,
) !void {
const metrics = try registry.initStruct(Metrics);
var verified_merkle_roots = try VerifiedMerkleRoots.init(std.heap.c_allocator, 1024);
@@ -53,7 +53,7 @@
/// Analogous to [verify_shred_cpu](https://github.com/anza-xyz/agave/blob/83e7d84bcc4cf438905d07279bc07e012a49afd9/ledger/src/sigverify_shreds.rs#L35)
fn verifyShred(
packet: *const Packet,
-leader_schedule: SlotLeaderProvider,
+leader_schedule: SlotLeaders,
verified_merkle_roots: *VerifiedMerkleRoots,
metrics: Metrics,
) ShredVerificationFailure!void {
@@ -66,7 +66,7 @@
return;
}
metrics.cache_miss_count.inc();
-const leader = leader_schedule.call(slot) orelse return error.leader_unknown;
+const leader = leader_schedule.get(slot) orelse return error.leader_unknown;
const valid = signature.verify(leader, &signed_data.data) catch
return error.failed_verification;
if (!valid) return error.failed_verification;