Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Candidate backing respects scheduled collator #1613

Merged
merged 3 commits into from
Aug 19, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
192 changes: 167 additions & 25 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use polkadot_primitives::v1::{
CommittedCandidateReceipt, BackedCandidate, Id as ParaId, ValidatorId,
ValidatorIndex, SigningContext, PoV,
CandidateDescriptor, AvailableData, ValidatorSignature, Hash, CandidateReceipt,
CandidateCommitments, CoreState, CoreIndex,
CandidateCommitments, CoreState, CoreIndex, CollatorId,
};
use polkadot_node_primitives::{
FromTableMisbehavior, Statement, SignedFullStatement, MisbehaviorReport,
Expand Down Expand Up @@ -91,8 +91,10 @@ struct CandidateBackingJob {
rx_to: mpsc::Receiver<ToJob>,
/// Outbound message channel sending part.
tx_from: mpsc::Sender<FromJob>,
/// The `ParaId`s assigned to this validator.
/// The `ParaId` assigned to this validator
assignment: ParaId,
/// The collator required to author the candidate, if any.
required_collator: Option<CollatorId>,
/// We issued `Valid` or `Invalid` statements on about these candidates.
issued_statements: HashSet<Hash>,
/// `Some(h)` if this job has already issues `Seconded` statemt for some candidate with `h` hash.
Expand Down Expand Up @@ -268,6 +270,14 @@ impl CandidateBackingJob {
candidate: &CandidateReceipt,
pov: PoV,
) -> Result<bool, Error> {
// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &candidate.descriptor().collator)
{
self.issue_candidate_invalid_message(candidate.clone()).await?;
return Ok(false);
}

let valid = self.request_candidate_validation(
candidate.descriptor().clone(),
Arc::new(pov.clone()),
Expand Down Expand Up @@ -479,6 +489,19 @@ impl CandidateBackingJob {
let expected_commitments = candidate.commitments.clone();

let descriptor = candidate.descriptor().clone();

// Check that candidate is collated by the right collator.
if self.required_collator.as_ref()
.map_or(false, |c| c != &descriptor.collator)
{
// If not, we've got the statement in the table but we will
// not issue validation work for it.
//
// Act as though we've issued a statement.
self.issued_statements.insert(candidate_hash);
return Ok(());
}

let pov = self.request_pov_from_distribution(descriptor.clone()).await?;
let v = self.request_candidate_validation(descriptor, pov.clone()).await?;

Expand Down Expand Up @@ -721,46 +744,61 @@ impl util::JobTrait for CandidateBackingJob {
let cores = try_runtime_api!(cores);

let signing_context = SigningContext { parent_hash: parent, session_index };
let validator = Validator::construct(&validators, signing_context, keystore.clone())?;
let validator = match Validator::construct(
&validators,
signing_context,
keystore.clone(),
) {
Ok(v) => v,
Err(util::Error::NotAValidator) => { return Ok(()) },
Err(e) => {
log::warn!(
target: "candidate_backing",
"Cannot participate in candidate backing: {:?}",
e
);

return Ok(())
}
};

let mut groups = HashMap::new();

let n_cores = cores.len();

let mut assignment = None;
for (idx, core) in cores.into_iter().enumerate() {
// Ignore prospective assignments on occupied cores for the time being.
if let CoreState::Scheduled(scheduled) = core {
let core_index = CoreIndex(idx as _);
let group_index = group_rotation_info.group_for_core(core_index, n_cores);
if let Some(g) = validator_groups.get(group_index.0 as usize) {
if g.contains(&validator.index()) {
assignment = Some((scheduled.para_id, scheduled.collator));
}
groups.insert(scheduled.para_id, g.clone());
}
}
}

let mut assignment = Default::default();

if let Some(idx) = validators.iter().position(|k| *k == validator.id()) {
let idx = idx as u32;
for (para_id, group) in groups.iter() {
if group.contains(&idx) {
assignment = *para_id;
break;
}
}
}

let table_context = TableContext {
groups,
validators,
signing_context: validator.signing_context().clone(),
validator: Some(validator),
};

let (assignment, required_collator) = match assignment {
None => return Ok(()), // no need to work.
Some((a, r)) => (a, r),
};

let job = CandidateBackingJob {
parent,
rx_to,
tx_from,
assignment,
required_collator,
issued_statements: HashSet::new(),
seconded: None,
reported_misbehavior_for: HashSet::new(),
Expand Down Expand Up @@ -829,7 +867,7 @@ mod tests {
use assert_matches::assert_matches;
use futures::{executor, future, Future};
use polkadot_primitives::v1::{
ScheduledCore, BlockData, CandidateCommitments, CollatorId,
ScheduledCore, BlockData, CandidateCommitments,
PersistedValidationData, ValidationData, TransientValidationData, HeadData,
ValidatorPair, ValidityAttestation, GroupRotationInfo,
};
Expand Down Expand Up @@ -1041,15 +1079,15 @@ mod tests {
}
);

// Check that subsystem job issues a request for the availability cores.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
// Check that subsystem job issues a request for the availability cores.
assert_matches!(
virtual_overseer.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(parent, RuntimeApiRequest::AvailabilityCores(tx))
) if parent == test_state.relay_parent => {
tx.send(Ok(test_state.availability_cores.clone())).unwrap();
}
);
}

// Test that a `CandidateBackingMessage::Second` issues validation work
Expand Down Expand Up @@ -1747,4 +1785,108 @@ mod tests {
assert_eq!(rx.await.unwrap().len(), 0);
});
}

// Test that a `CandidateBackingMessage::Second` issues validation work
// and in case validation is successful issues a `StatementDistributionMessage`.
#[test]
fn backing_doesnt_second_wrong_collator() {
let mut test_state = TestState::default();
test_state.availability_cores[0] = CoreState::Scheduled(ScheduledCore {
para_id: ParaId::from(1),
collator: Some(Sr25519Keyring::Bob.public().into()),
});

test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;

test_startup(&mut virtual_overseer, &test_state).await;

let pov = PoV {
block_data: BlockData(vec![42, 43, 44]),
};

let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

let pov_hash = pov.hash();
let candidate = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
head_data: expected_head_data.clone(),
erasure_root: make_erasure_root(&test_state, pov.clone()),
..Default::default()
}.build();

let second = CandidateBackingMessage::Second(
test_state.relay_parent,
candidate.to_plain(),
pov.clone(),
);

virtual_overseer.send(FromOverseer::Communication{ msg: second }).await;

assert_matches!(
virtual_overseer.recv().await,
AllMessages::CandidateSelection(
CandidateSelectionMessage::Invalid(parent, c)
) if parent == test_state.relay_parent && c == candidate.to_plain() => {
}
);

virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
});
}

