From 7691ae28a4b27c77f59f1196470503ff73f34694 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 12:14:08 -0500 Subject: [PATCH 01/13] Requeue UMP queue items if weight exhausted --- runtime/parachains/src/ump.rs | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 40f1a5a3acda..9ea7eac0d2c7 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . + use crate::{ configuration::{self, HostConfiguration}, initializer, @@ -21,7 +22,7 @@ use crate::{ use frame_support::pallet_prelude::*; use primitives::v1::{Id as ParaId, UpwardMessage}; use sp_std::{ - collections::{btree_map::BTreeMap, vec_deque::VecDeque}, + collections::{btree_map::{BTreeMap, Entry}, vec_deque::VecDeque}, convert::TryFrom, fmt, marker::PhantomData, @@ -417,9 +418,10 @@ impl Pallet { ) { Ok(used) => weight_used += used, Err((id, required)) => { - // we process messages in order and don't drop them if we run out of weight, so need to break - // here. + // we process messages in order and don't drop them if we run out of weight, + // so need to break here and requeue the message we took off the queue. Self::deposit_event(Event::WeightExhausted(id, max_weight, required)); + queue_cache.requeue::(dispatchee, upward_message); break }, } @@ -460,6 +462,7 @@ impl Pallet { /// This struct is not supposed to be dropped but rather to be consumed by [`flush`]. struct QueueCache(BTreeMap); +#[derive(Default)] struct QueueCacheEntry { queue: VecDeque, count: u32, @@ -493,6 +496,22 @@ impl QueueCache { (upward_message, became_empty) } + /// Places a message back on to a given parachain's UMP queue. + /// + /// - `para`: The ID of the parachain. + /// - `message`: The message to put back on the front of the queue. + fn requeue(&mut self, para: ParaId, message: UpwardMessage) { + match self.0.entry(para) { + Entry::Occupied(entry) => { + let cache_entry = entry.into_mut(); + cache_entry.count += 1; + cache_entry.total_size += message.len() as u32; + cache_entry.queue.push_front(message); + }, + _ => (), + } + } + /// Flushes the updated queues into the storage. fn flush(self) { // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics From 5bad864aae007e496e42fa9ac3ce5630eb7cbfa6 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 12:45:37 -0500 Subject: [PATCH 02/13] Reduce complexity and remove Deque --- runtime/parachains/src/ump.rs | 93 ++++++++++++++++------------------- 1 file changed, 43 insertions(+), 50 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 9ea7eac0d2c7..f10e91f2bc02 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -22,7 +22,7 @@ use crate::{ use frame_support::pallet_prelude::*; use primitives::v1::{Id as ParaId, UpwardMessage}; use sp_std::{ - collections::{btree_map::{BTreeMap, Entry}, vec_deque::VecDeque}, + collections::btree_map::BTreeMap, convert::TryFrom, fmt, marker::PhantomData, @@ -212,7 +212,7 @@ pub mod pallet { /// The messages are processed in FIFO order. #[pallet::storage] pub type RelayDispatchQueues = - StorageMap<_, Twox64Concat, ParaId, VecDeque, ValueQuery>; + StorageMap<_, Twox64Concat, ParaId, Vec, ValueQuery>; /// Size of the dispatch queues. Caches sizes of the queues in `RelayDispatchQueue`. /// @@ -409,8 +409,7 @@ impl Pallet { }; // dequeue the next message from the queue of the dispatchee - let (upward_message, became_empty) = queue_cache.dequeue::(dispatchee); - if let Some(upward_message) = upward_message { + if let Some(upward_message) = queue_cache.peek_front::(dispatchee) { match T::UmpSink::process_upward_message( dispatchee, &upward_message[..], @@ -421,11 +420,11 @@ impl Pallet { // we process messages in order and don't drop them if we run out of weight, // so need to break here and requeue the message we took off the queue. Self::deposit_event(Event::WeightExhausted(id, max_weight, required)); - queue_cache.requeue::(dispatchee, upward_message); break }, } } + let became_empty = queue_cache.consume_front::(dispatchee); if became_empty { // the queue is empty now - this para doesn't need attention anymore. @@ -462,11 +461,11 @@ impl Pallet { /// This struct is not supposed to be dropped but rather to be consumed by [`flush`]. struct QueueCache(BTreeMap); -#[derive(Default)] struct QueueCacheEntry { - queue: VecDeque, - count: u32, + queue: Vec, total_size: u32, + consumed_count: usize, + consumed_size: usize, } impl QueueCache { @@ -474,42 +473,33 @@ impl QueueCache { Self(BTreeMap::new()) } - /// Dequeues one item from the upward message queue of the given para. - /// - /// Returns `(upward_message, became_empty)`, where - /// - /// - `upward_message` a dequeued message or `None` if the queue _was_ empty. - /// - `became_empty` is true if the queue _became_ empty. - fn dequeue(&mut self, para: ParaId) -> (Option, bool) { - let cache_entry = self.0.entry(para).or_insert_with(|| { + fn ensure_cached(&mut self, para: ParaId) -> &mut QueueCacheEntry { + self.0.entry(para).or_insert_with(|| { let queue = as Store>::RelayDispatchQueues::get(¶); - let (count, total_size) = as Store>::RelayDispatchQueueSize::get(¶); - QueueCacheEntry { queue, count, total_size } - }); - let upward_message = cache_entry.queue.pop_front(); - if let Some(ref msg) = upward_message { - cache_entry.count -= 1; - cache_entry.total_size -= msg.len() as u32; - } - - let became_empty = cache_entry.queue.is_empty(); - (upward_message, became_empty) + let (_, total_size) = as Store>::RelayDispatchQueueSize::get(¶); + QueueCacheEntry { queue, total_size, consumed_count: 0, consumed_size: 0 } + }) } - /// Places a message back on to a given parachain's UMP queue. + /// Returns the message at the front of `para`'s queue, or `None` if the queue is empty. /// - /// - `para`: The ID of the parachain. - /// - `message`: The message to put back on the front of the queue. - fn requeue(&mut self, para: ParaId, message: UpwardMessage) { - match self.0.entry(para) { - Entry::Occupied(entry) => { - let cache_entry = entry.into_mut(); - cache_entry.count += 1; - cache_entry.total_size += message.len() as u32; - cache_entry.queue.push_front(message); - }, - _ => (), + /// Does not mutate the queue. + fn peek_front(&mut self, para: ParaId) -> Option<&UpwardMessage> { + let entry = self.ensure_cached::(para); + entry.queue.get(entry.consumed_count) + } + + /// Removes one message from the front of `para`'s queue. Returns `true` iff there are no more + /// messages in the queue. + fn consume_front(&mut self, para: ParaId) -> bool { + let cache_entry = self.ensure_cached::(para); + let upward_message = cache_entry.queue.get(cache_entry.consumed_count); + if let Some(msg) = upward_message { + cache_entry.consumed_count += 1; + cache_entry.consumed_size += msg.len(); } + + cache_entry.consumed_count >= cache_entry.queue.len() } /// Flushes the updated queues into the storage. @@ -517,14 +507,17 @@ impl QueueCache { // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics // within runtime. It is dangerous to use because of double-panics and flushing on a panic // is not necessary as well. - for (para, QueueCacheEntry { queue, count, total_size }) in self.0 { - if queue.is_empty() { + for (para, e) in self.0 { + let QueueCacheEntry { queue, total_size, consumed_count, consumed_size } = e; + if consumed_count >= queue.len() { // remove the entries altogether. as Store>::RelayDispatchQueues::remove(¶); as Store>::RelayDispatchQueueSize::remove(¶); } else { - as Store>::RelayDispatchQueues::insert(¶, queue); - as Store>::RelayDispatchQueueSize::insert(¶, (count, total_size)); + as Store>::RelayDispatchQueues::insert(¶, &queue[consumed_count..]); + let count = (queue.len() - consumed_count) as u32; + let size = (total_size as usize - consumed_size) as u32; + as Store>::RelayDispatchQueueSize::insert(¶, (count, size)); } } } @@ -624,7 +617,7 @@ pub(crate) mod mock_sink { use super::{MessageId, ParaId, UmpSink, UpwardMessage}; use frame_support::weights::Weight; - use std::{cell::RefCell, collections::vec_deque::VecDeque}; + use std::{cell::RefCell, vec::Vec}; #[derive(Debug)] struct UmpExpectation { @@ -635,7 +628,7 @@ pub(crate) mod mock_sink { std::thread_local! { // `Some` here indicates that there is an active probe. - static HOOK: RefCell>> = RefCell::new(None); + static HOOK: RefCell>> = RefCell::new(None); } pub struct MockUmpSink; @@ -649,9 +642,9 @@ pub(crate) mod mock_sink { .with(|opt_hook| { opt_hook.borrow_mut().as_mut().map(|hook| { let UmpExpectation { expected_origin, expected_msg, mock_weight } = - match hook.pop_front() { - Some(expectation) => expectation, - None => { + match hook.is_empty() { + false => hook.remove(0), + true => { panic!( "The probe is active but didn't expect the message:\n\n\t{:?}.", actual_msg, @@ -674,7 +667,7 @@ pub(crate) mod mock_sink { impl Probe { pub fn new() -> Self { HOOK.with(|opt_hook| { - let prev = opt_hook.borrow_mut().replace(VecDeque::default()); + let prev = opt_hook.borrow_mut().replace(Vec::default()); // that can trigger if there were two probes were created during one session which // is may be a bit strict, but may save time figuring out what's wrong. @@ -695,7 +688,7 @@ pub(crate) mod mock_sink { mock_weight: Weight, ) { HOOK.with(|opt_hook| { - opt_hook.borrow_mut().as_mut().unwrap().push_back(UmpExpectation { + opt_hook.borrow_mut().as_mut().unwrap().push(UmpExpectation { expected_origin, expected_msg, mock_weight, From 19100dc827c87f13d87fa6d4cc0ff4c3bfbd4224 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 12:47:17 -0500 Subject: [PATCH 03/13] Formatting --- runtime/parachains/src/ump.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index f10e91f2bc02..82e8bf7b76a3 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -14,7 +14,6 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . - use crate::{ configuration::{self, HostConfiguration}, initializer, @@ -22,11 +21,7 @@ use crate::{ use frame_support::pallet_prelude::*; use primitives::v1::{Id as ParaId, UpwardMessage}; use sp_std::{ - collections::btree_map::BTreeMap, - convert::TryFrom, - fmt, - marker::PhantomData, - prelude::*, + collections::btree_map::BTreeMap, convert::TryFrom, fmt, marker::PhantomData, prelude::*, }; use xcm::latest::Outcome; @@ -482,7 +477,7 @@ impl QueueCache { } /// Returns the message at the front of `para`'s queue, or `None` if the queue is empty. - /// + /// /// Does not mutate the queue. fn peek_front(&mut self, para: ParaId) -> Option<&UpwardMessage> { let entry = self.ensure_cached::(para); From 1aa9a1b1bcba79f2aeac42a3ae5357d945bf417d Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 13:04:27 -0500 Subject: [PATCH 04/13] Formatting --- runtime/parachains/src/ump.rs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 82e8bf7b76a3..0e06910222e4 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -404,13 +404,17 @@ impl Pallet { }; // dequeue the next message from the queue of the dispatchee - if let Some(upward_message) = queue_cache.peek_front::(dispatchee) { + let maybe_next = queue_cache.peek_front::(dispatchee); + let became_empty = if let Some(upward_message) = maybe_next { match T::UmpSink::process_upward_message( dispatchee, &upward_message[..], max_weight, ) { - Ok(used) => weight_used += used, + Ok(used) => { + weight_used += used; + queue_cache.consume_front::(dispatchee) + }, Err((id, required)) => { // we process messages in order and don't drop them if we run out of weight, // so need to break here and requeue the message we took off the queue. @@ -418,8 +422,9 @@ impl Pallet { break }, } - } - let became_empty = queue_cache.consume_front::(dispatchee); + } else { + true + }; if became_empty { // the queue is empty now - this para doesn't need attention anymore. @@ -484,8 +489,10 @@ impl QueueCache { entry.queue.get(entry.consumed_count) } - /// Removes one message from the front of `para`'s queue. Returns `true` iff there are no more - /// messages in the queue. + /// Attempts to remove one message from the front of `para`'s queue. If the queue is empty, then + /// does nothing. + /// + /// Returns `true` iff there are no more messages in the queue after the removal attempt. fn consume_front(&mut self, para: ParaId) -> bool { let cache_entry = self.ensure_cached::(para); let upward_message = cache_entry.queue.get(cache_entry.consumed_count); From bf4e7f9af53a27cc961489a501a66f3d3963b18a Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 13:11:46 -0500 Subject: [PATCH 05/13] Avoid needless storage writes --- runtime/parachains/src/ump.rs | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 0e06910222e4..826166bbfb5a 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -417,12 +417,14 @@ impl Pallet { }, Err((id, required)) => { // we process messages in order and don't drop them if we run out of weight, - // so need to break here and requeue the message we took off the queue. + // so need to break here without calling `consume_front`. Self::deposit_event(Event::WeightExhausted(id, max_weight, required)); break }, } } else { + // this should never happen, since the cursor should never point to an empty queue. + // it is resolved harmlessly here anyway. true }; @@ -443,8 +445,8 @@ impl Pallet { /// To avoid constant fetching, deserializing and serialization the queues are cached. /// -/// After an item dequeued from a queue for the first time, the queue is stored in this struct rather -/// than being serialized and persisted. +/// After an item dequeued from a queue for the first time, the queue is stored in this struct +/// rather than being serialized and persisted. /// /// This implementation works best when: /// @@ -475,8 +477,8 @@ impl QueueCache { fn ensure_cached(&mut self, para: ParaId) -> &mut QueueCacheEntry { self.0.entry(para).or_insert_with(|| { - let queue = as Store>::RelayDispatchQueues::get(¶); - let (_, total_size) = as Store>::RelayDispatchQueueSize::get(¶); + let queue = RelayDispatchQueues::::get(¶); + let (_, total_size) = RelayDispatchQueueSize::::get(¶); QueueCacheEntry { queue, total_size, consumed_count: 0, consumed_size: 0 } }) } @@ -511,15 +513,17 @@ impl QueueCache { // is not necessary as well. for (para, e) in self.0 { let QueueCacheEntry { queue, total_size, consumed_count, consumed_size } = e; - if consumed_count >= queue.len() { - // remove the entries altogether. - as Store>::RelayDispatchQueues::remove(¶); - as Store>::RelayDispatchQueueSize::remove(¶); - } else { - as Store>::RelayDispatchQueues::insert(¶, &queue[consumed_count..]); - let count = (queue.len() - consumed_count) as u32; - let size = (total_size as usize - consumed_size) as u32; - as Store>::RelayDispatchQueueSize::insert(¶, (count, size)); + if consumed_count > 0 { + if consumed_count >= queue.len() { + // remove the entries altogether. + RelayDispatchQueues::::remove(¶); + RelayDispatchQueueSize::::remove(¶); + } else { + RelayDispatchQueues::::insert(¶, &queue[consumed_count..]); + let count = (queue.len() - consumed_count) as u32; + let size = (total_size as usize - consumed_size) as u32; + RelayDispatchQueueSize::::insert(¶, (count, size)); + } } } } From 50105ccf37b682455ac6c521919e7907a720b29c Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 13:58:14 -0500 Subject: [PATCH 06/13] Test --- runtime/parachains/src/ump.rs | 231 ++++++++++++---------------------- 1 file changed, 79 insertions(+), 152 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 826166bbfb5a..8e151fb75efc 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -623,18 +623,21 @@ pub(crate) mod mock_sink { use super::{MessageId, ParaId, UmpSink, UpwardMessage}; use frame_support::weights::Weight; - use std::{cell::RefCell, vec::Vec}; - - #[derive(Debug)] - struct UmpExpectation { - expected_origin: ParaId, - expected_msg: UpwardMessage, - mock_weight: Weight, - } + use std::cell::RefCell; + use parity_scale_codec::Decode; std::thread_local! { // `Some` here indicates that there is an active probe. - static HOOK: RefCell>> = RefCell::new(None); + static PROCESSED: RefCell> = RefCell::new(vec![]); + } + + pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { + PROCESSED.with(|opt_hook| { + let mut r = vec![]; + let mut processed = opt_hook.borrow_mut(); + std::mem::swap(processed.as_mut(), &mut r); + r + }) } pub struct MockUmpSink; @@ -642,98 +645,27 @@ pub(crate) mod mock_sink { fn process_upward_message( actual_origin: ParaId, actual_msg: &[u8], - _max_weight: Weight, + max_weight: Weight, ) -> Result { - Ok(HOOK - .with(|opt_hook| { - opt_hook.borrow_mut().as_mut().map(|hook| { - let UmpExpectation { expected_origin, expected_msg, mock_weight } = - match hook.is_empty() { - false => hook.remove(0), - true => { - panic!( - "The probe is active but didn't expect the message:\n\n\t{:?}.", - actual_msg, - ); - }, - }; - assert_eq!(expected_origin, actual_origin); - assert_eq!(expected_msg, &actual_msg[..]); - mock_weight - }) - }) - .unwrap_or(0)) - } - } - - pub struct Probe { - _private: (), - } - - impl Probe { - pub fn new() -> Self { - HOOK.with(|opt_hook| { - let prev = opt_hook.borrow_mut().replace(Vec::default()); - - // that can trigger if there were two probes were created during one session which - // is may be a bit strict, but may save time figuring out what's wrong. - // if you land here and you do need the two probes in one session consider - // dropping the the existing probe explicitly. - assert!(prev.is_none()); - }); - Self { _private: () } - } - - /// Add an expected message. - /// - /// The enqueued messages are processed in FIFO order. - pub fn assert_msg( - &mut self, - expected_origin: ParaId, - expected_msg: UpwardMessage, - mock_weight: Weight, - ) { - HOOK.with(|opt_hook| { - opt_hook.borrow_mut().as_mut().unwrap().push(UmpExpectation { - expected_origin, - expected_msg, - mock_weight, - }) - }); - } - } - - impl Drop for Probe { - fn drop(&mut self) { - let _ = HOOK.try_with(|opt_hook| { - let prev = opt_hook.borrow_mut().take().expect( - "this probe was created and hasn't been yet destroyed; - the probe cannot be replaced; - there is only one probe at a time allowed; - thus it cannot be `None`; - qed", - ); - - if !prev.is_empty() { - // some messages are left unchecked. We should notify the developer about this. - // however, we do so only if the thread doesn't panic already. Otherwise, the - // developer would get a SIGILL or SIGABRT without a meaningful error message. - if !std::thread::panicking() { - panic!( - "the probe is dropped and not all expected messages arrived: {:?}", - prev - ); - } - } + let weight = match u32::decode(&mut &actual_msg[..]) { + Ok(w) => w as Weight, + Err(_) => return Ok(0), // same as the real `UmpSink` + }; + if weight > max_weight { + let id = sp_io::hashing::blake2_256(actual_msg); + return Err((id, weight)) + } + PROCESSED.with(|opt_hook| { + opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned())); }); - // an `Err` here signals here that the thread local was already destroyed. + Ok(weight) } } } #[cfg(test)] mod tests { - use super::{mock_sink::Probe, *}; + use super::{mock_sink::take_processed, *}; use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump}; use std::collections::HashSet; @@ -844,15 +776,12 @@ mod tests { #[test] fn dispatch_single_message() { let a = ParaId::from(228); - let msg = vec![1, 2, 3]; + let msg = 1000u32.encode(); new_test_ext(GenesisConfigBuilder::default().build()).execute_with(|| { - let mut probe = Probe::new(); - - probe.assert_msg(a, msg.clone(), 0); - queue_upward_msg(a, msg); - + queue_upward_msg(a, msg.clone()); Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, msg)]); assert_storage_consistency_exhaustive(); }); @@ -864,11 +793,11 @@ mod tests { let c = ParaId::from(228); let q = ParaId::from(911); - let a_msg_1 = vec![1, 2, 3]; - let a_msg_2 = vec![3, 2, 1]; - let c_msg_1 = vec![4, 5, 6]; - let c_msg_2 = vec![9, 8, 7]; - let q_msg = b"we are Q".to_vec(); + let a_msg_1 = (200u32, "a_msg_1").encode(); + let a_msg_2 = (100u32, "a_msg_2").encode(); + let c_msg_1 = (300u32, "c_msg_1").encode(); + let c_msg_2 = (100u32, "c_msg_2").encode(); + let q_msg = (500u32, "q_msg").encode(); new_test_ext( GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(), @@ -882,52 +811,60 @@ mod tests { assert_storage_consistency_exhaustive(); // we expect only two first messages to fit in the first iteration. - { - let mut probe = Probe::new(); - - probe.assert_msg(a, a_msg_1.clone(), 300); - probe.assert_msg(c, c_msg_1.clone(), 300); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); - - drop(probe); - } + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1), (c, c_msg_1)]); + assert_storage_consistency_exhaustive(); queue_upward_msg(c, c_msg_2.clone()); assert_storage_consistency_exhaustive(); // second iteration should process the second message. - { - let mut probe = Probe::new(); + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(q, q_msg)]); + assert_storage_consistency_exhaustive(); - probe.assert_msg(q, q_msg.clone(), 500); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + // 3rd iteration. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_2), (c, c_msg_2)]); + assert_storage_consistency_exhaustive(); - drop(probe); - } + // finally, make sure that the queue is empty. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![]); + assert_storage_consistency_exhaustive(); + }); + } - // 3rd iteration. - { - let mut probe = Probe::new(); + #[test] + fn dispatch_keeps_message_after_weight_exhausted() { + let a = ParaId::from(128); - probe.assert_msg(a, a_msg_2.clone(), 100); - probe.assert_msg(c, c_msg_2.clone(), 100); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + let a_msg_1 = (300u32, "a_msg_1").encode(); + let a_msg_2 = (300u32, "a_msg_2").encode(); - drop(probe); - } + new_test_ext( + GenesisConfigBuilder { ump_service_total_weight: 500, ..Default::default() }.build(), + ) + .execute_with(|| { + queue_upward_msg(a, a_msg_1.clone()); + queue_upward_msg(a, a_msg_2.clone()); - // finally, make sure that the queue is empty. - { - let probe = Probe::new(); + assert_storage_consistency_exhaustive(); - Ump::process_pending_upward_messages(); - assert_storage_consistency_exhaustive(); + // we expect only one message to fit in the first iteration. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1)]); + assert_storage_consistency_exhaustive(); - drop(probe); - } + // second iteration should process the remaining message. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_2)]); + assert_storage_consistency_exhaustive(); + + // finally, make sure that the queue is empty. + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![]); + assert_storage_consistency_exhaustive(); }); } @@ -936,9 +873,9 @@ mod tests { let a = ParaId::from(1991); let b = ParaId::from(1999); - let a_msg_1 = vec![1, 2, 3]; - let a_msg_2 = vec![3, 2, 1]; - let b_msg_1 = vec![4, 5, 6]; + let a_msg_1 = (300u32, "a_msg_1").encode(); + let a_msg_2 = (300u32, "a_msg_2").encode(); + let b_msg_1 = (300u32, "b_msg_1").encode(); new_test_ext( GenesisConfigBuilder { ump_service_total_weight: 900, ..Default::default() }.build(), @@ -953,18 +890,8 @@ mod tests { queue_upward_msg(a, a_msg_1.clone()); queue_upward_msg(a, a_msg_2.clone()); queue_upward_msg(b, b_msg_1.clone()); - - { - let mut probe = Probe::new(); - - probe.assert_msg(a, a_msg_1.clone(), 300); - probe.assert_msg(b, b_msg_1.clone(), 300); - probe.assert_msg(a, a_msg_2.clone(), 300); - - Ump::process_pending_upward_messages(); - - drop(probe); - } + Ump::process_pending_upward_messages(); + assert_eq!(take_processed(), vec![(a, a_msg_1), (b, b_msg_1), (a, a_msg_2)]); }); } From f493eee6b2ce728ae4e8f1fddec6e00913b731a9 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 13:59:28 -0500 Subject: [PATCH 07/13] Formatting --- runtime/parachains/src/ump.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 8e151fb75efc..4589782ff81d 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -623,8 +623,8 @@ pub(crate) mod mock_sink { use super::{MessageId, ParaId, UmpSink, UpwardMessage}; use frame_support::weights::Weight; - use std::cell::RefCell; use parity_scale_codec::Decode; + use std::cell::RefCell; std::thread_local! { // `Some` here indicates that there is an active probe. @@ -649,7 +649,7 @@ pub(crate) mod mock_sink { ) -> Result { let weight = match u32::decode(&mut &actual_msg[..]) { Ok(w) => w as Weight, - Err(_) => return Ok(0), // same as the real `UmpSink` + Err(_) => return Ok(0), // same as the real `UmpSink` }; if weight > max_weight { let id = sp_io::hashing::blake2_256(actual_msg); From 6a184a1d66d05205f0661aeb48d94c306d3434de Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Sat, 4 Sep 2021 15:03:27 -0500 Subject: [PATCH 08/13] Docs and cleanup --- runtime/parachains/src/mock.rs | 55 ++++++++++++++++---- runtime/parachains/src/ump.rs | 92 ++++++---------------------------- 2 files changed, 61 insertions(+), 86 deletions(-) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index fc45eed0d1a5..6f39a0b0413c 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -16,19 +16,15 @@ //! Mocks for all the traits. -use crate::{ - configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, - session_info, shared, ump, -}; -use frame_support::{parameter_types, traits::GenesisBuild}; +use crate::{configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, session_info, shared, ump::{self, MessageId, UmpSink}, ParaId}; +use frame_support::{parameter_types, traits::GenesisBuild, weights::Weight}; use frame_support_test::TestRandomness; -use primitives::v1::{ - AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, ValidatorIndex, -}; +use primitives::v1::{AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex}; use sp_core::H256; use sp_io::TestExternalities; use sp_runtime::traits::{BlakeTwo256, IdentityLookup}; use std::{cell::RefCell, collections::HashMap}; +use parity_scale_codec::Decode; type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; type Block = frame_system::mocking::MockBlock; @@ -128,7 +124,7 @@ parameter_types! { impl crate::ump::Config for Test { type Event = Event; - type UmpSink = crate::ump::mock_sink::MockUmpSink; + type UmpSink = TestUmpSink; type FirstMessageFactorPercent = FirstMessageFactorPercent; } @@ -232,6 +228,47 @@ pub fn availability_rewards() -> HashMap { AVAILABILITY_REWARDS.with(|r| r.borrow().clone()) } +std::thread_local! { + // `Some` here indicates that there is an active probe. + static PROCESSED: RefCell> = RefCell::new(vec![]); +} + +/// Return which messages have been processed by `pocess_upward_message` and clear the buffer. +pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { + PROCESSED.with(|opt_hook| { + let mut r = vec![]; + let mut processed = opt_hook.borrow_mut(); + std::mem::swap(processed.as_mut(), &mut r); + r + }) +} + +/// An implementation of a UMP sink that just records which messages were processed. +/// +/// A message's weight is defined by the first 4 bytes of its data, which we decode into a +/// `u32`. +pub struct TestUmpSink; +impl UmpSink for TestUmpSink { + fn process_upward_message( + actual_origin: ParaId, + actual_msg: &[u8], + max_weight: Weight, + ) -> Result { + let weight = match u32::decode(&mut &actual_msg[..]) { + Ok(w) => w as Weight, + Err(_) => return Ok(0), // same as the real `UmpSink` + }; + if weight > max_weight { + let id = sp_io::hashing::blake2_256(actual_msg); + return Err((id, weight)) + } + PROCESSED.with(|opt_hook| { + opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned())); + }); + Ok(weight) + } +} + pub struct TestRewardValidators; impl inclusion::RewardValidators for TestRewardValidators { diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 4589782ff81d..0f8af6614dfe 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -511,19 +511,16 @@ impl QueueCache { // NOTE we use an explicit method here instead of Drop impl because it has unwanted semantics // within runtime. It is dangerous to use because of double-panics and flushing on a panic // is not necessary as well. - for (para, e) in self.0 { - let QueueCacheEntry { queue, total_size, consumed_count, consumed_size } = e; - if consumed_count > 0 { - if consumed_count >= queue.len() { - // remove the entries altogether. - RelayDispatchQueues::::remove(¶); - RelayDispatchQueueSize::::remove(¶); - } else { - RelayDispatchQueues::::insert(¶, &queue[consumed_count..]); - let count = (queue.len() - consumed_count) as u32; - let size = (total_size as usize - consumed_size) as u32; - RelayDispatchQueueSize::::insert(¶, (count, size)); - } + for (para, entry) in self.0 { + if entry.consumed_count >= entry.queue.len() { + // remove the entries altogether. + RelayDispatchQueues::::remove(¶); + RelayDispatchQueueSize::::remove(¶); + } else if entry.consumed_count > 0 { + RelayDispatchQueues::::insert(¶, &entry.queue[entry.consumed_count..]); + let count = (entry.queue.len() - entry.consumed_count) as u32; + let size = (entry.total_size as usize - entry.consumed_size) as u32; + RelayDispatchQueueSize::::insert(¶, (count, size)); } } } @@ -604,71 +601,12 @@ impl NeedsDispatchCursor { } #[cfg(test)] -pub(crate) mod mock_sink { - //! An implementation of a mock UMP sink that allows attaching a probe for mocking the weights - //! and checking the sent messages. - //! - //! A default behavior of the UMP sink is to ignore an incoming message and return 0 weight. - //! - //! A probe can be attached to the mock UMP sink. When attached, the mock sink would consult the - //! probe to check whether the received message was expected and what weight it should return. - //! - //! There are two rules on how to use a probe: - //! - //! 1. There can be only one active probe at a time. Creation of another probe while there is - //! already an active one leads to a panic. The probe is scoped to a thread where it was created. - //! - //! 2. All messages expected by the probe must be received by the time of dropping it. Unreceived - //! messages will lead to a panic while dropping a probe. - - use super::{MessageId, ParaId, UmpSink, UpwardMessage}; - use frame_support::weights::Weight; - use parity_scale_codec::Decode; - use std::cell::RefCell; - - std::thread_local! { - // `Some` here indicates that there is an active probe. - static PROCESSED: RefCell> = RefCell::new(vec![]); - } - - pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { - PROCESSED.with(|opt_hook| { - let mut r = vec![]; - let mut processed = opt_hook.borrow_mut(); - std::mem::swap(processed.as_mut(), &mut r); - r - }) - } - - pub struct MockUmpSink; - impl UmpSink for MockUmpSink { - fn process_upward_message( - actual_origin: ParaId, - actual_msg: &[u8], - max_weight: Weight, - ) -> Result { - let weight = match u32::decode(&mut &actual_msg[..]) { - Ok(w) => w as Weight, - Err(_) => return Ok(0), // same as the real `UmpSink` - }; - if weight > max_weight { - let id = sp_io::hashing::blake2_256(actual_msg); - return Err((id, weight)) - } - PROCESSED.with(|opt_hook| { - opt_hook.borrow_mut().push((actual_origin, actual_msg.to_owned())); - }); - Ok(weight) - } - } -} - -#[cfg(test)] -mod tests { - use super::{mock_sink::take_processed, *}; - use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump}; +pub (crate) mod tests { + use super::*; + use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump, take_processed}; use std::collections::HashSet; - + use frame_support::weights::Weight; + struct GenesisConfigBuilder { max_upward_message_size: u32, max_upward_message_num_per_candidate: u32, From dd6dfe4aa788e5521d2055ab1529507be6e77ad0 Mon Sep 17 00:00:00 2001 From: Shawn Tabrizi Date: Sat, 4 Sep 2021 17:21:34 -0400 Subject: [PATCH 09/13] fmt --- runtime/parachains/src/mock.rs | 15 +++++++++++---- runtime/parachains/src/ump.rs | 8 ++++---- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 6f39a0b0413c..f554e1ed0f23 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -16,15 +16,22 @@ //! Mocks for all the traits. -use crate::{configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, session_info, shared, ump::{self, MessageId, UmpSink}, ParaId}; +use crate::{ + configuration, disputes, dmp, hrmp, inclusion, initializer, paras, paras_inherent, scheduler, + session_info, shared, + ump::{self, MessageId, UmpSink}, + ParaId, +}; use frame_support::{parameter_types, traits::GenesisBuild, weights::Weight}; use frame_support_test::TestRandomness; -use primitives::v1::{AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex}; +use parity_scale_codec::Decode; +use primitives::v1::{ + AuthorityDiscoveryId, Balance, BlockNumber, Header, SessionIndex, UpwardMessage, ValidatorIndex, +}; use sp_core::H256; use sp_io::TestExternalities; use sp_runtime::traits::{BlakeTwo256, IdentityLookup}; use std::{cell::RefCell, collections::HashMap}; -use parity_scale_codec::Decode; type UncheckedExtrinsic = frame_system::mocking::MockUncheckedExtrinsic; type Block = frame_system::mocking::MockBlock; @@ -244,7 +251,7 @@ pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { } /// An implementation of a UMP sink that just records which messages were processed. -/// +/// /// A message's weight is defined by the first 4 bytes of its data, which we decode into a /// `u32`. pub struct TestUmpSink; diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 0f8af6614dfe..4168f7abb1ca 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -601,12 +601,12 @@ impl NeedsDispatchCursor { } #[cfg(test)] -pub (crate) mod tests { +pub(crate) mod tests { use super::*; - use crate::mock::{new_test_ext, Configuration, MockGenesisConfig, Ump, take_processed}; - use std::collections::HashSet; + use crate::mock::{new_test_ext, take_processed, Configuration, MockGenesisConfig, Ump}; use frame_support::weights::Weight; - + use std::collections::HashSet; + struct GenesisConfigBuilder { max_upward_message_size: u32, max_upward_message_num_per_candidate: u32, From deddbd27161b245e5ab59506c0d2bd1cc7fcc308 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 6 Sep 2021 12:16:21 +0000 Subject: [PATCH 10/13] Remove now irrelevant comment. --- runtime/parachains/src/mock.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index f554e1ed0f23..50f594f73480 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -236,7 +236,6 @@ pub fn availability_rewards() -> HashMap { } std::thread_local! { - // `Some` here indicates that there is an active probe. static PROCESSED: RefCell> = RefCell::new(vec![]); } From f23fbdb56aeb269c4abbd212693d579ca97896a7 Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 6 Sep 2021 12:16:42 +0000 Subject: [PATCH 11/13] Simplify `take_processed` by using `mem::take` --- runtime/parachains/src/mock.rs | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/runtime/parachains/src/mock.rs b/runtime/parachains/src/mock.rs index 50f594f73480..88e241426d62 100644 --- a/runtime/parachains/src/mock.rs +++ b/runtime/parachains/src/mock.rs @@ -241,12 +241,7 @@ std::thread_local! { /// Return which messages have been processed by `pocess_upward_message` and clear the buffer. pub fn take_processed() -> Vec<(ParaId, UpwardMessage)> { - PROCESSED.with(|opt_hook| { - let mut r = vec![]; - let mut processed = opt_hook.borrow_mut(); - std::mem::swap(processed.as_mut(), &mut r); - r - }) + PROCESSED.with(|opt_hook| std::mem::take(&mut *opt_hook.borrow_mut())) } /// An implementation of a UMP sink that just records which messages were processed. From d958f4d4f85079a6eefab7ec21b43f362682a2ce Mon Sep 17 00:00:00 2001 From: Sergei Shulepov Date: Mon, 6 Sep 2021 12:19:43 +0000 Subject: [PATCH 12/13] Clean up & fmt: use `upward_message` directly. --- runtime/parachains/src/ump.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 4168f7abb1ca..2cd2a5b6f062 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -406,11 +406,7 @@ impl Pallet { // dequeue the next message from the queue of the dispatchee let maybe_next = queue_cache.peek_front::(dispatchee); let became_empty = if let Some(upward_message) = maybe_next { - match T::UmpSink::process_upward_message( - dispatchee, - &upward_message[..], - max_weight, - ) { + match T::UmpSink::process_upward_message(dispatchee, upward_message, max_weight) { Ok(used) => { weight_used += used; queue_cache.consume_front::(dispatchee) From 46930b6b8c1a55d796f454a86c9eced8223abd55 Mon Sep 17 00:00:00 2001 From: Gav Wood Date: Wed, 8 Sep 2021 09:39:36 -0500 Subject: [PATCH 13/13] Grumbles --- runtime/parachains/src/ump.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/runtime/parachains/src/ump.rs b/runtime/parachains/src/ump.rs index 2cd2a5b6f062..ae04d2d460df 100644 --- a/runtime/parachains/src/ump.rs +++ b/runtime/parachains/src/ump.rs @@ -403,7 +403,8 @@ impl Pallet { config.ump_service_total_weight - weight_used }; - // dequeue the next message from the queue of the dispatchee + // attempt to process the next message from the queue of the dispatchee; if not beyond + // our remaining weight limit, then consume it. let maybe_next = queue_cache.peek_front::(dispatchee); let became_empty = if let Some(upward_message) = maybe_next { match T::UmpSink::process_upward_message(dispatchee, upward_message, max_weight) { @@ -515,7 +516,7 @@ impl QueueCache { } else if entry.consumed_count > 0 { RelayDispatchQueues::::insert(¶, &entry.queue[entry.consumed_count..]); let count = (entry.queue.len() - entry.consumed_count) as u32; - let size = (entry.total_size as usize - entry.consumed_size) as u32; + let size = entry.total_size.saturating_sub(entry.consumed_size as u32); RelayDispatchQueueSize::::insert(¶, (count, size)); } }