Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

gossip: process duplicate proofs for merkle root conflicts #34066

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
#[frozen_abi(digest = "HvA9JnnQrJnmkcGxrp8SmTB1b4iSyQ4VK2p6LpSBaoWR")]
#[frozen_abi(digest = "FW5Ycg6GXPsY5Ek9b2VjP69toxRb95bSNQRRWLSdKv2Y")]
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
Expand Down
229 changes: 206 additions & 23 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pub struct DuplicateShred {
pub(crate) wallclock: u64,
pub(crate) slot: Slot,
_unused: u32,
shred_type: ShredType,
_unused_shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
num_chunks: u8,
chunk_index: u8,
Expand Down Expand Up @@ -90,8 +90,8 @@ pub enum Error {

/// Check that `shred1` and `shred2` indicate a valid duplicate proof
/// - Must be for the same slot
/// - Must have the same `shred_type`
/// - Must both sigverify for the correct leader
/// - Must have a merkle root conflict, otherwise `shred1` and `shred2` must have the same `shred_type`
/// - If `shred1` and `shred2` share the same index they must be not equal
/// - If `shred1` and `shred2` do not share the same index and are data shreds
/// verify that they indicate an index conflict. One of them must be the
Expand All @@ -106,10 +106,6 @@ where
return Err(Error::SlotMismatch);
}

if shred1.shred_type() != shred2.shred_type() {
return Err(Error::ShredTypeMismatch);
}

if let Some(leader_schedule) = leader_schedule {
let slot_leader =
leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
Expand All @@ -118,6 +114,20 @@ where
}
}

// Merkle root conflict check
if shred1.fec_set_index() == shred2.fec_set_index()
&& shred1.merkle_root().ok() != shred2.merkle_root().ok()
{
// This catches a mixture of legacy and merkle shreds
// as well as merkle shreds with different roots in the
// same fec set
return Ok(());
}

if shred1.shred_type() != shred2.shred_type() {
return Err(Error::ShredTypeMismatch);
}

if shred1.index() == shred2.index() {
if shred1.payload() != shred2.payload() {
return Ok(());
Expand Down Expand Up @@ -164,7 +174,7 @@ where
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
Expand All @@ -184,27 +194,21 @@ where
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}

// Returns a predicate checking if a duplicate-shred chunk matches
// (slot, shred_type) and has valid chunk_index.
fn check_chunk(
slot: Slot,
shred_type: ShredType,
num_chunks: u8,
) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
// the slot and has valid chunk_index.
fn check_chunk(slot: Slot, num_chunks: u8) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
move |dup| {
if dup.slot != slot {
Err(Error::SlotMismatch)
} else if dup.shred_type != shred_type {
Err(Error::ShredTypeMismatch)
} else if dup.num_chunks != num_chunks {
Err(Error::NumChunksMismatch)
} else if dup.chunk_index >= num_chunks {
Expand All @@ -226,13 +230,12 @@ pub(crate) fn into_shreds(
let mut chunks = chunks.into_iter();
let DuplicateShred {
slot,
shred_type,
num_chunks,
chunk_index,
chunk,
..
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
let check_chunk = check_chunk(slot, shred_type, num_chunks);
let check_chunk = check_chunk(slot, num_chunks);
let mut data = HashMap::new();
data.insert(chunk_index, chunk);
for chunk in chunks {
Expand Down Expand Up @@ -260,8 +263,6 @@ pub(crate) fn into_shreds(
let shred2 = Shred::new_from_serialized_shred(proof.shred2)?;
if shred1.slot() != slot || shred2.slot() != slot {
Err(Error::SlotMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
} else {
check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?;
Ok((shred1, shred2))
Expand Down Expand Up @@ -300,7 +301,7 @@ pub(crate) mod tests {
from: Pubkey::new_unique(),
wallclock: u64::MAX,
slot: Slot::MAX,
shred_type: ShredType::Data,
_unused_shred_type: ShredType::Data,
num_chunks: u8::MAX,
chunk_index: u8::MAX,
chunk: Vec::default(),
Expand Down Expand Up @@ -421,7 +422,7 @@ pub(crate) mod tests {
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
Expand All @@ -437,11 +438,11 @@ pub(crate) mod tests {
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}
Expand Down Expand Up @@ -949,4 +950,186 @@ pub(crate) mod tests {
);
}
}

#[test]
fn test_merkle_root_conflict_round_trip() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};

let (data_shreds, coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true, /* merkle_variant */
&shredder,
&leader,
false,
);

let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
false, /* merkle_variant */
&shredder,
&leader,
true,
);

let (diff_data_shreds, diff_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true, /* merkle_variant */
&shredder,
&leader,
false,
);

let test_cases = vec![
(data_shreds[0].clone(), diff_data_shreds[1].clone()),
(coding_shreds[0].clone(), diff_coding_shreds[1].clone()),
(data_shreds[0].clone(), diff_coding_shreds[0].clone()),
(coding_shreds[0].clone(), diff_data_shreds[0].clone()),
// Mix of legacy and merkle for same fec set
(legacy_coding_shreds[0].clone(), data_shreds[0].clone()),
(coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), coding_shreds[0].clone()),
(data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
}

#[test]
fn test_merkle_root_conflict_invalid() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};

let (data_shreds, coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true,
&shredder,
&leader,
true,
);

let (next_data_shreds, next_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index + 1,
next_shred_index + 1,
10,
true,
&shredder,
&leader,
true,
);

let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
false,
&shredder,
&leader,
true,
);

let test_cases = vec![
// Same fec set same merkle root
(coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), coding_shreds[0].clone()),
// Different FEC set different merkle root
(coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), coding_shreds[0].clone()),
// Legacy shreds
(
legacy_coding_shreds[0].clone(),
legacy_data_shreds[0].clone(),
),
(
legacy_data_shreds[0].clone(),
legacy_coding_shreds[0].clone(),
),
// Mix of legacy and merkle with different fec index
(legacy_coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::ShredTypeMismatch
);

let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);

assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::ShredTypeMismatch
);
}
}
}