#[test]
fn validation_work_ignores_wrong_collator() {
let mut test_state = TestState::default();
test_state.availability_cores[0] = CoreState::Scheduled(ScheduledCore {
para_id: ParaId::from(1),
collator: Some(Sr25519Keyring::Bob.public().into()),
});

test_harness(test_state.keystore.clone(), |test_harness| async move {
let TestHarness { mut virtual_overseer } = test_harness;

test_startup(&mut virtual_overseer, &test_state).await;

let pov = PoV {
block_data: BlockData(vec![1, 2, 3]),
};

let pov_hash = pov.hash();

let expected_head_data = test_state.head_data.get(&test_state.chain_ids[0]).unwrap();

let candidate_a = TestCandidateBuilder {
para_id: test_state.chain_ids[0],
relay_parent: test_state.relay_parent,
pov_hash,
head_data: expected_head_data.clone(),
erasure_root: make_erasure_root(&test_state, pov.clone()),
..Default::default()
}.build();

let seconding = SignedFullStatement::sign(
Statement::Seconded(candidate_a.clone()),
&test_state.signing_context,
2,
&test_state.validators[2].pair().into(),
);

let statement = CandidateBackingMessage::Statement(
test_state.relay_parent,
seconding.clone(),
);

virtual_overseer.send(FromOverseer::Communication{ msg: statement }).await;

// The statement will be ignored because it has the wrong collator.
virtual_overseer.send(FromOverseer::Signal(
OverseerSignal::ActiveLeaves(ActiveLeavesUpdate::stop_work(test_state.relay_parent)))
).await;
});
}
}