Skip to content
This repository has been archived by the owner on Jan 13, 2025. It is now read-only.

Commit

Permalink
gossip: process duplicate proofs for merkle root conflicts (#34066)
Browse files Browse the repository at this point in the history
* gossip: process duplicate proofs for merkle root conflicts

* pr comments + abi

(cherry picked from commit ca6ab08)

# Conflicts:
#	gossip/src/cluster_info.rs
  • Loading branch information
AshwinSekar committed Dec 1, 2023

Verified

This commit was signed with the committer’s verified signature.
vkubiv Volodymyr Kubiv
1 parent da2dadd commit f4f50f1
Showing 2 changed files with 210 additions and 23 deletions.
4 changes: 4 additions & 0 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
@@ -272,7 +272,11 @@ pub fn make_accounts_hashes_message(
pub(crate) type Ping = ping_pong::Ping<[u8; GOSSIP_PING_TOKEN_SIZE]>;

// TODO These messages should go through the gpu pipeline for spam filtering
<<<<<<< HEAD
#[frozen_abi(digest = "EnbW8mYTsPMndq9NkHLTkHJgduXvWSfSD6bBdmqQ8TiF")]
=======
#[frozen_abi(digest = "FW5Ycg6GXPsY5Ek9b2VjP69toxRb95bSNQRRWLSdKv2Y")]
>>>>>>> ca6ab08555 (gossip: process duplicate proofs for merkle root conflicts (#34066))
#[derive(Serialize, Deserialize, Debug, AbiEnumVisitor, AbiExample)]
#[allow(clippy::large_enum_variant)]
pub(crate) enum Protocol {
229 changes: 206 additions & 23 deletions gossip/src/duplicate_shred.rs
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@ pub struct DuplicateShred {
pub(crate) wallclock: u64,
pub(crate) slot: Slot,
_unused: u32,
shred_type: ShredType,
_unused_shred_type: ShredType,
// Serialized DuplicateSlotProof split into chunks.
num_chunks: u8,
chunk_index: u8,
@@ -90,8 +90,8 @@ pub enum Error {

/// Check that `shred1` and `shred2` indicate a valid duplicate proof
/// - Must be for the same slot
/// - Must have the same `shred_type`
/// - Must both sigverify for the correct leader
/// - Must have a merkle root conflict, otherwise `shred1` and `shred2` must have the same `shred_type`
/// - If `shred1` and `shred2` share the same index they must be not equal
/// - If `shred1` and `shred2` do not share the same index and are data shreds
/// verify that they indicate an index conflict. One of them must be the
@@ -106,10 +106,6 @@ where
return Err(Error::SlotMismatch);
}

if shred1.shred_type() != shred2.shred_type() {
return Err(Error::ShredTypeMismatch);
}

if let Some(leader_schedule) = leader_schedule {
let slot_leader =
leader_schedule(shred1.slot()).ok_or(Error::UnknownSlotLeader(shred1.slot()))?;
@@ -118,6 +114,20 @@ where
}
}

// Merkle root conflict check
if shred1.fec_set_index() == shred2.fec_set_index()
&& shred1.merkle_root().ok() != shred2.merkle_root().ok()
{
// This catches a mixture of legacy and merkle shreds
// as well as merkle shreds with different roots in the
// same fec set
return Ok(());
}

if shred1.shred_type() != shred2.shred_type() {
return Err(Error::ShredTypeMismatch);
}

if shred1.index() == shred2.index() {
if shred1.payload() != shred2.payload() {
return Ok(());
@@ -164,7 +174,7 @@ where
}
let other_shred = Shred::new_from_serialized_shred(other_payload)?;
check_shreds(leader_schedule, &shred, &other_shred)?;
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
@@ -184,27 +194,21 @@ where
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}

// Returns a predicate checking if a duplicate-shred chunk matches
// (slot, shred_type) and has valid chunk_index.
fn check_chunk(
slot: Slot,
shred_type: ShredType,
num_chunks: u8,
) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
// the slot and has valid chunk_index.
fn check_chunk(slot: Slot, num_chunks: u8) -> impl Fn(&DuplicateShred) -> Result<(), Error> {
move |dup| {
if dup.slot != slot {
Err(Error::SlotMismatch)
} else if dup.shred_type != shred_type {
Err(Error::ShredTypeMismatch)
} else if dup.num_chunks != num_chunks {
Err(Error::NumChunksMismatch)
} else if dup.chunk_index >= num_chunks {
@@ -226,13 +230,12 @@ pub(crate) fn into_shreds(
let mut chunks = chunks.into_iter();
let DuplicateShred {
slot,
shred_type,
num_chunks,
chunk_index,
chunk,
..
} = chunks.next().ok_or(Error::InvalidDuplicateShreds)?;
let check_chunk = check_chunk(slot, shred_type, num_chunks);
let check_chunk = check_chunk(slot, num_chunks);
let mut data = HashMap::new();
data.insert(chunk_index, chunk);
for chunk in chunks {
@@ -260,8 +263,6 @@ pub(crate) fn into_shreds(
let shred2 = Shred::new_from_serialized_shred(proof.shred2)?;
if shred1.slot() != slot || shred2.slot() != slot {
Err(Error::SlotMismatch)
} else if shred1.shred_type() != shred_type || shred2.shred_type() != shred_type {
Err(Error::ShredTypeMismatch)
} else {
check_shreds(Some(|_| Some(slot_leader).copied()), &shred1, &shred2)?;
Ok((shred1, shred2))
@@ -300,7 +301,7 @@ pub(crate) mod tests {
from: Pubkey::new_unique(),
wallclock: u64::MAX,
slot: Slot::MAX,
shred_type: ShredType::Data,
_unused_shred_type: ShredType::Data,
num_chunks: u8::MAX,
chunk_index: u8::MAX,
chunk: Vec::default(),
@@ -421,7 +422,7 @@ pub(crate) mod tests {
wallclock: u64,
max_size: usize, // Maximum serialized size of each DuplicateShred.
) -> Result<impl Iterator<Item = DuplicateShred>, Error> {
let (slot, shred_type) = (shred.slot(), shred.shred_type());
let slot = shred.slot();
let proof = DuplicateSlotProof {
shred1: shred.into_payload(),
shred2: other_shred.into_payload(),
@@ -437,11 +438,11 @@ pub(crate) mod tests {
from: self_pubkey,
wallclock,
slot,
shred_type,
num_chunks,
chunk_index: i as u8,
chunk,
_unused: 0,
_unused_shred_type: ShredType::Code,
});
Ok(chunks)
}
@@ -949,4 +950,186 @@ pub(crate) mod tests {
);
}
}

#[test]
fn test_merkle_root_conflict_round_trip() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};

let (data_shreds, coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true, /* merkle_variant */
&shredder,
&leader,
false,
);

let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
false, /* merkle_variant */
&shredder,
&leader,
true,
);

let (diff_data_shreds, diff_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true, /* merkle_variant */
&shredder,
&leader,
false,
);

let test_cases = vec![
(data_shreds[0].clone(), diff_data_shreds[1].clone()),
(coding_shreds[0].clone(), diff_coding_shreds[1].clone()),
(data_shreds[0].clone(), diff_coding_shreds[0].clone()),
(coding_shreds[0].clone(), diff_data_shreds[0].clone()),
// Mix of legacy and merkle for same fec set
(legacy_coding_shreds[0].clone(), data_shreds[0].clone()),
(coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), coding_shreds[0].clone()),
(data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
let chunks: Vec<_> = from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);
let (shred3, shred4) = into_shreds(&leader.pubkey(), chunks).unwrap();
assert_eq!(shred1, shred3);
assert_eq!(shred2, shred4);
}
}

#[test]
fn test_merkle_root_conflict_invalid() {
let mut rng = rand::thread_rng();
let leader = Arc::new(Keypair::new());
let (slot, parent_slot, reference_tick, version) = (53084024, 53084023, 0, 0);
let shredder = Shredder::new(slot, parent_slot, reference_tick, version).unwrap();
let next_shred_index = rng.gen_range(0..31_000);
let leader_schedule = |s| {
if s == slot {
Some(leader.pubkey())
} else {
None
}
};

let (data_shreds, coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
true,
&shredder,
&leader,
true,
);

let (next_data_shreds, next_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index + 1,
next_shred_index + 1,
10,
true,
&shredder,
&leader,
true,
);

let (legacy_data_shreds, legacy_coding_shreds) = new_rand_shreds(
&mut rng,
next_shred_index,
next_shred_index,
10,
false,
&shredder,
&leader,
true,
);

let test_cases = vec![
// Same fec set same merkle root
(coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), coding_shreds[0].clone()),
// Different FEC set different merkle root
(coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), data_shreds[0].clone()),
(data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), coding_shreds[0].clone()),
// Legacy shreds
(
legacy_coding_shreds[0].clone(),
legacy_data_shreds[0].clone(),
),
(
legacy_data_shreds[0].clone(),
legacy_coding_shreds[0].clone(),
),
// Mix of legacy and merkle with different fec index
(legacy_coding_shreds[0].clone(), next_data_shreds[0].clone()),
(next_coding_shreds[0].clone(), legacy_data_shreds[0].clone()),
(legacy_data_shreds[0].clone(), next_coding_shreds[0].clone()),
(next_data_shreds[0].clone(), legacy_coding_shreds[0].clone()),
];
for (shred1, shred2) in test_cases.into_iter() {
assert_matches!(
from_shred(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.payload().clone(),
Some(leader_schedule),
rng.gen(), // wallclock
512, // max_size
)
.err()
.unwrap(),
Error::ShredTypeMismatch
);

let chunks: Vec<_> = from_shred_bypass_checks(
shred1.clone(),
Pubkey::new_unique(), // self_pubkey
shred2.clone(),
rng.gen(), // wallclock
512, // max_size
)
.unwrap()
.collect();
assert!(chunks.len() > 4);

assert_matches!(
into_shreds(&leader.pubkey(), chunks).err().unwrap(),
Error::ShredTypeMismatch
);
}
}
}

0 comments on commit f4f50f1

Please sign in to comment.