From 988fe6eb9363d39c76a4aa87da0ba54c6863abe6 Mon Sep 17 00:00:00 2001
From: Drew Nutter
Date: Mon, 4 Nov 2024 14:36:43 +0000
Subject: [PATCH] fix(ledger): backward merkle root chaining bugs (#347)

## bug fixes

Previously, the backward merkle root chaining check was broken in several ways. It:
- ignored the actual result of the check
- always marked the shred as a duplicate (as if the check had failed)
- always returned true (as if the check had succeeded)
- compared a chained root to a chained root, which is wrong: the newer set's chained root should be compared to the older set's actual merkle root

## refactoring

Consolidate the common chaining logic into a single function, `checkAndReportChaining`.

Improve dependency management by:
- organizing the merkle root check functions into a struct, since they all share some common dependencies. This simplifies the function signatures and keeps things standardized and consistent.
- utilizing working stores:
  - This is better than separately passing down the granular transitive dependencies (database and hashmap) because it clarifies what this code actually needs: some kind of store for erasure metas. It is not important in this context that the store is managed through a negotiation between database and hashmap calls; abstracting away that negotiation is the whole point of the pending state. Passing down all the transitive dependencies makes the function parameters a mess, and it becomes unclear why the code needs any of them.
  - This is better than passing down the whole pending state struct because it reduces the number of dependencies and clarifies what is actually needed, rather than pretending the entire state is needed.

## change in behavior

Merkle root checks now return an error if the merkle root is missing, instead of ignoring it, because a missing merkle root indicates a severe bug in the validator, and the validator needs to be shut down and patched immediately.
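To illustrate the core of the comparison fix, here is a minimal, self-contained sketch (not the patch's code: `Hash` is a stand-in for `sig.core.Hash`, and `chainingIsConsistent` is a hypothetical name). The point is the direction of the comparison: the chained root recorded by the newer erasure set must equal the *actual* merkle root of the older set.

```zig
const std = @import("std");

// Stand-in for sig.core.Hash.
const Hash = [32]u8;

/// The chained merkle root stored by the newer erasure set must equal the
/// actual merkle root of the older set. Comparing chained root to chained
/// root (the old bug) can never detect this class of conflict.
/// A null chained root means chaining is not enabled yet, so no conflict.
fn chainingIsConsistent(older_actual_root: Hash, newer_chained_root: ?Hash) bool {
    const chained = newer_chained_root orelse return true;
    return std.mem.eql(u8, &older_actual_root, &chained);
}

test "chained root must match the previous set's actual root" {
    const root_a: Hash = [_]u8{1} ** 32;
    const root_b: Hash = [_]u8{2} ** 32;
    try std.testing.expect(chainingIsConsistent(root_a, root_a)); // consistent chain
    try std.testing.expect(chainingIsConsistent(root_a, null)); // chaining disabled
    try std.testing.expect(!chainingIsConsistent(root_a, root_b)); // conflict: report duplicate
}
```

In the patch itself this predicate is `chainedMerkleRootIsConsistent`, and `checkAndReportChaining` selects which shred is the older one based on the direction of the check (`.forward`: the inserted shred belongs to the older set; `.backward`: to the newer set) before extracting the two roots.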
--- src/ledger/shred.zig | 34 +- .../shred_inserter/merkle_root_checks.zig | 479 ++++++++---------- src/ledger/shred_inserter/shred_inserter.zig | 83 ++- src/ledger/shred_inserter/working_state.zig | 200 ++++++-- src/shred_collector/shred_receiver.zig | 2 +- src/shred_collector/shred_verifier.zig | 2 +- 6 files changed, 423 insertions(+), 377 deletions(-) diff --git a/src/ledger/shred.zig b/src/ledger/shred.zig index 9d096a832..19338c34f 100644 --- a/src/ledger/shred.zig +++ b/src/ledger/shred.zig @@ -1048,16 +1048,20 @@ pub const ShredConstants = struct { }; pub const layout = struct { - pub const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; - pub const SIZE_OF_DATA_SHRED_HEADERS: usize = 88; - pub const SIZE_OF_CODE_SHRED_HEADERS: usize = 89; - pub const SIZE_OF_SIGNATURE: usize = sig.core.Signature.size; - pub const SIZE_OF_SHRED_VARIANT: usize = 1; - pub const SIZE_OF_SHRED_SLOT: usize = 8; - - pub const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE; // 64 - pub const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; // 64 + 1 = 65 - pub const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // 65 + 8 = 73 + const SIZE_OF_COMMON_SHRED_HEADER: usize = 83; + const SIZE_OF_DATA_SHRED_HEADERS: usize = 88; + const SIZE_OF_CODE_SHRED_HEADERS: usize = 89; + const SIZE_OF_SIGNATURE: usize = sig.core.Signature.size; + const SIZE_OF_SHRED_VARIANT: usize = 1; + const SIZE_OF_SHRED_SLOT: usize = 8; + const SIZE_OF_INDEX: usize = 4; + const SIZE_OF_VERSION: usize = 2; + + const OFFSET_OF_SHRED_VARIANT: usize = SIZE_OF_SIGNATURE; // 64 + const OFFSET_OF_SHRED_SLOT: usize = SIZE_OF_SIGNATURE + SIZE_OF_SHRED_VARIANT; // 64 + 1 = 65 + const OFFSET_OF_SHRED_INDEX: usize = OFFSET_OF_SHRED_SLOT + SIZE_OF_SHRED_SLOT; // 65 + 8 = 73 + const OFFSET_OF_ERASURE_SET_INDEX: usize = + OFFSET_OF_SHRED_INDEX + SIZE_OF_INDEX + SIZE_OF_VERSION; pub fn getShred(packet: *const Packet) ?[]const u8 { if (getShredSize(packet) > packet.data.len) return null; @@ -1096,7 +1100,7 @@ pub const layout = struct { return Signature.init(shred[0..SIZE_OF_SIGNATURE].*); } - pub fn getSignedData(shred: []const u8) ?Hash { + pub fn merkleRoot(shred: []const u8) ?Hash { const variant = getShredVariant(shred) orelse return null; const constants = switch (variant.shred_type) { .code => code_shred_constants, @@ -1105,6 +1109,10 @@ pub const layout = struct { return getMerkleRoot(shred, constants, variant) catch null; } + pub fn getErasureSetIndex(shred: []const u8) ?u32 { + return getInt(u32, shred, OFFSET_OF_ERASURE_SET_INDEX); + } + /// must be a data shred, otherwise the return value will be corrupted and meaningless pub fn getParentSlotOffset(shred: []const u8) ?u16 { std.debug.assert(getShredVariant(shred).?.shred_type == .data); @@ -1213,8 +1221,8 @@ test "getLeaderSignature" { try std.testing.expect(std.mem.eql(u8, &expected_signature, &signature.data)); } -test "getSignedData" { - const signed_data = layout.getSignedData(&test_data_shred).?; +test "layout.merkleRoot" { + const signed_data = layout.merkleRoot(&test_data_shred).?; const expected_signed_data = [_]u8{ 224, 241, 85, 253, 247, 62, 137, 179, 152, 192, 186, 203, 121, 194, 178, 130, 33, 181, 143, 156, 220, 150, 69, 197, 81, 97, 237, 11, 74, 156, 129, 134, diff --git a/src/ledger/shred_inserter/merkle_root_checks.zig b/src/ledger/shred_inserter/merkle_root_checks.zig index 8a6280491..2ac2a4e1b 100644 --- a/src/ledger/shred_inserter/merkle_root_checks.zig +++ b/src/ledger/shred_inserter/merkle_root_checks.zig @@ -3,322 +3,263 
@@ const sig = @import("../../sig.zig"); const ledger = @import("../lib.zig"); const shred_inserter = @import("lib.zig"); -const schema = ledger.schema.schema; const shred_mod = sig.ledger.shred; const Allocator = std.mem.Allocator; -const AutoHashMap = std.AutoHashMap; const ErasureSetId = sig.ledger.shred.ErasureSetId; const Hash = sig.core.Hash; const Logger = sig.trace.Logger; const Slot = sig.core.Slot; -const SortedMap = sig.utils.collections.SortedMap; -const BlockstoreDB = ledger.blockstore.BlockstoreDB; const CodeShred = ledger.shred.CodeShred; const ErasureMeta = ledger.meta.ErasureMeta; -const MerkleRootMeta = ledger.meta.MerkleRootMeta; -const PossibleDuplicateShred = shred_inserter.working_state.PossibleDuplicateShred; const Shred = ledger.shred.Shred; const ShredId = ledger.shred.ShredId; -const WorkingEntry = shred_inserter.working_state.WorkingEntry; -const WorkingShredStore = shred_inserter.working_state.WorkingShredStore; -const key_serializer = ledger.database.key_serializer; -const value_serializer = ledger.database.value_serializer; +const DuplicateShredsWorkingStore = shred_inserter.working_state.DuplicateShredsWorkingStore; +const ErasureMetaWorkingStore = shred_inserter.working_state.ErasureMetaWorkingStore; +const MerkleRootMetaWorkingStore = shred_inserter.working_state.MerkleRootMetaWorkingStore; +const PendingInsertShredsState = shred_inserter.working_state.PendingInsertShredsState; +const ShredWorkingStore = shred_inserter.working_state.ShredWorkingStore; const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; -/// agave: check_merkle_root_consistency -pub fn checkMerkleRootConsistency( +pub const MerkleRootValidator = struct { + allocator: Allocator, logger: Logger, - db: *BlockstoreDB, - shred_store: WorkingShredStore, - slot: Slot, - merkle_root_meta: *const ledger.meta.MerkleRootMeta, - shred: *const Shred, - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - const new_merkle_root = shred.merkleRoot() catch null; - if (new_merkle_root == null and merkle_root_meta.merkle_root == null or - new_merkle_root != null and merkle_root_meta.merkle_root != null and - std.mem.eql(u8, &merkle_root_meta.merkle_root.?.data, &new_merkle_root.?.data)) - { - // No conflict, either both merkle shreds with same merkle root - // or both legacy shreds with merkle_root `None` - return true; - } + shreds: ShredWorkingStore, + duplicate_shreds: DuplicateShredsWorkingStore, - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting merkle roots for slot: {}, erasure_set: {any} original merkle - \\root meta {any} vs conflicting merkle root {any} shred index {} type {any}. 
Reporting - \\as duplicate - ), .{ - slot, - shred.commonHeader().erasureSetId(), - merkle_root_meta, - new_merkle_root, - shred.commonHeader().index, - shred, - }); + const Self = @This(); - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO this could be handled by caller (similar for chaining methods) - const shred_id = ShredId{ - .slot = slot, - .index = merkle_root_meta.first_received_shred_index, - .shred_type = merkle_root_meta.first_received_shred_type, + pub fn init(pending_state: *PendingInsertShredsState) Self { + return .{ + .allocator = pending_state.allocator, + .logger = pending_state.logger, + .shreds = pending_state.shreds(), + .duplicate_shreds = pending_state.duplicateShreds(), }; - if (try shred_store.get(shred_id)) |conflicting_shred| { - try duplicate_shreds.append(.{ - .MerkleRootConflict = .{ - .original = shred.*, // TODO lifetimes (cloned in rust) - .conflict = conflicting_shred, - }, - }); - } else { - logger.err().logf(&newlinesToSpaces( - \\Shred {any} indiciated by merkle root meta {any} is - \\missing from blockstore. This should only happen in extreme cases where - \\blockstore cleanup has caught up to the root. Skipping the merkle root - \\consistency check - ), .{ shred_id, merkle_root_meta }); - return true; - } } - return false; -} -/// Returns true if there is no chaining conflict between -/// the `shred` and `merkle_root_meta` of the next FEC set, -/// or if shreds from the next set are yet to be received. -/// -/// Otherwise return false and add duplicate proof to -/// `duplicate_shreds`. -/// -/// This is intended to be used right after `shred`'s `erasure_meta` -/// has been created for the first time. -/// -/// agave: check_forward_chained_merkle_root_consistency -pub fn checkForwardChainedMerkleRootConsistency( - allocator: Allocator, - logger: Logger, - db: *BlockstoreDB, - shred: CodeShred, - erasure_meta: ErasureMeta, - shred_store: WorkingShredStore, - merkle_root_metas: *AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)), - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - std.debug.assert(erasure_meta.checkCodeShred(shred)); - const slot = shred.common.slot; - const erasure_set_id = shred.common.erasureSetId(); - - // If a shred from the next fec set has already been inserted, check the chaining - const next_erasure_set_index = if (erasure_meta.nextErasureSetIndex()) |n| n else { - logger.err().logf( - "Invalid erasure meta, unable to compute next fec set index {any}", - .{erasure_meta}, - ); - return false; - }; - const next_erasure_set = ErasureSetId{ .slot = slot, .erasure_set_index = next_erasure_set_index }; - const next_merkle_root_meta = if (merkle_root_metas.get(next_erasure_set)) |nes| - nes.asRef().* - else if (try db.get(allocator, schema.merkle_root_meta, next_erasure_set)) |nes| - nes - else - // No shred from the next fec set has been received - return true; - - const next_shred_id = ShredId{ - .slot = slot, - .index = next_merkle_root_meta.first_received_shred_index, - .shred_type = next_merkle_root_meta.first_received_shred_type, - }; - const next_shred = if (try shred_store.get(next_shred_id)) |ns| - ns - else { - logger.err().logf(&newlinesToSpaces( - \\Shred {any} indicated by merkle root meta {any} \ - \\is missing from blockstore. This should only happen in extreme cases where \ - \\blockstore cleanup has caught up to the root. 
Skipping the forward chained \ - \\merkle root consistency check - ), .{ next_shred_id, next_merkle_root_meta }); - return true; - }; - const merkle_root = shred.merkleRoot() catch null; - const chained_merkle_root = shred_mod.layout.getChainedMerkleRoot(next_shred); + /// agave: check_merkle_root_consistency + pub fn checkConsistency( + self: Self, + slot: Slot, + merkle_root_meta: *const ledger.meta.MerkleRootMeta, + shred: *const Shred, + ) !bool { + const new_merkle_root = shred.merkleRoot() catch null; + if (new_merkle_root == null and merkle_root_meta.merkle_root == null or + new_merkle_root != null and merkle_root_meta.merkle_root != null and + std.mem.eql(u8, &merkle_root_meta.merkle_root.?.data, &new_merkle_root.?.data)) + { + // No conflict, either both merkle shreds with same merkle root + // or both legacy shreds with merkle_root `None` + return true; + } - if (!checkChaining(merkle_root, chained_merkle_root)) { - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting chained merkle roots for slot: {}, shred \ - \\{any} type {any} has merkle root {any}, however next fec set \ - \\shred {any} type {any} chains to merkle root \ - \\{any}. Reporting as duplicate + self.logger.warn().logf(&newlinesToSpaces( + \\Received conflicting merkle roots for slot: {}, erasure_set: {any} original merkle + \\root meta {any} vs conflicting merkle root {any} shred index {} type {any}. Reporting + \\as duplicate ), .{ slot, - erasure_set_id, - shred.common.variant.shred_type, - merkle_root, - next_erasure_set, - next_merkle_root_meta.first_received_shred_type, - chained_merkle_root, + shred.commonHeader().erasureSetId(), + merkle_root_meta, + new_merkle_root, + shred.commonHeader().index, + shred, }); - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO lifetime - try duplicate_shreds.append(.{ .ChainedMerkleRootConflict = .{ - .original = .{ .code = shred }, - .conflict = next_shred, - } }); + + if (!try self.duplicate_shreds.contains(slot)) { + // TODO this could be handled by caller (similar for chaining methods) + const shred_id = ShredId{ + .slot = slot, + .index = merkle_root_meta.first_received_shred_index, + .shred_type = merkle_root_meta.first_received_shred_type, + }; + if (try self.shreds.get(shred_id)) |conflicting_shred| { + try self.duplicate_shreds.append(.{ + .MerkleRootConflict = .{ + .original = shred.*, // TODO lifetimes (cloned in rust) + .conflict = conflicting_shred, + }, + }); + } else { + self.logger.err().logf(&newlinesToSpaces( + \\Shred {any} indicated by merkle root meta {any} is + \\missing from blockstore. This should only happen in extreme cases where + \\blockstore cleanup has caught up to the root. Skipping the merkle root + \\consistency check + ), .{ shred_id, merkle_root_meta }); + return true; + } } return false; } - return true; -} + /// Returns true if there is no chaining conflict between + /// the `shred` and `merkle_root_meta` of the next FEC set, + /// or if shreds from the next set are yet to be received. + /// + /// Otherwise return false and add duplicate proof to + /// `duplicate_shreds`. + /// + /// This is intended to be used right after `shred`'s `erasure_meta` + /// has been created for the first time. 
+ /// + /// agave: check_forward_chained_merkle_root_consistency + pub fn checkForwardChaining( + self: Self, + shred: CodeShred, + erasure_meta: ErasureMeta, + merkle_root_metas: MerkleRootMetaWorkingStore, + ) !bool { + std.debug.assert(erasure_meta.checkCodeShred(shred)); + const slot = shred.common.slot; -/// Returns true if there is no chaining conflict between -/// the `shred` and `merkle_root_meta` of the previous FEC set, -/// or if shreds from the previous set are yet to be received. -/// -/// Otherwise return false and add duplicate proof to -/// `duplicate_shreds`. -/// -/// This is intended to be used right after `shred`'s `merkle_root_meta` -/// has been created for the first time. -/// -/// agave: check_backwards_chained_merkle_root_consistency -pub fn checkBackwardsChainedMerkleRootConsistency( - allocator: Allocator, - logger: Logger, - db: *BlockstoreDB, - shred: Shred, - shred_store: WorkingShredStore, - erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), // BTreeMap in agave - duplicate_shreds: *std.ArrayList(PossibleDuplicateShred), -) !bool { - const slot = shred.commonHeader().slot; - const erasure_set_id = shred.commonHeader().erasureSetId(); - const erasure_set_index = shred.commonHeader().erasure_set_index; + // If a shred from the next fec set has already been inserted, check the chaining + const next_erasure_set_index = if (erasure_meta.nextErasureSetIndex()) |n| n else { + self.logger.err().logf( + "Invalid erasure meta, unable to compute next fec set index {any}", + .{erasure_meta}, + ); + return false; + }; + const next_erasure_set = + ErasureSetId{ .slot = slot, .erasure_set_index = next_erasure_set_index }; + const next_merkle_root_meta = try merkle_root_metas.get(next_erasure_set) orelse { + // No shred from the next fec set has been received + return true; + }; - if (erasure_set_index == 0) { - // Although the first fec set chains to the last fec set of the parent block, - // if this chain is incorrect we do not know which block is the duplicate until votes - // are received. We instead delay this check until the block reaches duplicate - // confirmation. - return true; + const next_shred_id = ShredId{ + .slot = slot, + .index = next_merkle_root_meta.first_received_shred_index, + .shred_type = next_merkle_root_meta.first_received_shred_type, + }; + + return self.checkAndReportChaining(.forward, .{ .code = shred }, next_shred_id); } - // If a shred from the previous fec set has already been inserted, check the chaining. - // Since we cannot compute the previous fec set index, we check the in memory map, otherwise - // check the previous key from blockstore to see if it is consecutive with our current set. - const prev_erasure_set, const prev_erasure_meta = - if (try previousErasureSet(allocator, db, erasure_set_id, erasure_metas)) |pes| - pes - else - // No shreds from the previous erasure batch have been received, - // so nothing to check. Once the previous erasure batch is received, - // we will verify this chain through the forward check above. - return true; + /// Returns true if there is no chaining conflict between + /// the `shred` and `merkle_root_meta` of the previous FEC set, + /// or if shreds from the previous set are yet to be received. + /// + /// Otherwise return false and add duplicate proof to + /// `duplicate_shreds`. + /// + /// This is intended to be used right after `shred`'s `merkle_root_meta` + /// has been created for the first time. 
+ /// + /// agave: check_backwards_chained_merkle_root_consistency + pub fn checkBackwardChaining( + self: Self, + shred: Shred, + erasure_metas: ErasureMetaWorkingStore, + ) !bool { + const slot = shred.commonHeader().slot; + const erasure_set_id = shred.commonHeader().erasureSetId(); + const erasure_set_index = shred.commonHeader().erasure_set_index; - const prev_shred_id = ShredId{ - .slot = slot, - .index = @intCast(prev_erasure_meta.first_received_code_index), - .shred_type = .code, - }; - const prev_shred = - if (try shred_store.get(prev_shred_id)) |ps| ps else { - logger.warn().logf(&newlinesToSpaces( - \\Shred {any} indicated by the erasure meta {any} \ - \\is missing from blockstore. This can happen if you have recently upgraded \ - \\from a version < v1.18.13, or if blockstore cleanup has caught up to the root. \ - \\Skipping the backwards chained merkle root consistency check - ), .{ prev_shred_id, prev_erasure_meta }); - return true; - }; - const merkle_root = shred_mod.layout.getChainedMerkleRoot(prev_shred); - const chained_merkle_root = shred.chainedMerkleRoot() catch null; + if (erasure_set_index == 0) { + // Although the first fec set chains to the last fec set of the parent block, + // if this chain is incorrect we do not know which block is the duplicate until votes + // are received. We instead delay this check until the block reaches duplicate + // confirmation. + return true; + } - if (!checkChaining(merkle_root, chained_merkle_root)) { - logger.warn().logf(&newlinesToSpaces( - \\Received conflicting chained merkle roots for slot: {}, shred {any} type {any} \ - \\chains to merkle root {any}, however previous fec set code \ - \\shred {any} has merkle root {any}. Reporting as duplicate - ), .{ - slot, - shred.commonHeader().erasureSetId(), - shred.commonHeader().variant.shred_type, - chained_merkle_root, - prev_erasure_set, - merkle_root, - }); - } + // If a shred from the previous fec set has already been inserted, check the chaining. + // Since we cannot compute the previous fec set index, we check the in memory map, otherwise + // check the previous key from blockstore to see if it is consecutive with our current set. + _, const prev_erasure_meta = if (try erasure_metas.previousSet(erasure_set_id)) |pes| + pes + else + // No shreds from the previous erasure batch have been received, + // so nothing to check. Once the previous erasure batch is received, + // we will verify this chain through the forward check above. + return true; + + const prev_shred_id = ShredId{ + .slot = slot, + .index = @intCast(prev_erasure_meta.first_received_code_index), + .shred_type = .code, + }; - if (!try db.contains(schema.duplicate_slots, slot)) { - // TODO lifetime - try duplicate_shreds.append(.{ .ChainedMerkleRootConflict = .{ - .original = shred, - .conflict = prev_shred, - } }); + return self.checkAndReportChaining(.backward, shred, prev_shred_id); } - return true; -} + /// The input shreds must be from adjacent erasure sets in the same slot, + /// or this function will not work correctly. + fn checkAndReportChaining( + self: Self, + direction: enum { forward, backward }, + shred: Shred, + other_shred_id: ShredId, + ) !bool { + const other_shred = if (try self.shreds.get(other_shred_id)) |other_shred| + other_shred + else { + self.logger.warn().logf( + "Shred {any} is missing from blockstore. " ++ + "This can happen if blockstore cleanup has caught up to the root. 
" ++ + "Skipping the {} chained merkle root consistency check.", + .{ other_shred_id, direction }, + ); + return true; + }; -/// agave: previous_erasure_set -fn previousErasureSet( - allocator: Allocator, - db: *BlockstoreDB, - erasure_set: ErasureSetId, - erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), -) !?struct { ErasureSetId, ErasureMeta } { // TODO: agave uses CoW here - const slot = erasure_set.slot; - const erasure_set_index = erasure_set.erasure_set_index; + const older_shred, const newer_shred = switch (direction) { + .forward => .{ shred.payload(), other_shred }, + .backward => .{ other_shred, shred.payload() }, + }; - // Check the previous entry from the in memory map to see if it is the consecutive - // set to `erasure set` - const id_range, const meta_range = erasure_metas.range( - .{ .slot = slot, .erasure_set_index = 0 }, - erasure_set, - ); - if (id_range.len != 0) { - const i = id_range.len - 1; - const last_meta = meta_range[i].asRef(); - if (@as(u32, @intCast(erasure_set_index)) == last_meta.nextErasureSetIndex()) { - return .{ id_range[i], last_meta.* }; + const chained_merkle_root = shred_mod.layout.getChainedMerkleRoot(newer_shred); + const early_merkle_root = shred_mod.layout.merkleRoot(older_shred) orelse { + return error.NoMerkleRoot; + }; + + if (chainedMerkleRootIsConsistent(early_merkle_root, chained_merkle_root)) { + return true; } - } - // Consecutive set was not found in memory, scan blockstore for a potential candidate - var iter = try db.iterator(schema.erasure_meta, .reverse, erasure_set); - defer iter.deinit(); - const candidate_set: ErasureSetId, // - const candidate: ErasureMeta // - = while (try iter.nextBytes()) |entry| { - defer for (entry) |e| e.deinit(); - const key = try key_serializer.deserialize(ErasureSetId, allocator, entry[0].data); - if (key.slot != slot) return null; - if (key.erasure_set_index != erasure_set_index) break .{ - key, - try value_serializer.deserialize(ErasureMeta, allocator, entry[1].data), - }; - } else return null; + const slot = other_shred_id.slot; - // Check if this is actually the consecutive erasure set - const next = if (candidate.nextErasureSetIndex()) |n| n else return error.InvalidErasureConfig; - return if (next == erasure_set_index) - .{ candidate_set, candidate } - else - return null; -} + self.logger.warn().logf( + "Received conflicting chained merkle roots for slot: {}. Reporting as duplicate. 
" ++ + \\Conflicting shreds: + \\ erasure set: {?}, type: {?}, index: {?}, merkle root: {any} + \\ erasure set: {?}, type: {?}, index: {?}, chained merkle root: {any} + , + .{ + slot, + // early shred + shred_mod.layout.getErasureSetIndex(older_shred), + if (shred_mod.layout.getShredVariant(older_shred)) |v| v.shred_type else null, + shred_mod.layout.getIndex(older_shred), + early_merkle_root, + // late shred + shred_mod.layout.getErasureSetIndex(newer_shred), + if (shred_mod.layout.getShredVariant(newer_shred)) |v| v.shred_type else null, + shred_mod.layout.getIndex(newer_shred), + chained_merkle_root, + }, + ); + + if (!try self.duplicate_shreds.contains(slot)) { + try self.duplicate_shreds.append(.{ + .ChainedMerkleRootConflict = .{ .original = shred, .conflict = other_shred }, + }); + } + + return false; + } +}; /// agave: check_chaining -fn checkChaining( - merkle_root: ?Hash, - chained_merkle_root: ?Hash, -) bool { +fn chainedMerkleRootIsConsistent(merkle_root: Hash, chained_merkle_root: ?Hash) bool { return chained_merkle_root == null or // Chained merkle roots have not been enabled yet sig.utils.types.eql(chained_merkle_root, merkle_root); } diff --git a/src/ledger/shred_inserter/shred_inserter.zig b/src/ledger/shred_inserter/shred_inserter.zig index 1d3aed439..8d427aabb 100644 --- a/src/ledger/shred_inserter/shred_inserter.zig +++ b/src/ledger/shred_inserter/shred_inserter.zig @@ -30,10 +30,11 @@ const Timer = sig.time.Timer; const BlockstoreDB = ledger.blockstore.BlockstoreDB; const IndexMetaWorkingSetEntry = lib.working_state.IndexMetaWorkingSetEntry; +const MerkleRootValidator = lib.merkle_root_checks.MerkleRootValidator; const PendingInsertShredsState = lib.working_state.PendingInsertShredsState; const PossibleDuplicateShred = lib.working_state.PossibleDuplicateShred; const WorkingEntry = lib.working_state.WorkingEntry; -const WorkingShredStore = lib.working_state.WorkingShredStore; +const ShredWorkingStore = lib.working_state.ShredWorkingStore; const WriteBatch = BlockstoreDB.WriteBatch; const ErasureMeta = meta.ErasureMeta; @@ -42,11 +43,6 @@ const MerkleRootMeta = meta.MerkleRootMeta; const ShredIndex = meta.ShredIndex; const SlotMeta = meta.SlotMeta; -const checkForwardChainedMerkleRootConsistency = - lib.merkle_root_checks.checkForwardChainedMerkleRootConsistency; -const checkBackwardsChainedMerkleRootConsistency = - lib.merkle_root_checks.checkBackwardsChainedMerkleRootConsistency; -const checkMerkleRootConsistency = lib.merkle_root_checks.checkMerkleRootConsistency; const handleChaining = lib.slot_chaining.handleChaining; const recover = lib.recovery.recover; const newlinesToSpaces = sig.utils.fmt.newlinesToSpaces; @@ -175,6 +171,7 @@ pub const ShredInserter = struct { ); defer state.deinit(); var write_batch = state.write_batch; + const merkle_root_validator = MerkleRootValidator.init(&state); var get_lock_timer = try Timer.start(); self.lock.lock(); @@ -194,6 +191,7 @@ pub const ShredInserter = struct { if (self.checkInsertDataShred( data_shred, &state, + merkle_root_validator, &write_batch, is_trusted, leader_schedule, @@ -224,6 +222,7 @@ pub const ShredInserter = struct { _ = try self.checkInsertCodeShred( code_shred, &state, + merkle_root_validator, &write_batch, is_trusted, shred_source, @@ -245,7 +244,7 @@ pub const ShredInserter = struct { const recovered_shreds = try self.tryShredRecovery( &state.erasure_metas, &state.index_working_set, - state.shredStore(), + state.shreds(), &reed_solomon_cache, ); defer { @@ -277,6 +276,7 @@ pub const ShredInserter = struct { 
if (self.checkInsertDataShred( shred.data, &state, + merkle_root_validator, &write_batch, is_trusted, leader_schedule, @@ -337,15 +337,10 @@ pub const ShredInserter = struct { // unreachable: Erasure meta was just created, initial shred must exist const shred = state.just_inserted_shreds.get(shred_id) orelse unreachable; // TODO: agave discards the result here. should we also? - _ = try checkForwardChainedMerkleRootConsistency( - allocator, - self.logger, - &self.db, + _ = try merkle_root_validator.checkForwardChaining( shred.code, erasure_meta, - state.shredStore(), - &state.merkle_root_metas, - &state.duplicate_shreds, + state.merkleRootMetas(), ); }; @@ -371,15 +366,7 @@ pub const ShredInserter = struct { // unreachable: Merkle root meta was just created, initial shred must exist const shred = state.just_inserted_shreds.get(shred_id) orelse unreachable; // TODO: agave discards the result here. should we also? - _ = try checkBackwardsChainedMerkleRootConsistency( - allocator, - self.logger, - &self.db, - shred, - state.shredStore(), - &state.erasure_metas, - &state.duplicate_shreds, - ); + _ = try merkle_root_validator.checkBackwardChaining(shred, state.erasureMetas()); } self.metrics.merkle_chaining_elapsed_us.add(merkle_chaining_timer.read().asMicros()); @@ -404,6 +391,7 @@ pub const ShredInserter = struct { self: *Self, shred: CodeShred, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, write_batch: *WriteBatch, is_trusted: bool, shred_source: ShredSource, @@ -414,9 +402,15 @@ pub const ShredInserter = struct { const index_meta = &index_meta_working_set_entry.index; const erasure_set_id = shred.common.erasureSetId(); - try state.loadMerkleRootMeta(erasure_set_id); + try state.merkleRootMetas().load(erasure_set_id); - if (!try self.shouldInsertCodeShred(state, shred, index_meta, is_trusted)) { + if (!try self.shouldInsertCodeShred( + state, + merkle_root_validator, + shred, + index_meta, + is_trusted, + )) { return false; } @@ -430,7 +424,7 @@ pub const ShredInserter = struct { if (was_inserted) { index_meta_working_set_entry.did_insert_occur = true; self.metrics.num_inserted.inc(); - try state.initMerkleRootMetaIfMissing(erasure_set_id, shred); + try state.merkleRootMetas().initIfMissing(erasure_set_id, shred); } // NOTE: it's not accurate to say the shred was "just inserted" if was_inserted is false, @@ -449,6 +443,7 @@ pub const ShredInserter = struct { fn shouldInsertCodeShred( self: *Self, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, shred: CodeShred, index_meta: *const Index, is_trusted: bool, @@ -479,14 +474,10 @@ pub const ShredInserter = struct { // A previous shred has been inserted in this batch or in blockstore // Compare our current shred against the previous shred for potential // conflicts - if (!try checkMerkleRootConsistency( - self.logger, - &self.db, - state.shredStore(), + if (!try merkle_root_validator.checkConsistency( shred.common.slot, merkle_root_meta.asRef(), &.{ .code = shred }, - &state.duplicate_shreds, )) { return false; } @@ -498,7 +489,7 @@ pub const ShredInserter = struct { // NOTE perf: maybe this can be skipped for trusted shreds. // agave runs this regardless of trust, but we can check if it has // a meaningful performance impact to skip this for trusted shreds. 
- const erasure_meta = try state.getOrPutErasureMeta(erasure_set_id, shred); + const erasure_meta = try state.erasureMetas().getOrPut(erasure_set_id, shred); if (!erasure_meta.checkCodeShred(shred)) { self.metrics.num_code_shreds_invalid_erasure_config.inc(); try self.recordShredConflict(state, shred, erasure_meta); @@ -523,7 +514,7 @@ pub const ShredInserter = struct { // erasure set, but not this one. is it worth persisting this one as well? if (!try self.hasDuplicateShredsInSlot(slot)) { if (try findConflictingCodeShred( - state.shredStore(), + state.shreds(), shred, slot, erasure_meta, @@ -571,7 +562,7 @@ pub const ShredInserter = struct { /// agave: find_conflicting_coding_shred fn findConflictingCodeShred( - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, _: CodeShred, // TODO: figure out why this is here. delete it or add what is missing. slot: Slot, erasure_meta: *const ErasureMeta, @@ -594,6 +585,7 @@ pub const ShredInserter = struct { self: *Self, shred: DataShred, state: *PendingInsertShredsState, + merkle_root_validator: MerkleRootValidator, write_batch: *WriteBatch, is_trusted: bool, leader_schedule: ?SlotLeaderProvider, @@ -609,7 +601,7 @@ pub const ShredInserter = struct { const slot_meta = &slot_meta_entry.new_slot_meta; const erasure_set_id = shred.common.erasureSetId(); - try state.loadMerkleRootMeta(erasure_set_id); + try state.merkleRootMetas().load(erasure_set_id); if (!is_trusted) { if (isDataShredPresent(shred, slot_meta, &index_meta.data_index)) { @@ -640,7 +632,7 @@ pub const ShredInserter = struct { if (!try self.shouldInsertDataShred( shred, slot_meta, - state.shredStore(), + state.shreds(), self.max_root.load(.acquire), leader_schedule, shred_source, @@ -653,14 +645,10 @@ pub const ShredInserter = struct { // A previous shred has been inserted in this batch or in blockstore // Compare our current shred against the previous shred for potential // conflicts - if (!try checkMerkleRootConsistency( - self.logger, - &self.db, - state.shredStore(), + if (!try merkle_root_validator.checkConsistency( slot, merkle_root_meta.asRef(), &shred_union, - &state.duplicate_shreds, )) { return error.InvalidShred; } @@ -674,12 +662,12 @@ pub const ShredInserter = struct { write_batch, shred_source, ); - try state.initMerkleRootMetaIfMissing(erasure_set_id, shred); + try state.merkleRootMetas().initIfMissing(erasure_set_id, shred); try state.just_inserted_shreds.put(shred.id(), shred_union); // TODO check first? 
index_meta_working_set_entry.did_insert_occur = true; slot_meta_entry.did_insert_occur = true; - try state.loadErasureMeta(erasure_set_id); + try state.erasureMetas().load(erasure_set_id); return newly_completed_data_sets; } @@ -716,7 +704,7 @@ pub const ShredInserter = struct { self: *Self, shred: DataShred, slot_meta: *const SlotMeta, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, max_root: Slot, leader_schedule: ?SlotLeaderProvider, shred_source: ShredSource, @@ -853,7 +841,7 @@ pub const ShredInserter = struct { self: *Self, erasure_metas: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), index_working_set: *AutoHashMap(u64, IndexMetaWorkingSetEntry), - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, reed_solomon_cache: *ReedSolomonCache, ) !ArrayList(Shred) { // Recovery rules: @@ -892,7 +880,7 @@ pub const ShredInserter = struct { self: *Self, index: *const Index, erasure_meta: *const ErasureMeta, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, reed_solomon_cache: *ReedSolomonCache, ) !std.ArrayList(Shred) { var available_shreds = ArrayList(Shred).init(self.allocator); @@ -938,7 +926,7 @@ pub const ShredInserter = struct { index: *const ShredIndex, slot: Slot, shred_indices: [2]u64, - shred_store: WorkingShredStore, + shred_store: ShredWorkingStore, available_shreds: *ArrayList(Shred), ) !void { for (shred_indices[0]..shred_indices[1]) |i| { @@ -1193,6 +1181,7 @@ const ShredInserterTestState = struct { return try self.inserter.checkInsertCodeShred( shred.code, state, + MerkleRootValidator.init(state), write_batch, false, .turbine, diff --git a/src/ledger/shred_inserter/working_state.zig b/src/ledger/shred_inserter/working_state.zig index ee69a9247..46dad1b10 100644 --- a/src/ledger/shred_inserter/working_state.zig +++ b/src/ledger/shred_inserter/working_state.zig @@ -121,24 +121,6 @@ pub const PendingInsertShredsState = struct { self.write_batch.deinit(); } - pub fn getOrPutErasureMeta( - self: *Self, - erasure_set_id: ErasureSetId, - code_shred: CodeShred, - ) !*const ErasureMeta { - const erasure_meta_entry = try self.erasure_metas.getOrPut(erasure_set_id); - if (!erasure_meta_entry.found_existing) { - if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { - erasure_meta_entry.value_ptr.* = .{ .clean = meta_ }; - } else { - erasure_meta_entry.value_ptr.* = .{ - .dirty = ErasureMeta.fromCodeShred(code_shred) orelse return error.Unwrap, - }; - } - } - return erasure_meta_entry.value_ptr.asRef(); - } - /// agave: get_index_meta_entry pub fn getIndexMetaEntry(self: *Self, slot: Slot) !*IndexMetaWorkingSetEntry { var timer = try Timer.start(); @@ -185,7 +167,7 @@ pub const PendingInsertShredsState = struct { return entry.value_ptr; } - pub fn shredStore(self: *Self) WorkingShredStore { + pub fn shreds(self: *Self) ShredWorkingStore { return .{ .logger = self.logger, .db = self.db, @@ -193,36 +175,24 @@ pub const PendingInsertShredsState = struct { }; } - // TODO: should this actually be called externally? 
- // consider moving this logic into a getOrPut-style method - pub fn loadErasureMeta(self: *Self, erasure_set_id: ErasureSetId) !void { - if (!self.erasure_metas.contains(erasure_set_id)) { - if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { - try self.erasure_metas.put(erasure_set_id, .{ .clean = meta_ }); - } - } + pub fn erasureMetas(self: *Self) ErasureMetaWorkingStore { + return .{ + .allocator = self.allocator, + .db = self.db, + .working_entries = &self.erasure_metas, + }; } - // TODO: should this actually be called externally? - // consider moving this logic into a getOrPut-style method - pub fn loadMerkleRootMeta(self: *Self, erasure_set_id: ErasureSetId) !void { - if (!self.merkle_root_metas.contains(erasure_set_id)) { - if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { - try self.merkle_root_metas.put(erasure_set_id, .{ .clean = meta_ }); - } - } + pub fn merkleRootMetas(self: *Self) MerkleRootMetaWorkingStore { + return .{ + .allocator = self.allocator, + .db = self.db, + .working_entries = &self.merkle_root_metas, + }; } - // TODO: should this actually be called externally? - pub fn initMerkleRootMetaIfMissing( - self: *Self, - erasure_set_id: ErasureSetId, - shred: anytype, - ) !void { - const entry = try self.merkle_root_metas.getOrPut(erasure_set_id); - if (!entry.found_existing) { - entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) }; - } + pub fn duplicateShreds(self: *Self) DuplicateShredsWorkingStore { + return .{ .db = self.db, .duplicate_shreds = &self.duplicate_shreds }; } pub fn commit(self: *Self) !void { @@ -316,6 +286,144 @@ pub const PendingInsertShredsState = struct { } }; +pub const MerkleRootMetaWorkingStore = struct { + allocator: Allocator, + db: *BlockstoreDB, + working_entries: *AutoHashMap(ErasureSetId, WorkingEntry(MerkleRootMeta)), + + const Self = @This(); + + pub fn get(self: Self, id: ErasureSetId) !?MerkleRootMeta { + return if (self.working_entries.get(id)) |nes| + nes.asRef().* + else + try self.db.get(self.allocator, schema.merkle_root_meta, id); + } + + // TODO: should this actually be called externally? + // consider moving this logic into a getOrPut-style method + pub fn load(self: Self, erasure_set_id: ErasureSetId) !void { + if (!self.working_entries.contains(erasure_set_id)) { + if (try self.db.get(self.allocator, schema.merkle_root_meta, erasure_set_id)) |meta_| { + try self.working_entries.put(erasure_set_id, .{ .clean = meta_ }); + } + } + } + + // TODO: should this actually be called externally? 
+ pub fn initIfMissing(self: Self, erasure_set_id: ErasureSetId, shred: anytype) !void { + const entry = try self.working_entries.getOrPut(erasure_set_id); + if (!entry.found_existing) { + entry.value_ptr.* = .{ .dirty = MerkleRootMeta.fromFirstReceivedShred(shred) }; + } + } +}; + +pub const ErasureMetaWorkingStore = struct { + allocator: Allocator, + db: *BlockstoreDB, + working_entries: *SortedMap(ErasureSetId, WorkingEntry(ErasureMeta)), + + const Self = @This(); + + pub fn get(self: Self, id: ErasureSetId) !?ErasureMeta { + return if (self.working_entries.get(id)) |nes| + nes.asRef().* + else if (try self.db.get(self.allocator, schema.erasure_meta, id)) |nes| + nes + else + null; + } + + pub fn getOrPut( + self: Self, + erasure_set_id: ErasureSetId, + code_shred: CodeShred, + ) !*const ErasureMeta { + const erasure_meta_entry = try self.working_entries.getOrPut(erasure_set_id); + if (!erasure_meta_entry.found_existing) { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { + erasure_meta_entry.value_ptr.* = .{ .clean = meta_ }; + } else { + erasure_meta_entry.value_ptr.* = .{ + .dirty = ErasureMeta.fromCodeShred(code_shred) orelse return error.Unwrap, + }; + } + } + return erasure_meta_entry.value_ptr.asRef(); + } + + // TODO: should this actually be called externally? + // consider moving this logic into a getOrPut-style method + pub fn load(self: Self, erasure_set_id: ErasureSetId) !void { + if (!self.working_entries.contains(erasure_set_id)) { + if (try self.db.get(self.allocator, schema.erasure_meta, erasure_set_id)) |meta_| { + try self.working_entries.put(erasure_set_id, .{ .clean = meta_ }); + } + } + } + + /// agave: previous_erasure_set + pub fn previousSet( + self: Self, + erasure_set: ErasureSetId, + ) !?struct { ErasureSetId, ErasureMeta } { // TODO: agave uses CoW here + const slot = erasure_set.slot; + const erasure_set_index = erasure_set.erasure_set_index; + + // Check the previous entry from the in memory map to see if it is the consecutive + // set to `erasure set` + const id_range, const meta_range = self.working_entries.range( + .{ .slot = slot, .erasure_set_index = 0 }, + erasure_set, + ); + if (id_range.len != 0) { + const i = id_range.len - 1; + const last_meta = meta_range[i].asRef(); + if (@as(u32, @intCast(erasure_set_index)) == last_meta.nextErasureSetIndex()) { + return .{ id_range[i], last_meta.* }; + } + } + + // Consecutive set was not found in memory, scan blockstore for a potential candidate + const key_serializer = ledger.database.key_serializer; + const value_serializer = ledger.database.value_serializer; + var iter = try self.db.iterator(schema.erasure_meta, .reverse, erasure_set); + defer iter.deinit(); + const candidate_set: ErasureSetId, // + const candidate: ErasureMeta // + = while (try iter.nextBytes()) |entry| { + defer for (entry) |e| e.deinit(); + const key = try key_serializer.deserialize(ErasureSetId, self.allocator, entry[0].data); + if (key.slot != slot) return null; + if (key.erasure_set_index != erasure_set_index) break .{ + key, + try value_serializer.deserialize(ErasureMeta, self.allocator, entry[1].data), + }; + } else return null; + + // Check if this is actually the consecutive erasure set + const next = if (candidate.nextErasureSetIndex()) |n| n else return error.InvalidErasureConfig; + return if (next == erasure_set_index) + .{ candidate_set, candidate } + else + return null; + } +}; + +pub const DuplicateShredsWorkingStore = struct { + db: *BlockstoreDB, + duplicate_shreds: 
*std.ArrayList(PossibleDuplicateShred), + + const Self = DuplicateShredsWorkingStore; + + pub fn contains(self: Self, slot: Slot) !bool { + return try self.db.contains(schema.duplicate_slots, slot); + } + + pub fn append(self: Self, dupe: PossibleDuplicateShred) !void { + try self.duplicate_shreds.append(dupe); + } +}; + pub fn WorkingEntry(comptime T: type) type { return union(enum) { // Value has been modified with respect to the blockstore column @@ -380,7 +488,7 @@ const ShredConflict = struct { conflict: []const u8, }; -pub const WorkingShredStore = struct { +pub const ShredWorkingStore = struct { logger: sig.trace.Logger, db: *BlockstoreDB, just_inserted_shreds: *const AutoHashMap(ShredId, Shred), diff --git a/src/shred_collector/shred_receiver.zig b/src/shred_collector/shred_receiver.zig index 5cc1f89e4..43876c56b 100644 --- a/src/shred_collector/shred_receiver.zig +++ b/src/shred_collector/shred_receiver.zig @@ -193,7 +193,7 @@ fn validateShred( // https://github.com/solana-labs/solana/pull/35076 _ = layout.getLeaderSignature(shred) orelse return error.signature_missing; - _ = layout.getSignedData(shred) orelse return error.signed_data_missing; + _ = layout.merkleRoot(shred) orelse return error.signed_data_missing; } /// TODO: this may need to move to blockstore diff --git a/src/shred_collector/shred_verifier.zig b/src/shred_collector/shred_verifier.zig index 0aaad2b9d..dd2255893 100644 --- a/src/shred_collector/shred_verifier.zig +++ b/src/shred_collector/shred_verifier.zig @@ -51,7 +51,7 @@ fn verifyShred( const shred = shred_layout.getShred(packet) orelse return error.insufficient_shred_size; const slot = shred_layout.getSlot(shred) orelse return error.slot_missing; const signature = shred_layout.getLeaderSignature(shred) orelse return error.signature_missing; - const signed_data = shred_layout.getSignedData(shred) orelse return error.signed_data_missing; + const signed_data = shred_layout.merkleRoot(shred) orelse return error.signed_data_missing; const leader = leader_schedule.call(slot) orelse return error.leader_unknown; _ = signature.verify(leader, &signed_data.data) or return error.failed_verification;