Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Fix bitfield signing (#1466)
Browse files Browse the repository at this point in the history
* Apply suggestions from #1364 code review

- use CoreState, not CoreOccupied
- query for availability chunks, not the whole PoV
- create a stub `fn availability_cores`

* link to issue documenting unimplemented

* implement get_availability_cores by adding a new runtime api request

* back out an unrelated change properly part of #1404

* av-store: handle QueryChunkAvailability

* simplify QueryDataAvailability

* remove extraneous whitespace

* compact primitive imports
  • Loading branch information
coriolinus authored Jul 29, 2020
1 parent 9e5446a commit cdd9394
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 46 deletions.
14 changes: 6 additions & 8 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,16 +149,14 @@ fn process_message(db: &Arc<dyn KeyValueDB>, msg: AvailabilityStoreMessage) -> R
tx.send(available_data(db, &hash).map(|d| d.data)).map_err(|_| oneshot::Canceled)?;
}
QueryDataAvailability(hash, tx) => {
let result = match available_data(db, &hash) {
Some(_) => true,
None => false,
};

tx.send(result).map_err(|_| oneshot::Canceled)?;
tx.send(available_data(db, &hash).is_some()).map_err(|_| oneshot::Canceled)?;
}
QueryChunk(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?).map_err(|_| oneshot::Canceled)?;
}
QueryChunkAvailability(hash, id, tx) => {
tx.send(get_chunk(db, &hash, id)?.is_some()).map_err(|_| oneshot::Canceled)?;
}
StoreChunk(hash, id, chunk, tx) => {
match store_chunk(db, &hash, id, chunk) {
Err(e) => {
Expand Down Expand Up @@ -394,7 +392,7 @@ mod tests {

let chunk_msg = AvailabilityStoreMessage::StoreChunk(
relay_parent,
validator_index,
validator_index,
chunk.clone(),
tx,
);
Expand Down Expand Up @@ -436,7 +434,7 @@ mod tests {
global_validation,
local_validation,
};

let available_data = AvailableData {
pov,
omitted_validation,
Expand Down
84 changes: 47 additions & 37 deletions node/core/bitfield-signing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use keystore::KeyStorePtr;
use polkadot_node_subsystem::{
messages::{
self, AllMessages, AvailabilityStoreMessage, BitfieldDistributionMessage,
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage,
BitfieldSigningMessage, CandidateBackingMessage, RuntimeApiMessage, RuntimeApiError
},
util::{self, JobManager, JobTrait, ToJobTrait, Validator},
};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash};
use polkadot_primitives::v1::{AvailabilityBitfield, CoreState, Hash, ValidatorIndex};
use std::{convert::TryFrom, pin::Pin, time::Duration};
use wasm_timer::{Delay, Instant};

Expand Down Expand Up @@ -125,16 +125,21 @@ pub enum Error {
/// several errors collected into one
#[from]
Multiple(Vec<Error>),
/// the runtime API failed to return what we wanted
#[from]
Runtime(RuntimeApiError),
}

// this function exists mainly to collect a bunch of potential error points into one.
// if there is a candidate pending availability, query the Availability Store
// for whether we have the availability chunk for our validator index.
async fn get_core_availability(
relay_parent: Hash,
core: CoreState,
validator_idx: ValidatorIndex,
sender: &mpsc::Sender<FromJob>,
) -> Result<bool, Error> {
use messages::{
AvailabilityStoreMessage::QueryDataAvailability,
AvailabilityStoreMessage::QueryChunkAvailability,
RuntimeApiRequest::CandidatePendingAvailability,
};
use FromJob::{AvailabilityStore, RuntimeApi};
Expand Down Expand Up @@ -163,8 +168,9 @@ async fn get_core_availability(
};
let (tx, rx) = oneshot::channel();
sender
.send(AvailabilityStore(QueryDataAvailability(
.send(AvailabilityStore(QueryChunkAvailability(
committed_candidate_receipt.descriptor.pov_hash,
validator_idx,
tx,
)))
.await?;
Expand All @@ -173,43 +179,42 @@ async fn get_core_availability(
Ok(false)
}

// the way this function works is not intuitive:
//
// - get the availability cores so we have a list of cores, in order.
// - for each occupied core, fetch `candidate_pending_availability` from runtime
// - from there, we can get the `CandidateDescriptor`
// - from there, we can send a `AvailabilityStore::QueryPoV` and set the indexed bit to 1 if it returns Some(_)
// delegates to the v1 runtime API
async fn get_availability_cores(relay_parent: Hash, sender: &mut mpsc::Sender<FromJob>) -> Result<Vec<CoreState>, Error> {
use FromJob::RuntimeApi;
use messages::{
RuntimeApiMessage::Request,
RuntimeApiRequest::AvailabilityCores,
};

let (tx, rx) = oneshot::channel();
sender.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx)))).await?;
match rx.await {
Ok(Ok(out)) => Ok(out),
Ok(Err(runtime_err)) => Err(runtime_err.into()),
Err(err) => Err(err.into())
}
}

// - get the list of core states from the runtime
// - for each core, concurrently determine chunk availability (see `get_core_availability`)
// - return the bitfield if there were no errors at any point in this process
// (otherwise, it's prone to false negatives)
async fn construct_availability_bitfield(
relay_parent: Hash,
validator_idx: ValidatorIndex,
sender: &mut mpsc::Sender<FromJob>,
) -> Result<Option<AvailabilityBitfield>, Error> {
) -> Result<AvailabilityBitfield, Error> {
use futures::lock::Mutex;

use messages::RuntimeApiRequest::AvailabilityCores;
use FromJob::RuntimeApi;
use RuntimeApiMessage::Request;

// request the availability cores metadata from runtime.
let (tx, rx) = oneshot::channel();
sender
.send(RuntimeApi(Request(relay_parent, AvailabilityCores(tx))))
.await?;
// get the set of availability cores from the runtime
let availability_cores = get_availability_cores(relay_parent, sender).await?;

// we now need sender to be immutable so we can copy the reference to multiple concurrent closures
let sender = &*sender;

// wait for the cores
let availability_cores = match rx.await? {
Ok(a) => a,
Err(e) => {
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", e);
return Ok(None);
}
};

// prepare outputs
let out =
Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
let out = Mutex::new(bitvec!(bitvec::order::Lsb0, u8; 0; availability_cores.len()));
// in principle, we know that we never want concurrent access to the _same_ bit within the vec;
// we could `let out_ref = out.as_mut_ptr();` here instead, and manually assign bits, avoiding
// any need to ever wait to lock this mutex.
Expand All @@ -225,7 +230,7 @@ async fn construct_availability_bitfield(
// we need the mutexes and explicit references above.
stream::iter(availability_cores.into_iter().enumerate())
.for_each_concurrent(None, |(idx, core)| async move {
let availability = match get_core_availability(relay_parent, core, sender).await {
let availability = match get_core_availability(relay_parent, core, validator_idx, sender).await {
Ok(availability) => availability,
Err(err) => {
errs_ref.lock().await.push(err);
Expand All @@ -238,7 +243,7 @@ async fn construct_availability_bitfield(

let errs = errs.into_inner();
if errs.is_empty() {
Ok(Some(out.into_inner().into()))
Ok(out.into_inner().into())
} else {
Err(errs.into())
}
Expand Down Expand Up @@ -275,10 +280,15 @@ impl JobTrait for BitfieldSigningJob {
Delay::new_at(wait_until).await?;

let bitfield =
match construct_availability_bitfield(relay_parent, &mut sender).await?
match construct_availability_bitfield(relay_parent, validator.index(), &mut sender).await
{
None => return Ok(()),
Some(b) => b,
Err(Error::Runtime(runtime_err)) => {
// Don't take down the node on runtime API errors.
log::warn!(target: "bitfield_signing", "Encountered a runtime API error: {:?}", runtime_err);
return Ok(());
}
Err(err) => return Err(err),
Ok(bitfield) => bitfield,
};

let signed_bitfield = validator.sign(bitfield);
Expand Down
10 changes: 9 additions & 1 deletion node/subsystem/src/messages.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,14 +242,21 @@ pub enum AvailabilityStoreMessage {

/// Query whether a `AvailableData` exists within the AV Store.
///
/// This is useful in cases like bitfield signing, when existence
/// This is useful in cases when existence
/// matters, but we don't want to necessarily pass around multiple
/// megabytes of data to get a single bit of information.
QueryDataAvailability(Hash, oneshot::Sender<bool>),

/// Query an `ErasureChunk` from the AV store.
QueryChunk(Hash, ValidatorIndex, oneshot::Sender<Option<ErasureChunk>>),

/// Query whether an `ErasureChunk` exists within the AV Store.
///
/// This is useful in cases like bitfield signing, when existence
/// matters, but we don't want to necessarily pass around large
/// quantities of data to get a single bit of information.
QueryChunkAvailability(Hash, ValidatorIndex, oneshot::Sender<bool>),

/// Store an `ErasureChunk` in the AV store.
///
/// Return `Ok(())` if the store operation succeeded, `Err(())` if it failed.
Expand All @@ -269,6 +276,7 @@ impl AvailabilityStoreMessage {
Self::QueryAvailableData(hash, _) => Some(*hash),
Self::QueryDataAvailability(hash, _) => Some(*hash),
Self::QueryChunk(hash, _, _) => Some(*hash),
Self::QueryChunkAvailability(hash, _, _) => Some(*hash),
Self::StoreChunk(hash, _, _, _) => Some(*hash),
Self::StoreAvailableData(hash, _, _, _, _) => Some(*hash),
}
Expand Down

0 comments on commit cdd9394

Please sign in to comment.