Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Keep the table router for the lifetime of the validation instance alive #1175

Merged
merged 1 commit into from
Jun 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 9 additions & 2 deletions network/src/protocol/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,10 @@ impl ProtocolHandler {
}

fn on_connect(&mut self, peer: PeerId, role: ObservedRole) {
let claimed_validator = matches!(role, ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority);
let claimed_validator = matches!(
role,
ObservedRole::OurSentry | ObservedRole::OurGuardedAuthority | ObservedRole::Authority
);

self.peers.insert(peer.clone(), PeerData {
claimed_validator,
Expand Down Expand Up @@ -978,7 +981,11 @@ impl<Api, Sp, Gossip> Worker<Api, Sp, Gossip> where
);
}
ServiceToWorkerMsg::AwaitCollation(relay_parent, para_id, sender) => {
debug!(target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}", para_id, relay_parent);
debug!(
target: "p_net", "Attempting to get collation for parachain {:?} on relay parent {:?}",
para_id,
relay_parent,
);
self.protocol_handler.await_collation(relay_parent, para_id, sender)
}
ServiceToWorkerMsg::NoteBadCollator(collator) => {
Expand Down
3 changes: 3 additions & 0 deletions validation/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ pub enum Error {
/// Proposer destroyed before finishing proposing or evaluating
#[display(fmt = "Proposer destroyed before finishing proposing or evaluating")]
PrematureDestruction,
/// Failed to build the table router.
#[display(fmt = "Failed to build the table router: {}", _0)]
CouldNotBuildTableRouter(String),
/// Timer failed
#[display(fmt = "Timer failed: {}", _0)]
Timer(std::io::Error),
Expand Down
81 changes: 47 additions & 34 deletions validation/src/validation_service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
relay_parent,
&keystore,
max_block_data_size,
));
).await);
}
Message::NotifyImport(notification) => {
let relay_parent = notification.hash;
Expand All @@ -217,7 +217,7 @@ impl<C, N, P, SC, SP> ServiceBuilder<C, N, P, SC, SP> where
relay_parent,
&keystore,
max_block_data_size,
);
).await;

if let Err(e) = res {
warn!(
Expand Down Expand Up @@ -299,8 +299,17 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
.map(|pair| Arc::new(pair))
}

/// A live instance that is related to a relay chain validation round.
///
/// It stores the `instance_handle` and the `_table_router`.
struct LiveInstance<TR> {
instance_handle: ValidationInstanceHandle,
/// Make sure we keep the table router alive, to respond/receive consensus messages.
_table_router: TR,
}

/// Constructs parachain-agreement instances.
pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
pub(crate) struct ParachainValidationInstances<N: Network, P, SP, CF> {
/// The client instance.
client: Arc<P>,
/// The backing network handle.
Expand All @@ -311,7 +320,7 @@ pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
availability_store: AvailabilityStore,
/// Live agreements. Maps relay chain parent hashes to attestation
/// instances.
live_instances: HashMap<Hash, ValidationInstanceHandle>,
live_instances: HashMap<Hash, LiveInstance<N::TableRouter>>,
/// The underlying validation pool of processes to use.
/// Only `None` in tests.
validation_pool: Option<ValidationPool>,
Expand Down Expand Up @@ -339,16 +348,16 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
///
/// Additionally, this will trigger broadcast of data to the new block's duty
/// roster.
fn get_or_instantiate(
async fn get_or_instantiate(
&mut self,
parent_hash: Hash,
keystore: &KeyStorePtr,
max_block_data_size: Option<u64>,
) -> Result<ValidationInstanceHandle, Error> {
use primitives::Pair;

if let Some(tracker) = self.live_instances.get(&parent_hash) {
return Ok(tracker.clone());
if let Some(instance) = self.live_instances.get(&parent_hash) {
return Ok(instance.instance_handle.clone());
}

let id = BlockId::hash(parent_hash);
Expand Down Expand Up @@ -417,49 +426,51 @@ impl<N, P, SP, CF> ParachainValidationInstances<N, P, SP, CF> where
self.validation_pool.clone(),
));

let build_router = self.network.build_table_router(
// The router will join the consensus gossip network. This is important
// to receive messages sent for the current round.
let router = match self.network.build_table_router(
table.clone(),
&validators,
);

let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();

let res = self.spawner.spawn(async move {
// It is important that we build the router as it launches tasks internally
// that are required to receive gossip messages.
let router = match build_router.await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to build router: {:?}", e);
return
}
};
).await {
Ok(res) => res,
Err(e) => {
warn!(target: "validation", "Failed to build router: {:?}", e);
return Err(Error::CouldNotBuildTableRouter(format!("{:?}", e)))
}
};

if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) {
let n_validators = validators.len();
let availability_store = self.availability_store.clone();
let client = self.client.clone();
let collation_fetch = self.collation_fetch.clone();
let router = router.clone();

let res = self.spawner.spawn(
launch_work(
move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators),
availability_store,
router,
n_validators,
index,
).await;
}
});
),
);

if let Err(e) = res {
error!(target: "validation", "Failed to create router and launch work: {:?}", e);
if let Err(e) = res {
error!(target: "validation", "Failed to launch work: {:?}", e);
}
}

let tracker = ValidationInstanceHandle {
table,
started: Instant::now(),
};

self.live_instances.insert(parent_hash, tracker.clone());
let live_instance = LiveInstance {
instance_handle: tracker.clone(),
_table_router: router,
};
self.live_instances.insert(parent_hash, live_instance);

Ok(tracker)
}
Expand Down Expand Up @@ -721,8 +732,9 @@ mod tests {
validation_pool: None,
};

parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
.expect("Creates new validation round");
assert!(parachain_validation.live_instances.contains_key(&Default::default()));

let mut events = executor::block_on_stream(events);

Expand Down Expand Up @@ -760,8 +772,9 @@ mod tests {
validation_pool: None,
};

parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
executor::block_on(parachain_validation.get_or_instantiate(Default::default(), &keystore, None))
.expect("Creates new validation round");
assert!(parachain_validation.live_instances.contains_key(&Default::default()));

let mut events = executor::block_on_stream(events);

Expand Down