From 2c1dd30f50fb517a1631a0f6e9c1006eec388b5b Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 18:10:39 -0400 Subject: [PATCH 1/5] node/dispute-coordinator: Modify db to return SubsystemResult. In preparation of moving to the overlayed backend pattern, this commit moves the db to return SubsystemResult values. --- node/core/dispute-coordinator/src/db/v1.rs | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index 7ec30d51c1ce..92d3e4f52f2a 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -20,6 +20,7 @@ use polkadot_primitives::v1::{ CandidateReceipt, ValidDisputeStatementKind, InvalidDisputeStatementKind, ValidatorIndex, ValidatorSignature, SessionIndex, CandidateHash, Hash, }; +use polkadot_node_subsystem::{SubsystemResult, SubsystemError}; use kvdb::{KeyValueDB, DBTransaction}; use parity_scale_codec::{Encode, Decode}; @@ -134,24 +135,27 @@ pub(crate) fn load_candidate_votes( config: &ColumnConfiguration, session: SessionIndex, candidate_hash: &CandidateHash, -) -> Result> { +) -> SubsystemResult> { load_decode(db, config.col_data, &candidate_votes_key(session, candidate_hash)) + .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } /// Load the earliest session, if any. pub(crate) fn load_earliest_session( db: &dyn KeyValueDB, config: &ColumnConfiguration, -) -> Result> { +) -> SubsystemResult> { load_decode(db, config.col_data, EARLIEST_SESSION_KEY) + .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } /// Load the recent disputes, if any. pub(crate) fn load_recent_disputes( db: &dyn KeyValueDB, config: &ColumnConfiguration, -) -> Result> { +) -> SubsystemResult> { load_decode(db, config.col_data, RECENT_DISPUTES_KEY) + .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } /// An atomic transaction to be commited to the underlying DB. @@ -203,7 +207,7 @@ impl Transaction { } /// Write the transaction atomically to the DB. - pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> Result<()> { + pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> SubsystemResult<()> { let mut tx = DBTransaction::new(); if let Some(s) = self.earliest_session { @@ -238,7 +242,7 @@ pub(crate) fn note_current_session( store: &dyn KeyValueDB, config: &ColumnConfiguration, current_session: SessionIndex, -) -> Result<()> { +) -> SubsystemResult<()> { let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW); let mut tx = Transaction::default(); From 67674dbef3add5fa5841b817c7729b15c85002eb Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 18:12:27 -0400 Subject: [PATCH 2/5] node/dispute-coordinator: Add the Backend and OverlayedBackend. This commit adds the backend and overlayed backend structs to the dispute-coordinator subsystem. --- node/core/dispute-coordinator/src/backend.rs | 160 +++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 node/core/dispute-coordinator/src/backend.rs diff --git a/node/core/dispute-coordinator/src/backend.rs b/node/core/dispute-coordinator/src/backend.rs new file mode 100644 index 000000000000..684b8af273f8 --- /dev/null +++ b/node/core/dispute-coordinator/src/backend.rs @@ -0,0 +1,160 @@ +use polkadot_primitives::v1::{CandidateHash, SessionIndex}; +use polkadot_node_subsystem::SubsystemResult; + +use std::collections::HashMap; + +use super::db::v1::{RecentDisputes, CandidateVotes}; + +#[derive(Debug)] +pub enum BackendWriteOp { + WriteEarliestSession(SessionIndex), + WriteRecentDisputes(RecentDisputes), + WriteCandidateVotes(SessionIndex, CandidateHash, CandidateVotes), + DeleteCandidateVotes(SessionIndex, CandidateHash), +} + +/// An abstraction over backend storage for the logic of this subsystem. +pub trait Backend { + /// Load the earliest session, if any. + fn load_earliest_session(&self) -> SubsystemResult>; + + /// Load the recent disputes, if any. + fn load_recent_disputes(&self) -> SubsystemResult>; + + /// Load the candidate votes for the specific session-candidate pair, if any. + fn load_candidate_votes( + &self, + session: SessionIndex, + candidate_hash: &CandidateHash, + ) -> SubsystemResult>; + + /// Atomically writes the list of operations, with later operations taking precedence over + /// prior. + fn write(&mut self, ops: I) -> SubsystemResult<()> + where I: IntoIterator; +} + +/// An in-memory overllay for the backend. +/// +/// This maintains read-only access to the underlying backend, but can be converted into a set of +/// write operations which will, when written to the underlying backend, give the same view as the +/// state of the overlay. +pub struct OverlayedBackend<'a, B: 'a> { + inner: &'a B, + + // `None` means unchanged. + earliest_session: Option, + // `None` means unchanged. + recent_disputes: Option, + // `None` means deleted, missing means query inner. + candidate_votes: HashMap<(SessionIndex, CandidateHash), Option>, +} + +impl<'a, B: 'a + Backend> OverlayedBackend<'a, B> { + pub fn new(backend: &'a B) -> Self { + Self { + inner: backend, + earliest_session: None, + recent_disputes: None, + candidate_votes: HashMap::new(), + } + } + + /// Returns true if the are no write operations to perform. + pub fn is_empty(&self) -> bool { + self.earliest_session.is_none() && + self.recent_disputes.is_none() && + self.candidate_votes.is_empty() + } + + /// Load the earliest session, if any. + pub fn load_earliest_session(&self) -> SubsystemResult> { + if let Some(val) = self.earliest_session { + return Ok(Some(val)) + } + + self.inner.load_earliest_session() + } + + /// Load the recent disputes, if any. + pub fn load_recent_disputes(&self) -> SubsystemResult> { + if let Some(val) = &self.recent_disputes { + return Ok(Some(val.clone())) + } + + self.inner.load_recent_disputes() + } + + /// Load the candidate votes for the specific session-candidate pair, if any. + pub fn load_candidate_votes( + &self, + session: SessionIndex, + candidate_hash: &CandidateHash + ) -> SubsystemResult> { + if let Some(val) = self.candidate_votes.get(&(session, *candidate_hash)) { + return Ok(val.clone()) + } + + self.inner.load_candidate_votes(session, candidate_hash) + } + + /// Prepare a write to the 'earliest session' field of the DB. + /// + /// Later calls to this function will override earlier ones. + pub fn write_earliest_session(&mut self, session: SessionIndex) { + self.earliest_session = Some(session); + } + + /// Prepare a write to the recent disputes stored in the DB. + /// + /// Later calls to this function will override earlier ones. + pub fn write_recent_disputes(&mut self, recent_disputes: RecentDisputes) { + self.recent_disputes = Some(recent_disputes) + } + + /// Prepare a write of the candidate votes under the indicated candidate. + /// + /// Later calls to this function for the same candidate will override earlier ones. + pub fn write_candidate_votes( + &mut self, + session: SessionIndex, + candidate_hash: CandidateHash, + votes: CandidateVotes + ) { + self.candidate_votes.insert((session, candidate_hash), Some(votes)); + } + + /// Prepare a deletion of the candidate votes under the indicated candidate. + /// + /// Later calls to this function for the same candidate will override earlier ones. + pub fn delete_candidate_votes( + &mut self, + session: SessionIndex, + candidate_hash: CandidateHash, + ) { + self.candidate_votes.insert((session, candidate_hash), None); + } + + /// Transform this backend into a set of write-ops to be written to the inner backend. + pub fn into_write_ops(self) -> impl Iterator { + let earliest_session_ops = self.earliest_session + .map(|s| BackendWriteOp::WriteEarliestSession(s)) + .into_iter(); + + let recent_dispute_ops = self.recent_disputes + .map(|d| BackendWriteOp::WriteRecentDisputes(d)) + .into_iter(); + + let candidate_vote_ops = self.candidate_votes + .into_iter() + .map(|((session, candidate), votes)| match votes { + Some(votes) => BackendWriteOp::WriteCandidateVotes(session, candidate, votes), + None => BackendWriteOp::DeleteCandidateVotes(session, candidate), + }); + + earliest_session_ops + .chain(recent_dispute_ops) + .chain(candidate_vote_ops) + + } +} From f3f0cb886fb0d58c38a6058946464d2b00191065 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 18:13:58 -0400 Subject: [PATCH 3/5] node/dispute-coordinator: Implement backend and overlayed-backend. This commit finalizes the move from the previous transactional model to the common overlay pattern in subsystem persistency. This can be observed in the ApprovalVoting and ChainSelection subsystems. --- node/core/dispute-coordinator/src/db/v1.rs | 539 ++++++++++----------- node/core/dispute-coordinator/src/lib.rs | 215 ++++---- node/core/dispute-coordinator/src/tests.rs | 3 +- 3 files changed, 352 insertions(+), 405 deletions(-) diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index 92d3e4f52f2a..d3f859e7d641 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -22,15 +22,94 @@ use polkadot_primitives::v1::{ }; use polkadot_node_subsystem::{SubsystemResult, SubsystemError}; +use std::sync::Arc; + use kvdb::{KeyValueDB, DBTransaction}; use parity_scale_codec::{Encode, Decode}; use crate::{DISPUTE_WINDOW, DisputeStatus}; +use crate::backend::{Backend, BackendWriteOp, OverlayedBackend}; const RECENT_DISPUTES_KEY: &[u8; 15] = b"recent-disputes"; const EARLIEST_SESSION_KEY: &[u8; 16] = b"earliest-session"; const CANDIDATE_VOTES_SUBKEY: &[u8; 15] = b"candidate-votes"; +pub struct DbBackend { + inner: Arc, + config: ColumnConfiguration, +} + +impl DbBackend { + pub fn new(db: Arc, config: ColumnConfiguration) -> Self { + Self { + inner: db, + config, + } + } +} + +impl Backend for DbBackend { + /// Load the earliest session, if any. + fn load_earliest_session(&self) -> SubsystemResult> { + load_earliest_session(&*self.inner, &self.config) + } + + /// Load the recent disputes, if any. + fn load_recent_disputes(&self) -> SubsystemResult> { + load_recent_disputes(&*self.inner, &self.config) + } + + /// Load the candidate votes for the specific session-candidate pair, if any. + fn load_candidate_votes( + &self, + session: SessionIndex, + candidate_hash: &CandidateHash, + ) -> SubsystemResult> { + load_candidate_votes(&*self.inner, &self.config, session, candidate_hash) + } + + /// Atomically writes the list of operations, with later operations taking precedence over + /// prior. + fn write(&mut self, ops: I) -> SubsystemResult<()> + where I: IntoIterator + { + let mut tx = DBTransaction::new(); + for op in ops { + match op { + BackendWriteOp::WriteEarliestSession(session) => { + tx.put_vec( + self.config.col_data, + EARLIEST_SESSION_KEY, + session.encode(), + ); + } + BackendWriteOp::WriteRecentDisputes(recent_disputes) => { + tx.put_vec( + self.config.col_data, + RECENT_DISPUTES_KEY, + recent_disputes.encode(), + ); + } + BackendWriteOp::WriteCandidateVotes(session, candidate_hash, votes) => { + tx.put_vec( + self.config.col_data, + &candidate_votes_key(session, &candidate_hash), + votes.encode(), + ); + } + BackendWriteOp::DeleteCandidateVotes(session, candidate_hash) => { + tx.delete( + self.config.col_data, + &candidate_votes_key(session, &candidate_hash), + ); + } + } + } + + self.inner.write(tx).map_err(Into::into) + } +} + fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> [u8; 15 + 4 + 32] { let mut buf = [0u8; 15 + 4 + 32]; buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY); @@ -42,29 +121,6 @@ fn candidate_votes_key(session: SessionIndex, candidate_hash: &CandidateHash) -> buf } -// Computes the upper lexicographic bound on DB keys for candidate votes with a given -// upper-exclusive bound on sessions. -fn candidate_votes_range_upper_bound(upper_exclusive: SessionIndex) -> [u8; 15 + 4] { - let mut buf = [0; 15 + 4]; - buf[..15].copy_from_slice(CANDIDATE_VOTES_SUBKEY); - // big-endian encoding is used to ensure lexicographic ordering. - buf[15..][..4].copy_from_slice(&upper_exclusive.to_be_bytes()); - - buf -} - -fn decode_candidate_votes_key(key: &[u8]) -> Option<(SessionIndex, CandidateHash)> { - if key.len() != 15 + 4 + 32 { - return None; - } - - let mut session_buf = [0; 4]; - session_buf.copy_from_slice(&key[15..][..4]); - let session = SessionIndex::from_be_bytes(session_buf); - - CandidateHash::decode(&mut &key[(15 + 4)..]).ok().map(|hash| (session, hash)) -} - /// Column configuration information for the DB. #[derive(Debug, Clone)] pub struct ColumnConfiguration { @@ -129,7 +185,7 @@ fn load_decode(db: &dyn KeyValueDB, col_data: u32, key: &[u8]) } } -/// Load the candidate votes for the identified candidate under the given hash. +/// Load the candidate votes for the specific session-candidate pair, if any. pub(crate) fn load_candidate_votes( db: &dyn KeyValueDB, config: &ColumnConfiguration, @@ -158,78 +214,6 @@ pub(crate) fn load_recent_disputes( .map_err(|e| SubsystemError::with_origin("dispute-coordinator", e)) } -/// An atomic transaction to be commited to the underlying DB. -#[derive(Debug, Default, Clone)] -pub(crate) struct Transaction { - earliest_session: Option, - recent_disputes: Option, - write_candidate_votes: Vec<(SessionIndex, CandidateHash, CandidateVotes)>, - delete_candidate_votes: Vec<(SessionIndex, CandidateHash)>, -} - -impl Transaction { - /// Prepare a write to the 'earliest session' field of the DB. - /// - /// Later calls to this function will override earlier ones. - pub(crate) fn put_earliest_session(&mut self, session: SessionIndex) { - self.earliest_session = Some(session); - } - - /// Prepare a write to the recent disputes stored in the DB. - /// - /// Later calls to this function will override earlier ones. - pub(crate) fn put_recent_disputes(&mut self, recent_disputes: RecentDisputes) { - self.recent_disputes = Some(recent_disputes); - } - - /// Prepare a write of the candidate votes under the indicated candidate. - /// - /// Later calls to this function for the same candidate will override earlier ones. - /// Any calls to this function will be overridden by deletions of the same candidate. - pub(crate) fn put_candidate_votes( - &mut self, - session: SessionIndex, - candidate_hash: CandidateHash, - votes: CandidateVotes, - ) { - self.write_candidate_votes.push((session, candidate_hash, votes)) - } - - /// Prepare a deletion of the candidate votes under the indicated candidate. - /// - /// Any calls to this function will override writes to the same candidate. - pub(crate) fn delete_candidate_votes( - &mut self, - session: SessionIndex, - candidate_hash: CandidateHash, - ) { - self.delete_candidate_votes.push((session, candidate_hash)) - } - - /// Write the transaction atomically to the DB. - pub(crate) fn write(self, db: &dyn KeyValueDB, config: &ColumnConfiguration) -> SubsystemResult<()> { - let mut tx = DBTransaction::new(); - - if let Some(s) = self.earliest_session { - tx.put_vec(config.col_data, EARLIEST_SESSION_KEY, s.encode()); - } - - if let Some(a) = self.recent_disputes { - tx.put_vec(config.col_data, RECENT_DISPUTES_KEY, a.encode()); - } - - for (session, candidate_hash, votes) in self.write_candidate_votes { - tx.put_vec(config.col_data, &candidate_votes_key(session, &candidate_hash), votes.encode()); - } - - for (session, candidate_hash) in self.delete_candidate_votes { - tx.delete(config.col_data, &candidate_votes_key(session, &candidate_hash)); - } - - db.write(tx).map_err(Into::into) - } -} - /// Maybe prune data in the DB based on the provided session index. /// /// This is intended to be called on every block, and as such will be used to populate the DB on @@ -239,57 +223,46 @@ impl Transaction { /// If one or more ancient sessions are pruned, all metadata on candidates within the ancient /// session will be deleted. pub(crate) fn note_current_session( - store: &dyn KeyValueDB, - config: &ColumnConfiguration, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, current_session: SessionIndex, ) -> SubsystemResult<()> { let new_earliest = current_session.saturating_sub(DISPUTE_WINDOW); - let mut tx = Transaction::default(); - - match load_earliest_session(store, config)? { + match overlay_db.load_earliest_session()? { None => { // First launch - write new-earliest. - tx.put_earliest_session(new_earliest); + overlay_db.write_earliest_session(new_earliest); } Some(prev_earliest) if new_earliest > prev_earliest => { // Prune all data in the outdated sessions. - tx.put_earliest_session(new_earliest); + overlay_db.write_earliest_session(new_earliest); // Clear recent disputes metadata. { - let mut recent_disputes = load_recent_disputes(store, config)?.unwrap_or_default(); + let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); let lower_bound = ( new_earliest, CandidateHash(Hash::repeat_byte(0x00)), ); - let prev_len = recent_disputes.len(); - recent_disputes = recent_disputes.split_off(&lower_bound); + let new_recent_disputes = recent_disputes.split_off(&lower_bound); + // Any remanining disputes are considered ancient and must be pruned. + let pruned_disputes = recent_disputes; - if recent_disputes.len() != prev_len { - tx.put_recent_disputes(recent_disputes); + if pruned_disputes.len() != 0 { + overlay_db.write_recent_disputes(new_recent_disputes); + for ((session, candidate_hash), _) in pruned_disputes { + overlay_db.delete_candidate_votes(session, candidate_hash); + } } } - - // Clear all candidate data with session less than the new earliest kept. - { - let end_prefix = candidate_votes_range_upper_bound(new_earliest); - - store.iter_with_prefix(config.col_data, CANDIDATE_VOTES_SUBKEY) - .take_while(|(k, _)| &k[..] < &end_prefix[..]) - .filter_map(|(k, _)| decode_candidate_votes_key(&k[..])) - .for_each(|(session, candidate_hash)| { - tx.delete_candidate_votes(session, candidate_hash); - }); - } } Some(_) => { // nothing to do. } - }; + } - tx.write(store, config) + Ok(()) } #[cfg(test)] @@ -297,151 +270,150 @@ mod tests { use super::*; use polkadot_primitives::v1::{Hash, Id as ParaId}; - #[test] - fn candidate_votes_key_works() { - let session = 4; - let candidate = CandidateHash(Hash::repeat_byte(0x01)); - - let key = candidate_votes_key(session, &candidate); - - assert_eq!(&key[0..15], CANDIDATE_VOTES_SUBKEY); - assert_eq!(&key[15..19], &[0x00, 0x00, 0x00, 0x04]); - assert_eq!(&key[19..51], candidate.0.as_bytes()); - - assert_eq!( - decode_candidate_votes_key(&key[..]), - Some((session, candidate)), - ); + fn make_db() -> DbBackend { + let store = Arc::new(kvdb_memorydb::create(1)); + let config = ColumnConfiguration { col_data: 0 }; + DbBackend::new(store, config) } #[test] - fn db_transaction() { - let store = kvdb_memorydb::create(1); - let config = ColumnConfiguration { col_data: 0 }; - - { - let mut tx = Transaction::default(); - - tx.put_earliest_session(0); - tx.put_earliest_session(1); + fn overlay_pre_and_post_commit_consistency() { + let mut backend = make_db(); + + let mut overlay_db = OverlayedBackend::new(&backend); + + overlay_db.write_earliest_session(0); + overlay_db.write_earliest_session(1); + + overlay_db.write_recent_disputes(vec![ + ((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active), + ].into_iter().collect()); + + overlay_db.write_recent_disputes(vec![ + ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), + ].into_iter().collect()); + + overlay_db.write_candidate_votes( + 1, + CandidateHash(Hash::repeat_byte(1)), + CandidateVotes { + candidate_receipt: Default::default(), + valid: Vec::new(), + invalid: Vec::new(), + }, + ); + overlay_db.write_candidate_votes( + 1, + CandidateHash(Hash::repeat_byte(1)), + CandidateVotes { + candidate_receipt: { + let mut receipt = CandidateReceipt::default(); + receipt.descriptor.para_id = 5.into(); + + receipt + }, + valid: Vec::new(), + invalid: Vec::new(), + }, + ); - tx.put_recent_disputes(vec![ - ((0, CandidateHash(Hash::repeat_byte(0))), DisputeStatus::Active), - ].into_iter().collect()); + // Test that overlay returns the correct values before committing. + assert_eq!( + overlay_db.load_earliest_session().unwrap().unwrap(), + 1, + ); - tx.put_recent_disputes(vec![ + assert_eq!( + overlay_db.load_recent_disputes().unwrap().unwrap(), + vec![ ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), - ].into_iter().collect()); + ].into_iter().collect() + ); - tx.put_candidate_votes( - 1, - CandidateHash(Hash::repeat_byte(1)), - CandidateVotes { - candidate_receipt: Default::default(), - valid: Vec::new(), - invalid: Vec::new(), - }, - ); - tx.put_candidate_votes( + assert_eq!( + overlay_db.load_candidate_votes( 1, - CandidateHash(Hash::repeat_byte(1)), - CandidateVotes { - candidate_receipt: { - let mut receipt = CandidateReceipt::default(); - receipt.descriptor.para_id = 5.into(); - - receipt - }, - valid: Vec::new(), - invalid: Vec::new(), - }, - ); + &CandidateHash(Hash::repeat_byte(1)) + ).unwrap().unwrap().candidate_receipt.descriptor.para_id, + ParaId::from(5), + ); - tx.write(&store, &config).unwrap(); - } + let write_ops = overlay_db.into_write_ops(); + backend.write(write_ops).unwrap(); // Test that subsequent writes were written. - { - assert_eq!( - load_earliest_session(&store, &config).unwrap().unwrap(), + assert_eq!( + backend.load_earliest_session().unwrap().unwrap(), + 1, + ); + + assert_eq!( + backend.load_recent_disputes().unwrap().unwrap(), + vec![ + ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), + ].into_iter().collect() + ); + + assert_eq!( + backend.load_candidate_votes( 1, - ); - - assert_eq!( - load_recent_disputes(&store, &config).unwrap().unwrap(), - vec![ - ((1, CandidateHash(Hash::repeat_byte(1))), DisputeStatus::Active), - ].into_iter().collect() - ); - - assert_eq!( - load_candidate_votes( - &store, - &config, - 1, - &CandidateHash(Hash::repeat_byte(1)) - ).unwrap().unwrap().candidate_receipt.descriptor.para_id, - ParaId::from(5), - ); - } + &CandidateHash(Hash::repeat_byte(1)) + ).unwrap().unwrap().candidate_receipt.descriptor.para_id, + ParaId::from(5), + ); } #[test] - fn db_deletes_supersede_writes() { - let store = kvdb_memorydb::create(1); - let config = ColumnConfiguration { col_data: 0 }; - - { - let mut tx = Transaction::default(); - tx.put_candidate_votes( - 1, - CandidateHash(Hash::repeat_byte(1)), - CandidateVotes { - candidate_receipt: Default::default(), - valid: Vec::new(), - invalid: Vec::new(), - } - ); + fn overlay_preserves_candidate_votes_operation_order() { + let mut backend = make_db(); + + let mut overlay_db = OverlayedBackend::new(&backend); + overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1))); + + overlay_db.write_candidate_votes( + 1, + CandidateHash(Hash::repeat_byte(1)), + CandidateVotes { + candidate_receipt: Default::default(), + valid: Vec::new(), + invalid: Vec::new(), + } + ); - tx.write(&store, &config).unwrap(); - } + let write_ops = overlay_db.into_write_ops(); + backend.write(write_ops).unwrap(); assert_eq!( - load_candidate_votes( - &store, - &config, + backend.load_candidate_votes( 1, &CandidateHash(Hash::repeat_byte(1)) ).unwrap().unwrap().candidate_receipt.descriptor.para_id, ParaId::from(0), ); - { - let mut tx = Transaction::default(); - tx.put_candidate_votes( - 1, - CandidateHash(Hash::repeat_byte(1)), - CandidateVotes { - candidate_receipt: { - let mut receipt = CandidateReceipt::default(); - receipt.descriptor.para_id = 5.into(); - - receipt - }, - valid: Vec::new(), - invalid: Vec::new(), - } - ); + let mut overlay_db = OverlayedBackend::new(&backend); + overlay_db.write_candidate_votes( + 1, + CandidateHash(Hash::repeat_byte(1)), + CandidateVotes { + candidate_receipt: { + let mut receipt = CandidateReceipt::default(); + receipt.descriptor.para_id = 5.into(); - tx.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1))); + receipt + }, + valid: Vec::new(), + invalid: Vec::new(), + } + ); - tx.write(&store, &config).unwrap(); - } + overlay_db.delete_candidate_votes(1, CandidateHash(Hash::repeat_byte(1))); + + let write_ops = overlay_db.into_write_ops(); + backend.write(write_ops).unwrap(); assert!( - load_candidate_votes( - &store, - &config, + backend.load_candidate_votes( 1, &CandidateHash(Hash::repeat_byte(1)) ).unwrap().is_none() @@ -450,8 +422,7 @@ mod tests { #[test] fn note_current_session_prunes_old() { - let store = kvdb_memorydb::create(1); - let config = ColumnConfiguration { col_data: 0 }; + let mut backend = make_db(); let hash_a = CandidateHash(Hash::repeat_byte(0x0a)); let hash_b = CandidateHash(Hash::repeat_byte(0x0b)); @@ -472,61 +443,61 @@ mod tests { invalid: Vec::new(), }; - { - let mut tx = Transaction::default(); - tx.put_earliest_session(prev_earliest_session); - tx.put_recent_disputes(vec![ - ((very_old, hash_a), DisputeStatus::Active), - ((slightly_old, hash_b), DisputeStatus::Active), - ((new_earliest_session, hash_c), DisputeStatus::Active), - ((very_recent, hash_d), DisputeStatus::Active), - ].into_iter().collect()); - - tx.put_candidate_votes( - very_old, - hash_a, - blank_candidate_votes(), - ); - - tx.put_candidate_votes( - slightly_old, - hash_b, - blank_candidate_votes(), - ); - - tx.put_candidate_votes( - new_earliest_session, - hash_c, - blank_candidate_votes(), - ); - - tx.put_candidate_votes( - very_recent, - hash_d, - blank_candidate_votes(), - ); - - tx.write(&store, &config).unwrap(); - } + let mut overlay_db = OverlayedBackend::new(&backend); + overlay_db.write_earliest_session(prev_earliest_session); + overlay_db.write_recent_disputes(vec![ + ((very_old, hash_a), DisputeStatus::Active), + ((slightly_old, hash_b), DisputeStatus::Active), + ((new_earliest_session, hash_c), DisputeStatus::Active), + ((very_recent, hash_d), DisputeStatus::Active), + ].into_iter().collect()); + + overlay_db.write_candidate_votes( + very_old, + hash_a, + blank_candidate_votes(), + ); + + overlay_db.write_candidate_votes( + slightly_old, + hash_b, + blank_candidate_votes(), + ); + + overlay_db.write_candidate_votes( + new_earliest_session, + hash_c, + blank_candidate_votes(), + ); + + overlay_db.write_candidate_votes( + very_recent, + hash_d, + blank_candidate_votes(), + ); + + let write_ops = overlay_db.into_write_ops(); + backend.write(write_ops).unwrap(); - note_current_session(&store, &config, current_session).unwrap(); + let mut overlay_db = OverlayedBackend::new(&backend); + note_current_session(&mut overlay_db, current_session).unwrap(); assert_eq!( - load_earliest_session(&store, &config).unwrap(), + overlay_db.load_earliest_session().unwrap(), Some(new_earliest_session), ); assert_eq!( - load_recent_disputes(&store, &config).unwrap().unwrap(), + overlay_db.load_recent_disputes().unwrap().unwrap(), vec![ ((new_earliest_session, hash_c), DisputeStatus::Active), ((very_recent, hash_d), DisputeStatus::Active), ].into_iter().collect(), ); - assert!(load_candidate_votes(&store, &config, very_old, &hash_a).unwrap().is_none()); - assert!(load_candidate_votes(&store, &config, slightly_old, &hash_b).unwrap().is_none()); - assert!(load_candidate_votes(&store, &config, new_earliest_session, &hash_c).unwrap().is_some()); - assert!(load_candidate_votes(&store, &config, very_recent, &hash_d).unwrap().is_some()); + assert!(overlay_db.load_candidate_votes(very_old, &hash_a).unwrap().is_none()); + assert!(overlay_db.load_candidate_votes(slightly_old, &hash_b).unwrap().is_none()); + assert!(overlay_db.load_candidate_votes(new_earliest_session, &hash_c).unwrap().is_some()); + assert!(overlay_db.load_candidate_votes(very_recent, &hash_d).unwrap().is_some()); } } diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index c311529acd8b..f2d72f3e7011 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -52,9 +52,11 @@ use kvdb::KeyValueDB; use parity_scale_codec::{Encode, Decode, Error as CodecError}; use sc_keystore::LocalKeystore; -use db::v1::RecentDisputes; +use db::v1::{RecentDisputes, DbBackend}; +use backend::{Backend, OverlayedBackend}; mod db; +mod backend; #[cfg(test)] mod tests; @@ -112,7 +114,8 @@ where Context: overseer::SubsystemContext, { fn start(self, ctx: Context) -> SpawnedSubsystem { - let future = run(self, ctx, Box::new(SystemClock)) + let backend = DbBackend::new(self.store.clone(), self.config.column_config()); + let future = run(self, ctx, backend, Box::new(SystemClock)) .map(|_| Ok(())) .boxed(); @@ -262,17 +265,19 @@ impl DisputeStatus { } } -async fn run( +async fn run( subsystem: DisputeCoordinatorSubsystem, mut ctx: Context, + mut backend: B, clock: Box, ) where Context: overseer::SubsystemContext, - Context: SubsystemContext + Context: SubsystemContext, + B: Backend, { loop { - let res = run_iteration(&mut ctx, &subsystem, &*clock).await; + let res = run_iteration(&mut ctx, &subsystem, &mut backend, &*clock).await; match res { Err(e) => { e.trace(); @@ -294,24 +299,25 @@ where // // A return value of `Ok` indicates that an exit should be made, while non-fatal errors // lead to another call to this function. -async fn run_iteration( +async fn run_iteration( ctx: &mut Context, subsystem: &DisputeCoordinatorSubsystem, + backend: &mut B, clock: &dyn Clock, -) - -> Result<(), Error> +) -> Result<(), Error> where Context: overseer::SubsystemContext, - Context: SubsystemContext + Context: SubsystemContext, + B: Backend, { - let DisputeCoordinatorSubsystem { ref store, ref keystore, ref config } = *subsystem; let mut state = State { - keystore: keystore.clone(), + keystore: subsystem.keystore.clone(), highest_session: None, rolling_session_window: RollingSessionWindow::new(DISPUTE_WINDOW), }; loop { + let mut overlay_db = OverlayedBackend::new(backend); match ctx.recv().await? { FromOverseer::Signal(OverseerSignal::Conclude) => { return Ok(()) @@ -319,9 +325,8 @@ where FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { handle_new_activations( ctx, - &**store, + &mut overlay_db, &mut state, - config, update.activated.into_iter().map(|a| a.hash), ).await? } @@ -329,22 +334,25 @@ where FromOverseer::Communication { msg } => { handle_incoming( ctx, - &**store, + &mut overlay_db, &mut state, - config, msg, clock.now(), ).await? } } + + if !overlay_db.is_empty() { + let ops = overlay_db.into_write_ops(); + backend.write(ops)?; + } } } async fn handle_new_activations( ctx: &mut (impl SubsystemContext + overseer::SubsystemContext), - store: &dyn KeyValueDB, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, state: &mut State, - config: &Config, new_activations: impl IntoIterator, ) -> Result<(), Error> { for new_leaf in new_activations { @@ -388,11 +396,7 @@ async fn handle_new_activations( state.highest_session = Some(session); - db::v1::note_current_session( - store, - &config.column_config(), - session, - )?; + db::v1::note_current_session(overlay_db, session)?; } } _ => {} @@ -404,9 +408,8 @@ async fn handle_new_activations( async fn handle_incoming( ctx: &mut impl SubsystemContext, - store: &dyn KeyValueDB, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, state: &mut State, - config: &Config, message: DisputeCoordinatorMessage, now: Timestamp, ) -> Result<(), Error> { @@ -420,9 +423,8 @@ async fn handle_incoming( } => { handle_import_statements( ctx, - store, + overlay_db, state, - config, candidate_hash, candidate_receipt, session, @@ -432,15 +434,11 @@ async fn handle_incoming( ).await?; } DisputeCoordinatorMessage::RecentDisputes(rx) => { - let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? - .unwrap_or_default(); - + let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); let _ = rx.send(recent_disputes.keys().cloned().collect()); } DisputeCoordinatorMessage::ActiveDisputes(rx) => { - let recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? - .unwrap_or_default(); - + let recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); let _ = rx.send(collect_active(recent_disputes, now)); } DisputeCoordinatorMessage::QueryCandidateVotes( @@ -448,9 +446,7 @@ async fn handle_incoming( candidate_hash, rx ) => { - let candidate_votes = db::v1::load_candidate_votes( - store, - &config.column_config(), + let candidate_votes = overlay_db.load_candidate_votes( session, &candidate_hash, )?; @@ -466,8 +462,7 @@ async fn handle_incoming( issue_local_statement( ctx, state, - store, - config, + overlay_db, candidate_hash, candidate_receipt, session, @@ -481,8 +476,7 @@ async fn handle_incoming( tx, } => { let undisputed_chain = determine_undisputed_chain( - store, - &config, + overlay_db, base_number, block_descriptions )?; @@ -519,9 +513,8 @@ fn insert_into_statement_vec( async fn handle_import_statements( ctx: &mut impl SubsystemContext, - store: &dyn KeyValueDB, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, state: &mut State, - config: &Config, candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, @@ -554,12 +547,7 @@ async fn handle_import_statements( let supermajority_threshold = polkadot_primitives::v1::supermajority_threshold(n_validators); - let mut votes = db::v1::load_candidate_votes( - store, - &config.column_config(), - session, - &candidate_hash - )? + let mut votes = overlay_db.load_candidate_votes(session, &candidate_hash)? .map(CandidateVotes::from) .unwrap_or_else(|| CandidateVotes { candidate_receipt: candidate_receipt.clone(), @@ -608,86 +596,79 @@ async fn handle_import_statements( let concluded_valid = votes.valid.len() >= supermajority_threshold; let concluded_invalid = votes.invalid.len() >= supermajority_threshold; - let mut recent_disputes = db::v1::load_recent_disputes(store, &config.column_config())? - .unwrap_or_default(); + let mut recent_disputes = overlay_db.load_recent_disputes()?.unwrap_or_default(); - { // Scope so we will only confirm valid import after the import got actually persisted. - let mut tx = db::v1::Transaction::default(); + let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone()); - let prev_status = recent_disputes.get(&(session, candidate_hash)).map(|x| x.clone()); + let status = if is_disputed { + let status = recent_disputes + .entry((session, candidate_hash)) + .or_insert(DisputeStatus::active()); - let status = if is_disputed { - let status = recent_disputes - .entry((session, candidate_hash)) - .or_insert(DisputeStatus::active()); - - // Note: concluded-invalid overwrites concluded-valid, - // so we do this check first. Dispute state machine is - // non-commutative. - if concluded_valid { - *status = status.concluded_for(now); - } - - if concluded_invalid { - *status = status.concluded_against(now); - } + // Note: concluded-invalid overwrites concluded-valid, + // so we do this check first. Dispute state machine is + // non-commutative. + if concluded_valid { + *status = status.concluded_for(now); + } - Some(*status) - } else { - None - }; + if concluded_invalid { + *status = status.concluded_against(now); + } - if status != prev_status { - // Only write when updated. - tx.put_recent_disputes(recent_disputes); + Some(*status) + } else { + None + }; - // This branch is only hit when the candidate is freshly disputed - - // status was previously `None`, and now is not. - if prev_status.is_none() { - // No matter what, if the dispute is new, we participate. + if status != prev_status { + // This branch is only hit when the candidate is freshly disputed - + // status was previously `None`, and now is not. + if prev_status.is_none() { + // No matter what, if the dispute is new, we participate. + // + // We also block the coordinator while awaiting our determination + // of whether the vote is available. + let (report_availability, receive_availability) = oneshot::channel(); + ctx.send_message(DisputeParticipationMessage::Participate { + candidate_hash, + candidate_receipt, + session, + n_validators: n_validators as u32, + report_availability, + }).await; + + if !receive_availability.await.map_err(Error::Oneshot)? { + // If the data is not available, we disregard the dispute votes. + // This is an indication that the dispute does not correspond to any included + // candidate and that it should be ignored. // - // We also block the coordinator while awaiting our determination - // of whether the vote is available. - let (report_availability, receive_availability) = oneshot::channel(); - ctx.send_message(DisputeParticipationMessage::Participate { - candidate_hash, - candidate_receipt, - session, - n_validators: n_validators as u32, - report_availability, - }).await; - - if !receive_availability.await.map_err(Error::Oneshot)? { - // If the data is not available, we disregard the dispute votes. - // This is an indication that the dispute does not correspond to any included - // candidate and that it should be ignored. - // - // We expect that if the candidate is truly disputed that the higher-level network - // code will retry. - pending_confirmation.send(ImportStatementsResult::InvalidImport) - .map_err(|_| Error::OneshotSend)?; - - tracing::debug!( - target: LOG_TARGET, - "Recovering availability failed - invalid import." - ); - return Ok(()) - } + // We expect that if the candidate is truly disputed that the higher-level network + // code will retry. + pending_confirmation.send(ImportStatementsResult::InvalidImport) + .map_err(|_| Error::OneshotSend)?; + + tracing::debug!( + target: LOG_TARGET, + "Recovering availability failed - invalid import." + ); + return Ok(()) } } - tx.put_candidate_votes(session, candidate_hash, votes.into()); - tx.write(store, &config.column_config())?; + // Only write when updated and vote is available. + overlay_db.write_recent_disputes(recent_disputes); } + overlay_db.write_candidate_votes(session, candidate_hash, votes.into()); + Ok(()) } async fn issue_local_statement( ctx: &mut impl SubsystemContext, state: &mut State, - store: &dyn KeyValueDB, - config: &Config, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, @@ -710,12 +691,7 @@ async fn issue_local_statement( let validators = info.validators.clone(); - let votes = db::v1::load_candidate_votes( - store, - &config.column_config(), - session, - &candidate_hash - )? + let votes = overlay_db.load_candidate_votes(session, &candidate_hash)? .map(CandidateVotes::from) .unwrap_or_else(|| CandidateVotes { candidate_receipt: candidate_receipt.clone(), @@ -783,9 +759,8 @@ async fn issue_local_statement( let (pending_confirmation, _rx) = oneshot::channel(); handle_import_statements( ctx, - store, + overlay_db, state, - config, candidate_hash, candidate_receipt, session, @@ -853,16 +828,16 @@ fn make_dispute_message( } fn determine_undisputed_chain( - store: &dyn KeyValueDB, - config: &Config, + overlay_db: &mut OverlayedBackend<'_, impl Backend>, base_number: BlockNumber, block_descriptions: Vec<(Hash, SessionIndex, Vec)>, ) -> Result, Error> { let last = block_descriptions.last() + .map(|e| (base_number + block_descriptions.len() as BlockNumber, e.0)); // Fast path for no disputes. - let recent_disputes = match db::v1::load_recent_disputes(store, &config.column_config())? { + let recent_disputes = match overlay_db.load_recent_disputes()? { None => return Ok(last), Some(a) if a.is_empty() => return Ok(last), Some(a) => a, diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index ed5a7da062a7..5e65cbeff9da 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -250,7 +250,8 @@ fn test_harness(test: F) state.subsystem_keystore.clone(), ); - let subsystem_task = run(subsystem, ctx, Box::new(state.clock.clone())); + let backend = DbBackend::new(state.db.clone(), state.config.column_config()); + let subsystem_task = run(subsystem, ctx, backend, Box::new(state.clock.clone())); let test_task = test(state, ctx_handle); futures::executor::block_on(future::join(subsystem_task, test_task)); From ced0b5c907225dc5e945fb4322f4378b8847fb57 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 19:53:41 -0400 Subject: [PATCH 4/5] Add module docs + license --- node/core/dispute-coordinator/src/backend.rs | 23 ++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/node/core/dispute-coordinator/src/backend.rs b/node/core/dispute-coordinator/src/backend.rs index 684b8af273f8..35c36f986c91 100644 --- a/node/core/dispute-coordinator/src/backend.rs +++ b/node/core/dispute-coordinator/src/backend.rs @@ -1,3 +1,26 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! An abstraction over storage used by the chain selection subsystem. +//! +//! This provides both a [`Backend`] trait and an [`OverlayedBackend`] +//! struct which allows in-memory changes to be applied on top of a +//! [`Backend`], maintaining consistency between queries and temporary writes, +//! before any commit to the underlying storage is made. + use polkadot_primitives::v1::{CandidateHash, SessionIndex}; use polkadot_node_subsystem::SubsystemResult; From ebe2cb1df7e2aaf211f3b71fd8c507c621188e23 Mon Sep 17 00:00:00 2001 From: Lldenaurois Date: Mon, 12 Jul 2021 22:30:19 -0400 Subject: [PATCH 5/5] Touchup merge --- node/core/dispute-coordinator/src/lib.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 1dcb4382660f..2cc155a51e99 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -447,9 +447,7 @@ async fn handle_incoming( ) => { let mut query_output = Vec::new(); for (session_index, candidate_hash) in query.into_iter() { - if let Some(v) = db::v1::load_candidate_votes( - store, - &config.column_config(), + if let Some(v) = overlay_db.load_candidate_votes( session_index, &candidate_hash, )? {