diff --git a/src/ledger/shred.zig b/src/ledger/shred.zig index 9d096a832..19338c34f 100644 --- a/src/ledger/shred.zig +++ b/src/ledger/shred.zig @@ -1048,16 +1048,20 @@ pub const ShredConstants = struct { }; pub const layout = struct { - pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; - pub const SIZE_OF_DATA_SHRED_HEADERS: usize = 88; - pub const SIZE_OF_CODE_SHRED_HEADERS: usize = 89; - pub const SIZE_OF_SIGNATURE: usize = sig.core.Signature.size; - pub const SIZE_OF_SHRED_VARIANT: usize = 1; - pub const SIZE_OF_SHRED_SLOT: usize = 8; - - pub const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE; // 64 - pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; // 64 + 1 = 65 - pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // 65 + 8 = 73 + const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; + const SIZE_OF_DATA_SHRED_HEADERS: usize = 88; + const SIZE_OF_CODE_SHRED_HEADERS: usize = 89; + const SIZE_OF_SIGNATURE: usize = sig.core.Signature.size; + const SIZE_OF_SHRED_VARIANT: usize = 1; + const SIZE_OF_SHRED_SLOT: usize = 8; + const SIZE_OF_INDEX: usize = 4; + const SIZE_OF_VERSION: usize = 2; + + const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE; // 64 + const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; // 64 + 1 = 65 + const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // 65 + 8 = 73 + const OFFSET_OF_ERASURE_SET_INDEX: usize = + OFFSET_OF_SHRED_INDEX + SIZE_OF_INDEX + SIZE_OF_VERSION; pub fn getShred(packet: *const Packet) ?[]const u8 { if (getShredSize(packet) > packet.data.len) return null; @@ -1096,7 +1100,7 @@ pub const layout = struct { return Signature.init(shred[0..SIZE_OF_SIGNATURE].*); } - pub fn getSignedData(shred: []const u8) ?Hash { + pub fn merkleRoot(shred: []const u8) ?Hash { const variant = getShredVariant(shred) orelse return null; const constants = switch (variant.shred_type) { .code => code_shred_constants, @@ -1105,6 +1109,10 @@ pub const layout = struct { return getMerkleRoot(shred, constants, variant) catch null; } + pub fn getErasureSetIndex(shred: []const u8) ?u32 { + return getInt(u32, shred, OFFSET_OF_ERASURE_SET_INDEX); + } + /// must be a data shred, otherwise the return value will be corrupted and meaningless pub fn getParentSlotOffset(shred: []const u8) ?u16 { std.debug.assert(getShredVariant(shred).?.shred_type == .data); @@ -1213,8 +1221,8 @@ test "getLeaderSignature" { try std.testing.expect(std.mem.eql(u8, &expected_signature, &signature.data)); } -test "getSignedData" { - const signed_data = layout.getSignedData(&test_data_shred).?; +test "layout.merkleRoot" { + const signed_data = layout.merkleRoot(&test_data_shred).?; const expected_signed_data = [_]u8{ 224, 241, 85, 253, 247, 62, 137, 179, 152, 192, 186, 203, 121, 194, 178, 130, 33, 181, 143, 156, 220, 150, 69, 197, 81, 97, 237, 11, 74, 156, 129, 134, diff --git a/src/ledger/shred_inserter/merkle_root_checks.zig b/src/ledger/shred_inserter/merkle_root_checks.zig index 8a6280491..2ac2a4e1b 100644 --- a/src/ledger/shred_inserter/merkle_root_checks.zig +++ b/src/ledger/shred_inserter/merkle_root_checks.zig @@ -3,322 +3,263 @@ const sig = @import("../../sig.zig"); const ledger = @import("../lib.zig"); const shred_inserter = @import("lib.zig"); -const schema = ledger.schema.schema; const shred_mod = sig.ledger.shred; const Allocator = std.mem.Allocator; -const AutoHashMap = std.AutoHashMap; const ErasureSetId = sig.ledger.shred.ErasureSetId; const Hash = sig.core.Hash; const 
Logger = sig.trace.Logger; const Slot = sig.core.Slot; -const SortedMap = sig.utils.collections.SortedMap; -const BlockstoreDB = ledger.blockstore.BlockstoreDB; const CodeShred = ledger.shred.CodeShred; const ErasureMeta = ledger.meta.ErasureMeta; -const MerkleRootMeta = ledger.meta.MerkleRootMeta; -const PossibleDuplicateShred = shred_inserter.working_state.PossibleDuplicateShred; const Shred = ledger.shred.Shred; const ShredId = ledger.shred.ShredId; -const WorkingEntry = shred_inserter.working_state.WorkingEntry; -const WorkingShredStore = shred_inserter.working_state.WorkingShredStore; -const key_serializer = ledger.database.key_serializer; -const value_serializer = ledger.database.value_serializer; +const DuplicateShredsWorkingStore = shred_inserter.working_state.DuplicateShredsWorkingStore; +const ErasureMetaWorkingStore = shred_inserter.working_state.ErasureMetaWorkingStore; +const MerkleRootMetaWorkingStore = shred_inserter.working_state.MerkleRootMetaWorkingStore; +const PendingInsertShredsState = shred_inserter.working_state.PendingInsertShredsState; +const ShredWorkingStore = shred_inserter.working_state.ShredWorkingStore; const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; -/// agave: check_merkle_root_consistency -pub fn checkMerkleRootConsistency( +pub const MerkleRootValidator = struct { + allocator: Allocator, logger: Logger, - db: *BlockstoreDB, - shred_store: WorkingShredStore, - slot: Slot, - merkle_root_meta: *const ledger.meta.MerkleRootMeta, - shred: *const Shred, - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - const new_merkle_root = shred.merkleRoot() catch null; - if (new_merkle_root == null and merkle_root_meta.merkle_root == null or - new_merkle_root != null and merkle_root_meta.merkle_root != null and - std.mem.eql(u8, &merkle_root_meta.merkle_root.?.data, &new_merkle_root.?.data)) - { - // No conflict, either both merkle shreds with same merkle root - // or both legacy shreds with merkle_root `None` - return true; - } + shreds: ShredWorkingStore, + duplicate_shreds: DuplicateShredsWorkingStore, - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting merkle roots for slot: {}, erasure_set: {any} original merkle - \\root meta {any} vs conflicting merkle root {any} shred index {} type {any}. Reporting - \\as duplicate - ), .{ - slot, - shred.commonHeader().erasureSetId(), - merkle_root_meta, - new_merkle_root, - shred.commonHeader().index, - shred, - }); + const Self = @This(); - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO this could be handled by caller (similar for chaining methods) - const shred_id = ShredId{ - .slot = slot, - .index = merkle_root_meta.first_received_shred_index, - .shred_type = merkle_root_meta.first_received_shred_type, + pub fn init(pending_state: *PendingInsertShredsState) Self { + return .{ + .allocator = pending_state.allocator, + .logger = pending_state.logger, + .shreds = pending_state.shreds(), + .duplicate_shreds = pending_state.duplicateShreds(), }; - if (try shred_store.get(shred_id)) |conflicting_shred| { - try duplicate_shreds.append(.{ - .MerkleRootConflict = .{ - .original = shred.*, // TODO lifetimes (cloned in rust) - .conflict = conflicting_shred, - }, - }); - } else { - logger.err().logf(&newlinesToSpaces( - \\Shred {any} indiciated by merkle root meta {any} is - \\missing from blockstore. This should only happen in extreme cases where - \\blockstore cleanup has caught up to the root. 
Skipping the merkle root - \\consistency check - ), .{ shred_id, merkle_root_meta }); - return true; - } } - return false; -} -/// Returns true if there is no chaining conflict between -/// the `shred` and `merkle_root_meta` of the next FEC set, -/// or if shreds from the next set are yet to be received. -/// -/// Otherwise return false and add duplicate proof to -/// `duplicate_shreds`. -/// -/// This is intended to be used right after `shred`'s `erasure_meta` -/// has been created for the first time. -/// -/// agave: check_forward_chained_merkle_root_consistency -pub fn checkForwardChainedMerkleRootConsistency( - allocator: Allocator, - logger: Logger, - db: *BlockstoreDB, - shred: CodeShred, - erasure_meta: ErasureMeta, - shred_store: WorkingShredStore, - merkle_root_metas: *AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)), - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - std.debug.assert(erasure_meta.checkCodeShred(shred)); - const slot = shred.common.slot; - const erasure_set_id = shred.common.erasureSetId(); - - // If a shred from the next fec set has already been inserted, check the chaining - const next_erasure_set_index = if (erasure_meta.nextErasureSetIndex()) |n| n else { - logger.err().logf( - "Invalid erasure meta, unable to compute next fec set index {any}", - .{erasure_meta}, - ); - return false; - }; - const next_erasure_set = ErasureSetId{ .slot = slot, .erasure_set_index = next_erasure_set_index }; - const next_merkle_root_meta = if (merkle_root_metas.get(next_erasure_set)) |nes| - nes.asRef().* - else if (try db.get(allocator, schema.merkle_root_meta, next_erasure_set)) |nes| - nes - else - // No shred from the next fec set has been received - return true; - - const next_shred_id = ShredId{ - .slot = slot, - .index = next_merkle_root_meta.first_received_shred_index, - .shred_type = next_merkle_root_meta.first_received_shred_type, - }; - const next_shred = if (try shred_store.get(next_shred_id)) |ns| - ns - else { - logger.err().logf(&newlinesToSpaces( - \\Shred {any} indicated by merkle root meta {any} \ - \\is missing from blockstore. This should only happen in extreme cases where \ - \\blockstore cleanup has caught up to the root. Skipping the forward chained \ - \\merkle root consistency check - ), .{ next_shred_id, next_merkle_root_meta }); - return true; - }; - const merkle_root = shred.merkleRoot() catch null; - const chained_merkle_root = shred_mod.layout.getChainedMerkleRoot(next_shred); + /// agave: check_merkle_root_consistency + pub fn checkConsistency( + self: Self, + slot: Slot, + merkle_root_meta: *const ledger.meta.MerkleRootMeta, + shred: *const Shred, + ) !bool { + const new_merkle_root = shred.merkleRoot() catch null; + if (new_merkle_root == null and merkle_root_meta.merkle_root == null or + new_merkle_root != null and merkle_root_meta.merkle_root != null and + std.mem.eql(u8, &merkle_root_meta.merkle_root.?.data, &new_merkle_root.?.data)) + { + // No conflict, either both merkle shreds with same merkle root + // or both legacy shreds with merkle_root `None` + return true; + } - if (!checkChaining(merkle_root, chained_merkle_root)) { - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting chained merkle roots for slot: {}, shred \ - \\{any} type {any} has merkle root {any}, however next fec set \ - \\shred {any} type {any} chains to merkle root \ - \\{any}. 
Reporting as duplicate + self.logger.warn().logf(&newlinesToSpaces( \\Received conflicting merkle roots for slot: {}, erasure_set: {any} original merkle \\root meta {any} vs conflicting merkle root {any} shred index {} type {any}. Reporting \\as duplicate ), .{ slot, - erasure_set_id, - shred.common.variant.shred_type, - merkle_root, - next_erasure_set, - next_merkle_root_meta.first_received_shred_type, - chained_merkle_root, + shred.commonHeader().erasureSetId(), + merkle_root_meta, + new_merkle_root, + shred.commonHeader().index, + shred, }); - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO lifetime - try duplicate_shreds.append(.{ .ChainedMerkleRootConflict = .{ - .original = .{ .code = shred }, - .conflict = next_shred, - } }); + + if (!try self.duplicate_shreds.contains(slot)) { + // TODO this could be handled by caller (similar for chaining methods) + const shred_id = ShredId{ + .slot = slot, + .index = merkle_root_meta.first_received_shred_index, + .shred_type = merkle_root_meta.first_received_shred_type, + }; + if (try self.shreds.get(shred_id)) |conflicting_shred| { + try self.duplicate_shreds.append(.{ + .MerkleRootConflict = .{ + .original = shred.*, // TODO lifetimes (cloned in rust) + .conflict = conflicting_shred, + }, + }); + } else { + self.logger.err().logf(&newlinesToSpaces( + \\Shred {any} indicated by merkle root meta {any} is + \\missing from blockstore. This should only happen in extreme cases where + \\blockstore cleanup has caught up to the root. Skipping the merkle root + \\consistency check + ), .{ shred_id, merkle_root_meta }); + return true; + } } return false; } - return true; -} + /// Returns true if there is no chaining conflict between + /// the `shred` and `merkle_root_meta` of the next FEC set, + /// or if shreds from the next set are yet to be received. + /// + /// Otherwise return false and add duplicate proof to + /// `duplicate_shreds`. + /// + /// This is intended to be used right after `shred`'s `erasure_meta` + /// has been created for the first time. + /// + /// agave: check_forward_chained_merkle_root_consistency + pub fn checkForwardChaining( + self: Self, + shred: CodeShred, + erasure_meta: ErasureMeta, + merkle_root_metas: MerkleRootMetaWorkingStore, + ) !bool { + std.debug.assert(erasure_meta.checkCodeShred(shred)); + const slot = shred.common.slot; -/// Returns true if there is no chaining conflict between -/// the `shred` and `merkle_root_meta` of the previous FEC set, -/// or if shreds from the previous set are yet to be received. -/// -/// Otherwise return false and add duplicate proof to -/// `duplicate_shreds`. -/// -/// This is intended to be used right after `shred`'s `merkle_root_meta` -/// has been created for the first time. 
-/// -/// agave: check_backwards_chained_merkle_root_consistency -pub fn checkBackwardsChainedMerkleRootConsistency( - allocator: Allocator, - logger: Logger, - db: *BlockstoreDB, - shred: Shred, - shred_store: WorkingShredStore, - erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), // BTreeMap in agave - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - const slot = shred.commonHeader().slot; - const erasure_set_id = shred.commonHeader().erasureSetId(); - const erasure_set_index = shred.commonHeader().erasure_set_index; + // If a shred from the next fec set has already been inserted, check the chaining + const next_erasure_set_index = if (erasure_meta.nextErasureSetIndex()) |n| n else { + self.logger.err().logf( + "Invalid erasure meta, unable to compute next fec set index {any}", + .{erasure_meta}, + ); + return false; + }; + const next_erasure_set = + ErasureSetId{ .slot = slot, .erasure_set_index = next_erasure_set_index }; + const next_merkle_root_meta = try merkle_root_metas.get(next_erasure_set) orelse { + // No shred from the next fec set has been received + return true; + }; - if (erasure_set_index == 0) { - // Although the first fec set chains to the last fec set of the parent block, - // if this chain is incorrect we do not know which block is the duplicate until votes - // are received. We instead delay this check until the block reaches duplicate - // confirmation. - return true; + const next_shred_id = ShredId{ + .slot = slot, + .index = next_merkle_root_meta.first_received_shred_index, + .shred_type = next_merkle_root_meta.first_received_shred_type, + }; + + return self.checkAndReportChaining(.forward, .{ .code = shred }, next_shred_id); } - // If a shred from the previous fec set has already been inserted, check the chaining. - // Since we cannot compute the previous fec set index, we check the in memory map, otherwise - // check the previous key from blockstore to see if it is consecutive with our current set. - const prev_erasure_set, const prev_erasure_meta = - if (try previousErasureSet(allocator, db, erasure_set_id, erasure_metas)) |pes| - pes - else - // No shreds from the previous erasure batch have been received, - // so nothing to check. Once the previous erasure batch is received, - // we will verify this chain through the forward check above. - return true; + /// Returns true if there is no chaining conflict between + /// the `shred` and `merkle_root_meta` of the previous FEC set, + /// or if shreds from the previous set are yet to be received. + /// + /// Otherwise return false and add duplicate proof to + /// `duplicate_shreds`. + /// + /// This is intended to be used right after `shred`'s `merkle_root_meta` + /// has been created for the first time. + /// + /// agave: check_backwards_chained_merkle_root_consistency + pub fn checkBackwardChaining( + self: Self, + shred: Shred, + erasure_metas: ErasureMetaWorkingStore, + ) !bool { + const slot = shred.commonHeader().slot; + const erasure_set_id = shred.commonHeader().erasureSetId(); + const erasure_set_index = shred.commonHeader().erasure_set_index; - const prev_shred_id = ShredId{ - .slot = slot, - .index = @intCast(prev_erasure_meta.first_received_code_index), - .shred_type = .code, - }; - const prev_shred = - if (try shred_store.get(prev_shred_id)) |ps| ps else { - logger.warn().logf(&newlinesToSpaces( - \\Shred {any} indicated by the erasure meta {any} \ - \\is missing from blockstore. 
This can happen if you have recently upgraded \ - \\from a version < v1.18.13, or if blockstore cleanup has caught up to the root. \ - \\Skipping the backwards chained merkle root consistency check - ), .{ prev_shred_id, prev_erasure_meta }); - return true; - }; - const merkle_root = shred_mod.layout.getChainedMerkleRoot(prev_shred); - const chained_merkle_root = shred.chainedMerkleRoot() catch null; + if (erasure_set_index == 0) { + // Although the first fec set chains to the last fec set of the parent block, + // if this chain is incorrect we do not know which block is the duplicate until votes + // are received. We instead delay this check until the block reaches duplicate + // confirmation. + return true; + } - if (!checkChaining(merkle_root, chained_merkle_root)) { - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting chained merkle roots for slot: {}, shred {any} type {any} \ - \\chains to merkle root {any}, however previous fec set code \ - \\shred {any} has merkle root {any}. Reporting as duplicate - ), .{ - slot, - shred.commonHeader().erasureSetId(), - shred.commonHeader().variant.shred_type, - chained_merkle_root, - prev_erasure_set, - merkle_root, - }); - } + // If a shred from the previous fec set has already been inserted, check the chaining. + // Since we cannot compute the previous fec set index, we check the in memory map, otherwise + // check the previous key from blockstore to see if it is consecutive with our current set. + _, const prev_erasure_meta = if (try erasure_metas.previousSet(erasure_set_id)) |pes| + pes + else + // No shreds from the previous erasure batch have been received, + // so nothing to check. Once the previous erasure batch is received, + // we will verify this chain through the forward check above. + return true; + + const prev_shred_id = ShredId{ + .slot = slot, + .index = @intCast(prev_erasure_meta.first_received_code_index), + .shred_type = .code, + }; - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO lifetime - try duplicate_shreds.append(.{ .ChainedMerkleRootConflict = .{ - .original = shred, - .conflict = prev_shred, - } }); + return self.checkAndReportChaining(.backward, shred, prev_shred_id); } - return true; -} + /// The input shreds must be from adjacent erasure sets in the same slot, + /// or this function will not work correctly. + fn checkAndReportChaining( + self: Self, + direction: enum { forward, backward }, + shred: Shred, + other_shred_id: ShredId, + ) !bool { + const other_shred = if (try self.shreds.get(other_shred_id)) |other_shred| + other_shred + else { + self.logger.warn().logf( + "Shred {any} is missing from blockstore. " ++ + "This can happen if blockstore cleanup has caught up to the root. 
" ++ + "Skipping the {} chained merkle root consistency check.", + .{ other_shred_id, direction }, + ); + return true; + }; -/// agave: previous_erasure_set -fn previousErasureSet( - allocator: Allocator, - db: *BlockstoreDB, - erasure_set: ErasureSetId, - erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), -) !?struct { ErasureSetId, ErasureMeta } { // TODO: agave uses CoW here - const slot = erasure_set.slot; - const erasure_set_index = erasure_set.erasure_set_index; + const older_shred, const newer_shred = switch (direction) { + .forward => .{ shred.payload(), other_shred }, + .backward => .{ other_shred, shred.payload() }, + }; - // Check the previous entry from the in memory map to see if it is the consecutive - // set to `erasure set` - const id_range, const meta_range = erasure_metas.range( - .{ .slot = slot, .erasure_set_index = 0 }, - erasure_set, - ); - if (id_range.len != 0) { - const i = id_range.len - 1; - const last_meta = meta_range[i].asRef(); - if (@as(u32, @intCast(erasure_set_index)) == last_meta.nextErasureSetIndex()) { - return .{ id_range[i], last_meta.* }; + const chained_merkle_root = shred_mod.layout.getChainedMerkleRoot(newer_shred); + const early_merkle_root = shred_mod.layout.merkleRoot(older_shred) orelse { + return error.NoMerkleRoot; + }; + + if (chainedMerkleRootIsConsistent(early_merkle_root, chained_merkle_root)) { + return true; } - } - // Consecutive set was not found in memory, scan blockstore for a potential candidate - var iter = try db.iterator(schema.erasure_meta, .reverse, erasure_set); - defer iter.deinit(); - const candidate_set: ErasureSetId, // - const candidate: ErasureMeta // - = while (try iter.nextBytes()) |entry| { - defer for (entry) |e| e.deinit(); - const key = try key_serializer.deserialize(ErasureSetId, allocator, entry[0].data); - if (key.slot != slot) return null; - if (key.erasure_set_index != erasure_set_index) break .{ - key, - try value_serializer.deserialize(ErasureMeta, allocator, entry[1].data), - }; - } else return null; + const slot = other_shred_id.slot; - // Check if this is actually the consecutive erasure set - const next = if (candidate.nextErasureSetIndex()) |n| n else return error.InvalidErasureConfig; - return if (next == erasure_set_index) - .{ candidate_set, candidate } - else - return null; -} + self.logger.warn().logf( + "Received conflicting chained merkle roots for slot: {}. Reporting as duplicate. 
" ++ + \\Conflicting shreds: + \\ erasure set: {?}, type: {?}, index: {?}, merkle root: {any} + \\ erasure set: {?}, type: {?}, index: {?}, chained merkle root: {any} + , + .{ + slot, + // early shred + shred_mod.layout.getErasureSetIndex(older_shred), + if (shred_mod.layout.getShredVariant(older_shred)) |v| v.shred_type else null, + shred_mod.layout.getIndex(older_shred), + early_merkle_root, + // late shred + shred_mod.layout.getErasureSetIndex(newer_shred), + if (shred_mod.layout.getShredVariant(newer_shred)) |v| v.shred_type else null, + shred_mod.layout.getIndex(newer_shred), + chained_merkle_root, + }, + ); + + if (!try self.duplicate_shreds.contains(slot)) { + try self.duplicate_shreds.append(.{ + .ChainedMerkleRootConflict = .{ .original = shred, .conflict = other_shred }, + }); + } + + return false; + } +}; /// agave: check_chaining -fn checkChaining( - merkle_root: ?Hash, - chained_merkle_root: ?Hash, -) bool { +fn chainedMerkleRootIsConsistent(merkle_root: Hash, chained_merkle_root: ?Hash) bool { return chained_merkle_root == null or // Chained merkle roots have not been enabled yet sig.utils.types.eql(chained_merkle_root, merkle_root); } diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig index 1d3aed439..8d427aabb 100644 --- a/src/ledger/shred_inserter/shred_inserter.zig +++ b/src/ledger/shred_inserter/shred_inserter.zig @@ -30,10 +30,11 @@ const Timer = sig.time.Timer; const BlockstoreDB = ledger.blockstore.BlockstoreDB; const IndexMetaWorkingSetEntry = lib.working_state.IndexMetaWorkingSetEntry; +const MerkleRootValidator = lib.merkle_root_checks.MerkleRootValidator; const PendingInsertShredsState = lib.working_state.PendingInsertShredsState; const PossibleDuplicateShred = lib.working_state.PossibleDuplicateShred; const WorkingEntry = lib.working_state.WorkingEntry; -const WorkingShredStore = lib.working_state.WorkingShredStore; +const ShredWorkingStore = lib.working_state.ShredWorkingStore; const WriteBatch = BlockstoreDB.WriteBatch; const ErasureMeta = meta.ErasureMeta; @@ -42,11 +43,6 @@ const MerkleRootMeta = meta.MerkleRootMeta; const ShredIndex = meta.ShredIndex; const SlotMeta = meta.SlotMeta; -const checkForwardChainedMerkleRootConsistency = - lib.merkle_root_checks.checkForwardChainedMerkleRootConsistency; -const checkBackwardsChainedMerkleRootConsistency = - lib.merkle_root_checks.checkBackwardsChainedMerkleRootConsistency; -const checkMerkleRootConsistency = lib.merkle_root_checks.checkMerkleRootConsistency; const handleChaining = lib.slot_chaining.handleChaining; const recover = lib.recovery.recover; const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; @@ -175,6 +171,7 @@ pub const ShredInserter = struct { ); defer state.deinit(); var write_batch = state.write_batch; + const merkle_root_validator = MerkleRootValidator.init(&state); var get_lock_timer = try Timer.start(); self.lock.lock(); @@ -194,6 +191,7 @@ pub const ShredInserter = struct { if (self.checkInsertDataShred( data_shred, &state, + merkle_root_validator, &write_batch, is_trusted, leader_schedule, @@ -224,6 +222,7 @@ pub const ShredInserter = struct { _ = try self.checkInsertCodeShred( code_shred, &state, + merkle_root_validator, &write_batch, is_trusted, shred_source, @@ -245,7 +244,7 @@ pub const ShredInserter = struct { const recovered_shreds = try self.tryShredRecovery( &state.erasure_metas, &state.index_working_set, - state.shredStore(), + state.shreds(), &reed_solomon_cache, ); defer { @@ -277,6 +276,7 @@ pub const ShredInserter = struct { 
if (self.checkInsertDataShred( shred.data, &state, + merkle_root_validator, &write_batch, is_trusted, leader_schedule, @@ -337,15 +337,10 @@ pub const ShredInserter = struct { // unreachable: Erasure meta was just created, initial shred must exist const shred = state.just_inserted_shreds.get(shred_id) orelse unreachable; // TODO: agave discards the result here. should we also? - _ = try checkForwardChainedMerkleRootConsistency( - allocator, - self.logger, - &self.db, + _ = try merkle_root_validator.checkForwardChaining( shred.code, erasure_meta, - state.shredStore(), - &state.merkle_root_metas, - &state.duplicate_shreds, + state.merkleRootMetas(), ); }; @@ -371,15 +366,7 @@ pub const ShredInserter = struct { // unreachable: Merkle root meta was just created, initial shred must exist const shred = state.just_inserted_shreds.get(shred_id) orelse unreachable; // TODO: agave discards the result here. should we also? - _ = try checkBackwardsChainedMerkleRootConsistency( - allocator, - self.logger, - &self.db, - shred, - state.shredStore(), - &state.erasure_metas, - &state.duplicate_shreds, - ); + _ = try merkle_root_validator.checkBackwardChaining(shred, state.erasureMetas()); } self.metrics.merkle_chaining_elapsed_us.add(merkle_chaining_timer.read().asMicros()); @@ -404,6 +391,7 @@ pub const ShredInserter = struct { self: *Self, shred: CodeShred, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, write_batch: *WriteBatch, is_trusted: bool, shred_source: ShredSource, @@ -414,9 +402,15 @@ pub const ShredInserter = struct { const index_meta = &index_meta_working_set_entry.index; const erasure_set_id = shred.common.erasureSetId(); - try state.loadMerkleRootMeta(erasure_set_id); + try state.merkleRootMetas().load(erasure_set_id); - if (!try self.shouldInsertCodeShred(state, shred, index_meta, is_trusted)) { + if (!try self.shouldInsertCodeShred( + state, + merkle_root_validator, + shred, + index_meta, + is_trusted, + )) { return false; } @@ -430,7 +424,7 @@ pub const ShredInserter = struct { if (was_inserted) { index_meta_working_set_entry.did_insert_occur = true; self.metrics.num_inserted.inc(); - try state.initMerkleRootMetaIfMissing(erasure_set_id, shred); + try state.merkleRootMetas().initIfMissing(erasure_set_id, shred); } // NOTE: it's not accurate to say the shred was "just inserted" if was_inserted is false, @@ -449,6 +443,7 @@ pub const ShredInserter = struct { fn shouldInsertCodeShred( self: *Self, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, shred: CodeShred, index_meta: *const Index, is_trusted: bool, @@ -479,14 +474,10 @@ pub const ShredInserter = struct { // A previous shred has been inserted in this batch or in blockstore // Compare our current shred against the previous shred for potential // conflicts - if (!try checkMerkleRootConsistency( - self.logger, - &self.db, - state.shredStore(), + if (!try merkle_root_validator.checkConsistency( shred.common.slot, merkle_root_meta.asRef(), &.{ .code = shred }, - &state.duplicate_shreds, )) { return false; } @@ -498,7 +489,7 @@ pub const ShredInserter = struct { // NOTE perf: maybe this can be skipped for trusted shreds. // agave runs this regardless of trust, but we can check if it has // a meaningful performance impact to skip this for trusted shreds. 
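// The erasure meta is derived from the first code shred received for this erasure set; checkCodeShred below verifies this shred against that recorded erasure config, and a mismatch is recorded as evidence of a duplicate block.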
- const erasure_meta = try state.getOrPutErasureMeta(erasure_set_id, shred); + const erasure_meta = try state.erasureMetas().getOrPut(erasure_set_id, shred); if (!erasure_meta.checkCodeShred(shred)) { self.metrics.num_code_shreds_invalid_erasure_config.inc(); try self.recordShredConflict(state, shred, erasure_meta); @@ -523,7 +514,7 @@ pub const ShredInserter = struct { // erasure set, but not this one. is it worth persisting this one as well? if (!try self.hasDuplicateShredsInSlot(slot)) { if (try findConflictingCodeShred( - state.shredStore(), + state.shreds(), shred, slot, erasure_meta, @@ -571,7 +562,7 @@ pub const ShredInserter = struct { /// agave: find_conflicting_coding_shred fn findConflictingCodeShred( - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, _: CodeShred, // TODO: figure out why this is here. delete it or add what is missing. slot: Slot, erasure_meta: *const ErasureMeta, @@ -594,6 +585,7 @@ pub const ShredInserter = struct { self: *Self, shred: DataShred, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, write_batch: *WriteBatch, is_trusted: bool, leader_schedule: ?SlotLeaderProvider, @@ -609,7 +601,7 @@ pub const ShredInserter = struct { const slot_meta = &slot_meta_entry.new_slot_meta; const erasure_set_id = shred.common.erasureSetId(); - try state.loadMerkleRootMeta(erasure_set_id); + try state.merkleRootMetas().load(erasure_set_id); if (!is_trusted) { if (isDataShredPresent(shred, slot_meta, &index_meta.data_index)) { @@ -640,7 +632,7 @@ pub const ShredInserter = struct { if (!try self.shouldInsertDataShred( shred, slot_meta, - state.shredStore(), + state.shreds(), self.max_root.load(.acquire), leader_schedule, shred_source, @@ -653,14 +645,10 @@ pub const ShredInserter = struct { // A previous shred has been inserted in this batch or in blockstore // Compare our current shred against the previous shred for potential // conflicts - if (!try checkMerkleRootConsistency( - self.logger, - &self.db, - state.shredStore(), + if (!try merkle_root_validator.checkConsistency( slot, merkle_root_meta.asRef(), &shred_union, - &state.duplicate_shreds, )) { return error.InvalidShred; } @@ -674,12 +662,12 @@ pub const ShredInserter = struct { write_batch, shred_source, ); - try state.initMerkleRootMetaIfMissing(erasure_set_id, shred); + try state.merkleRootMetas().initIfMissing(erasure_set_id, shred); try state.just_inserted_shreds.put(shred.id(), shred_union); // TODO check first? 
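// Putting the shred in just_inserted_shreds makes it visible to ShredWorkingStore lookups for the remainder of this batch, before the write batch is committed to the database.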
index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; - try state.loadErasureMeta(erasure_set_id); + try state.erasureMetas().load(erasure_set_id); return newly_completed_data_sets; } @@ -716,7 +704,7 @@ pub const ShredInserter = struct { self: *Self, shred: DataShred, slot_meta: *const SlotMeta, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, max_root: Slot, leader_schedule: ?SlotLeaderProvider, shred_source: ShredSource, @@ -853,7 +841,7 @@ pub const ShredInserter = struct { self: *Self, erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), index_working_set: *AutoHashMap(u64, IndexMetaWorkingSetEntry), - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, reed_solomon_cache: *ReedSolomonCache, ) !ArrayList(Shred) { // Recovery rules: @@ -892,7 +880,7 @@ pub const ShredInserter = struct { self: *Self, index: *const Index, erasure_meta: *const ErasureMeta, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, reed_solomon_cache: *ReedSolomonCache, ) !std.ArrayList(Shred) { var available_shreds = ArrayList(Shred).init(self.allocator); @@ -938,7 +926,7 @@ pub const ShredInserter = struct { index: *const ShredIndex, slot: Slot, shred_indices: [2]u64, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, available_shreds: *ArrayList(Shred), ) !void { for (shred_indices[0]..shred_indices[1]) |i| { @@ -1193,6 +1181,7 @@ const ShredInserterTestState = struct { return try self.inserter.checkInsertCodeShred( shred.code, state, + MerkleRootValidator.init(state), write_batch, false, .turbine, diff --git a/src/ledger/shred_inserter/working_state.zig b/src/ledger/shred_inserter/working_state.zig index ee69a9247..46dad1b10 100644 --- a/src/ledger/shred_inserter/working_state.zig +++ b/src/ledger/shred_inserter/working_state.zig @@ -121,24 +121,6 @@ pub const PendingInsertShredsState = struct { self.write_batch.deinit(); } - pub fn getOrPutErasureMeta( - self: *Self, - erasure_set_id: ErasureSetId, - code_shred: CodeShred, - ) !*const ErasureMeta { - const erasure_meta_entry = try self.erasure_metas.getOrPut(erasure_set_id); - if (!erasure_meta_entry.found_existing) { - if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { - erasure_meta_entry.value_ptr.* = .{ .clean = meta_ }; - } else { - erasure_meta_entry.value_ptr.* = .{ - .dirty = ErasureMeta.fromCodeShred(code_shred) orelse return error.Unwrap, - }; - } - } - return erasure_meta_entry.value_ptr.asRef(); - } - /// agave: get_index_meta_entry pub fn getIndexMetaEntry(self: *Self, slot: Slot) !*IndexMetaWorkingSetEntry { var timer = try Timer.start(); @@ -185,7 +167,7 @@ pub const PendingInsertShredsState = struct { return entry.value_ptr; } - pub fn shredStore(self: *Self) WorkingShredStore { + pub fn shreds(self: *Self) ShredWorkingStore { return .{ .logger = self.logger, .db = self.db, @@ -193,36 +175,24 @@ pub const PendingInsertShredsState = struct { }; } - // TODO: should this actually be called externally? 
- // consider moving this logic into a getOrPut-style method - pub fn loadErasureMeta(self: *Self, erasure_set_id: ErasureSetId) !void { - if (!self.erasure_metas.contains(erasure_set_id)) { - if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { - try self.erasure_metas.put(erasure_set_id, .{ .clean = meta_ }); - } - } + pub fn erasureMetas(self: *Self) ErasureMetaWorkingStore { + return .{ + .allocator = self.allocator, + .db = self.db, + .working_entries = &self.erasure_metas, + }; } - // TODO: should this actually be called externally? - // consider moving this logic into a getOrPut-style method - pub fn loadMerkleRootMeta(self: *Self, erasure_set_id: ErasureSetId) !void { - if (!self.merkle_root_metas.contains(erasure_set_id)) { - if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { - try self.merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ }); - } - } + pub fn merkleRootMetas(self: *Self) MerkleRootMetaWorkingStore { + return .{ + .allocator = self.allocator, + .db = self.db, + .working_entries = &self.merkle_root_metas, + }; } - // TODO: should this actually be called externally? - pub fn initMerkleRootMetaIfMissing( - self: *Self, - erasure_set_id: ErasureSetId, - shred: anytype, - ) !void { - const entry = try self.merkle_root_metas.getOrPut(erasure_set_id); - if (!entry.found_existing) { - entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) }; - } + pub fn duplicateShreds(self: *Self) DuplicateShredsWorkingStore { + return .{ .db = self.db, .duplicate_shreds = &self.duplicate_shreds }; } pub fn commit(self: *Self) !void { @@ -316,6 +286,144 @@ pub const PendingInsertShredsState = struct { } }; +pub const MerkleRootMetaWorkingStore = struct { + allocator: Allocator, + db: *BlockstoreDB, + working_entries: *AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)), + + const Self = @This(); + + pub fn get(self: Self, id: ErasureSetId) !?MerkleRootMeta { + return if (self.working_entries.get(id)) |nes| + nes.asRef().* + else + try self.db.get(self.allocator, schema.merkle_root_meta, id); + } + + // TODO: should this actually be called externally? + // consider moving this logic into a getOrPut-style method + pub fn load(self: Self, erasure_set_id: ErasureSetId) !void { + if (!self.working_entries.contains(erasure_set_id)) { + if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { + try self.working_entries.put(erasure_set_id, .{ .clean = meta_ }); + } + } + } + + // TODO: should this actually be called externally? 
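+ // Note: only the in-memory map is consulted here; this assumes `load` has already pulled any existing database entry for this erasure set into memory.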
+ pub fn initIfMissing(self: Self, erasure_set_id: ErasureSetId, shred: anytype) !void { + const entry = try self.working_entries.getOrPut(erasure_set_id); + if (!entry.found_existing) { + entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) }; + } + } +}; + +pub const ErasureMetaWorkingStore = struct { + allocator: Allocator, + db: *BlockstoreDB, + working_entries: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), + + const Self = @This(); + + pub fn get(self: Self, id: ErasureSetId) !?ErasureMeta { + return if (self.working_entries.get(id)) |nes| + nes.asRef().* + else + try self.db.get(self.allocator, schema.erasure_meta, id); + } + + pub fn getOrPut( + self: Self, + erasure_set_id: ErasureSetId, + code_shred: CodeShred, + ) !*const ErasureMeta { + const erasure_meta_entry = try self.working_entries.getOrPut(erasure_set_id); + if (!erasure_meta_entry.found_existing) { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { + erasure_meta_entry.value_ptr.* = .{ .clean = meta_ }; + } else { + erasure_meta_entry.value_ptr.* = .{ + .dirty = ErasureMeta.fromCodeShred(code_shred) orelse return error.Unwrap, + }; + } + } + return erasure_meta_entry.value_ptr.asRef(); + } + + // TODO: should this actually be called externally? + // consider moving this logic into a getOrPut-style method + pub fn load(self: Self, erasure_set_id: ErasureSetId) !void { + if (!self.working_entries.contains(erasure_set_id)) { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { + try self.working_entries.put(erasure_set_id, .{ .clean = meta_ }); + } + } + } + + /// agave: previous_erasure_set + pub fn previousSet( + self: Self, + erasure_set: ErasureSetId, + ) !?struct { ErasureSetId, ErasureMeta } { // TODO: agave uses CoW here + const slot = erasure_set.slot; + const erasure_set_index = erasure_set.erasure_set_index; + + // Check the previous entry from the in memory map to see if it is the consecutive + // set to `erasure set` + const id_range, const meta_range = self.working_entries.range( + .{ .slot = slot, .erasure_set_index = 0 }, + erasure_set, + ); + if (id_range.len != 0) { + const i = id_range.len - 1; + const last_meta = meta_range[i].asRef(); + if (@as(u32, @intCast(erasure_set_index)) == last_meta.nextErasureSetIndex()) { + return .{ id_range[i], last_meta.* }; + } + } + + // Consecutive set was not found in memory, scan blockstore for a potential candidate + const key_serializer = ledger.database.key_serializer; + const value_serializer = ledger.database.value_serializer; + var iter = try self.db.iterator(schema.erasure_meta, .reverse, erasure_set); + defer iter.deinit(); + const candidate_set: ErasureSetId, // + const candidate: ErasureMeta // + = while (try iter.nextBytes()) |entry| { + defer for (entry) |e| e.deinit(); + const key = try key_serializer.deserialize(ErasureSetId, self.allocator, entry[0].data); + if (key.slot != slot) return null; + if (key.erasure_set_index != erasure_set_index) break .{ + key, + try value_serializer.deserialize(ErasureMeta, self.allocator, entry[1].data), + }; + } else return null; + + // Check if this is actually the consecutive erasure set + const next = if (candidate.nextErasureSetIndex()) |n| n else return error.InvalidErasureConfig; + return if (next == erasure_set_index) + .{ candidate_set, candidate } + else + return null; + } +}; + +pub const DuplicateShredsWorkingStore = struct { + db: *BlockstoreDB, + duplicate_shreds: 
*std.ArrayList(PossibleDuplicateShred), + + const Self = DuplicateShredsWorkingStore; + + pub fn contains(self: Self, slot: Slot) !bool { + return try self.db.contains(schema.duplicate_slots, slot); + } + + pub fn append(self: Self, dupe: PossibleDuplicateShred) !void { + try self.duplicate_shreds.append(dupe); + } +}; + pub fn WorkingEntry(comptime T: type) type { return union(enum) { // Value has been modified with respect to the blockstore column @@ -380,7 +488,7 @@ const ShredConflict = struct { conflict: []const u8, }; -pub const WorkingShredStore = struct { +pub const ShredWorkingStore = struct { logger: sig.trace.Logger, db: *BlockstoreDB, just_inserted_shreds: *const AutoHashMap(ShredId, Shred), diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index 5cc1f89e4..43876c56b 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -193,7 +193,7 @@ fn validateShred( // https://github.com/solana-labs/solana/pull/35076 _ = layout.getLeaderSignature(shred) orelse return error.signature_missing; - _ = layout.getSignedData(shred) orelse return error.signed_data_missing; + _ = layout.merkleRoot(shred) orelse return error.signed_data_missing; } /// TODO: this may need to move to blockstore diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index 0aaad2b9d..dd2255893 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -51,7 +51,7 @@ fn verifyShred( const shred = shred_layout.getShred(packet) orelse return error.insufficient_shred_size; const slot = shred_layout.getSlot(shred) orelse return error.slot_missing; const signature = shred_layout.getLeaderSignature(shred) orelse return error.signature_missing; - const signed_data = shred_layout.getSignedData(shred) orelse return error.signed_data_missing; + const signed_data = shred_layout.merkleRoot(shred) orelse return error.signed_data_missing; const leader = leader_schedule.call(slot) orelse return error.leader_unknown; _ = signature.verify(leader, &signed_data.data) or return error.failed_verification;
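A minimal sketch of how the pieces above fit together during shred insertion, assuming an initialized `state: PendingInsertShredsState`, with `slot`, `erasure_set_id`, `shred_union: Shred`, `code_shred: CodeShred`, and `erasure_meta: ErasureMeta` as hypothetical locals standing in for the real call sites in checkInsertCodeShred and checkInsertDataShred:

const validator = MerkleRootValidator.init(&state);

// Merkle root consistency: compare this shred against the first shred recorded
// for its erasure set, if any.
try state.merkleRootMetas().load(erasure_set_id);
if (state.merkle_root_metas.get(erasure_set_id)) |merkle_root_meta| {
    _ = try validator.checkConsistency(slot, merkle_root_meta.asRef(), &shred_union);
}

// Chaining consistency: when an erasure meta is first created, check forward into
// the next FEC set; when a merkle root meta is first created, check backward into
// the previous FEC set.
_ = try validator.checkForwardChaining(code_shred, erasure_meta, state.merkleRootMetas());
_ = try validator.checkBackwardChaining(shred_union, state.erasureMetas());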