From 107f97150ba30f67f5f41acfa155eb4e95c17f06 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 6 Mar 2023 17:56:57 -0800 Subject: [PATCH 01/22] Added participation and queue sizes metrics --- .../dispute-coordinator/src/initialized.rs | 11 ++--- node/core/dispute-coordinator/src/metrics.rs | 44 +++++++++++++++++++ .../src/participation/mod.rs | 26 +++++++---- .../src/participation/queues/mod.rs | 25 ++++++++--- 4 files changed, 88 insertions(+), 18 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 2e06e60d0787..0985a4b67506 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -164,7 +164,7 @@ impl Initialized { B: Backend, { for (priority, request) in participations.drain(..) { - self.participation.queue_participation(ctx, priority, request).await?; + self.participation.queue_participation(ctx, priority, request, &self.metrics).await?; } { @@ -190,7 +190,7 @@ impl Initialized { if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation - .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) + .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf), &self.metrics) .await?; } @@ -208,7 +208,7 @@ impl Initialized { candidate_hash, candidate_receipt, outcome, - } = self.participation.get_participation_result(ctx, msg).await?; + } = self.participation.get_participation_result(ctx, msg, &self.metrics).await?; if let Some(valid) = outcome.validity() { gum::trace!( target: LOG_TARGET, @@ -276,10 +276,10 @@ impl Initialized { self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; log_error( self.participation - .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts) + .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts, &self.metrics) .await, )?; - self.participation.process_active_leaves_update(ctx, &update).await?; + self.participation.process_active_leaves_update(ctx, &update, &self.metrics).await?; if let Some(new_leaf) = update.activated { match self @@ -913,6 +913,7 @@ impl Initialized { ctx, priority, ParticipationRequest::new(new_state.candidate_receipt().clone(), session), + &self.metrics, ) .await; log_error(r)?; diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 70cd49ac49d1..5c1a48df7362 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -32,6 +32,12 @@ struct MetricsInner { vote_cleanup_time: prometheus::Histogram, /// Number of refrained participations. refrained_participations: prometheus::Counter, + /// Distribution of participation durations. + participation_durations: prometheus::Histogram, + /// Size of participation priority queue + priority_queue_size: prometheus::Gauge, + /// Size of participation best effort queue + best_effort_queue_size: prometheus::Gauge, } /// Candidate validation metrics. @@ -96,6 +102,25 @@ impl Metrics { metrics.refrained_participations.inc(); } } + + /// Provide a timer for participation durations which updates on drop. + pub(crate) fn time_participation(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer()) + } + + /// Set the priority_queue_size metric + pub fn report_priority_queue_size(&self, size: u64) { + if let Some(metrics) = &self.0 { + metrics.priority_queue_size.set(size); + } + } + + /// Set the best_effort_queue_size metric + pub fn report_best_effort_queue_size(&self, size: u64) { + if let Some(metrics) = &self.0 { + metrics.best_effort_queue_size.set(size); + } + } } impl metrics::Metrics for Metrics { @@ -163,6 +188,25 @@ impl metrics::Metrics for Metrics { ))?, registry, )?, + participation_durations: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_dispute_participation_durations", + "Time spent within fn Participation::participate", + ) + )?, + registry, + )?, + priority_queue_size: prometheus::register( + prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", + "Number of disputes waiting for local participation in the priority queue.")?, + registry, + )?, + best_effort_queue_size: prometheus::register( + prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size", + "Number of disputes waiting for local participation in the best effort queue.")?, + registry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index f813b216b6ad..90e6a2bed9ed 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -46,6 +46,8 @@ mod queues; use queues::Queues; pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; +use crate::metrics::Metrics; + /// How many participation processes do we want to run in parallel the most. /// /// This should be a relatively low value, while we might have a speedup once we fetched the data, @@ -153,6 +155,7 @@ impl Participation { ctx: &mut Context, priority: ParticipationPriority, req: ParticipationRequest, + metrics: &Metrics, ) -> Result<()> { // Participation already running - we can ignore that request: if self.running_participations.contains(req.candidate_hash()) { @@ -161,12 +164,12 @@ impl Participation { // Available capacity - participate right away (if we already have a recent block): if let Some((_, h)) = self.recent_block { if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - self.fork_participation(ctx, req, h)?; + self.fork_participation(ctx, req, h, metrics)?; return Ok(()) } } // Out of capacity/no recent block yet - queue: - self.queue.queue(ctx.sender(), priority, req).await + self.queue.queue(ctx.sender(), priority, req, metrics).await } /// Message from a worker task was received - get the outcome. @@ -182,11 +185,12 @@ impl Participation { &mut self, ctx: &mut Context, msg: WorkerMessage, + metrics: &Metrics, ) -> FatalResult { let WorkerMessage(statement) = msg; self.running_participations.remove(&statement.candidate_hash); let recent_block = self.recent_block.expect("We never ever reset recent_block to `None` and we already received a result, so it must have been set before. qed."); - self.dequeue_until_capacity(ctx, recent_block.1).await?; + self.dequeue_until_capacity(ctx, recent_block.1, metrics).await?; Ok(statement) } @@ -198,13 +202,14 @@ impl Participation { &mut self, ctx: &mut Context, update: &ActiveLeavesUpdate, + metrics: &Metrics, ) -> FatalResult<()> { if let Some(activated) = &update.activated { match self.recent_block { None => { self.recent_block = Some((activated.number, activated.hash)); // Work got potentially unblocked: - self.dequeue_until_capacity(ctx, activated.hash).await?; + self.dequeue_until_capacity(ctx, activated.hash, metrics).await?; }, Some((number, _)) if activated.number > number => { self.recent_block = Some((activated.number, activated.hash)); @@ -221,9 +226,10 @@ impl Participation { &mut self, ctx: &mut Context, included_receipts: &Vec, + metrics: &Metrics, ) -> Result<()> { for receipt in included_receipts { - self.queue.prioritize_if_present(ctx.sender(), receipt).await?; + self.queue.prioritize_if_present(ctx.sender(), receipt, metrics).await?; } Ok(()) } @@ -233,10 +239,11 @@ impl Participation { &mut self, ctx: &mut Context, recent_head: Hash, + metrics: &Metrics, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - if let Some(req) = self.queue.dequeue() { - self.fork_participation(ctx, req, recent_head)?; + if let Some(req) = self.queue.dequeue(metrics) { + self.fork_participation(ctx, req, recent_head, metrics)?; } else { break } @@ -250,12 +257,13 @@ impl Participation { ctx: &mut Context, req: ParticipationRequest, recent_head: Hash, + metrics: &Metrics, ) -> FatalResult<()> { if self.running_participations.insert(*req.candidate_hash()) { let sender = ctx.sender().clone(); ctx.spawn( "participation-worker", - participate(self.worker_sender.clone(), sender, recent_head, req).boxed(), + participate(self.worker_sender.clone(), sender, recent_head, req, metrics.clone()).boxed(), ) .map_err(FatalError::SpawnFailed)?; } @@ -268,7 +276,9 @@ async fn participate( mut sender: impl overseer::DisputeCoordinatorSenderTrait, block_hash: Hash, req: ParticipationRequest, + metrics: Metrics, ) { + let _measure_duration = metrics.time_participation(); #[cfg(test)] // Hack for tests, so we get recovery messages not too early. Delay::new(Duration::from_millis(100)).await; diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index b632e04dbb4f..e1c481b2c5f4 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -25,6 +25,8 @@ use crate::{ LOG_TARGET, }; +use crate::metrics::Metrics; + #[cfg(test)] mod tests; @@ -129,7 +131,7 @@ impl ParticipationRequest { impl Queues { /// Create new `Queues`. pub fn new() -> Self { - Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() } + Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), } } /// Will put message in queue, either priority or best effort depending on priority. @@ -143,20 +145,24 @@ impl Queues { sender: &mut impl overseer::DisputeCoordinatorSenderTrait, priority: ParticipationPriority, req: ParticipationRequest, + metrics: &Metrics, ) -> Result<()> { let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?; - self.queue_with_comparator(comparator, priority, req)?; + self.queue_with_comparator(comparator, priority, req, metrics)?; Ok(()) } /// Get the next best request for dispute participation if any. /// First the priority queue is considered and then the best effort one. - pub fn dequeue(&mut self) -> Option { + pub fn dequeue(&mut self, metrics: &Metrics) -> Option { if let Some(req) = self.pop_priority() { + metrics.report_priority_queue_size(self.priority.len() as u64); return Some(req.1) } - self.pop_best_effort().map(|d| d.1) + let request = self.pop_best_effort().map(|d| d.1); + metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + request } /// Reprioritizes any participation requests pertaining to the @@ -165,21 +171,26 @@ impl Queues { &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, receipt: &CandidateReceipt, + metrics: &Metrics, ) -> Result<()> { let comparator = CandidateComparator::new(sender, receipt).await?; - self.prioritize_with_comparator(comparator)?; + self.prioritize_with_comparator(comparator, metrics)?; Ok(()) } fn prioritize_with_comparator( &mut self, comparator: CandidateComparator, + metrics: &Metrics, ) -> std::result::Result<(), QueueError> { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); + // Report changes to both queue sizes + metrics.report_priority_queue_size(self.priority.len() as u64); + metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } @@ -189,6 +200,7 @@ impl Queues { comparator: CandidateComparator, priority: ParticipationPriority, req: ParticipationRequest, + metrics: &Metrics, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { if self.priority.len() >= PRIORITY_QUEUE_SIZE { @@ -197,6 +209,8 @@ impl Queues { // Remove any best effort entry: self.best_effort.remove(&comparator); self.priority.insert(comparator, req); + metrics.report_priority_queue_size(self.priority.len() as u64); + metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } else { if self.priority.contains_key(&comparator) { // The candidate is already in priority queue - don't @@ -207,6 +221,7 @@ impl Queues { return Err(QueueError::BestEffortFull) } self.best_effort.insert(comparator, req); + metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } From a8186fc7f26c229ecbf78ff3fb2f2768ce573834 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 6 Mar 2023 20:22:18 -0800 Subject: [PATCH 02/22] First draft of all metric code --- node/core/dispute-coordinator/src/metrics.rs | 18 +++++++++ .../src/participation/mod.rs | 16 +++++--- .../src/participation/queues/mod.rs | 38 ++++++++++++++----- 3 files changed, 58 insertions(+), 14 deletions(-) diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 5c1a48df7362..d1dab6359a4d 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -34,6 +34,10 @@ struct MetricsInner { refrained_participations: prometheus::Counter, /// Distribution of participation durations. participation_durations: prometheus::Histogram, + /// Measures the duration of the full participation pipeline: From when + /// a participation request is first queued to when participation in the + /// requested dispute is complete. + participation_pipeline_durations: prometheus::Histogram, /// Size of participation priority queue priority_queue_size: prometheus::Gauge, /// Size of participation best effort queue @@ -108,6 +112,11 @@ impl Metrics { self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer()) } + /// Provide a timer for participation pipeline durations which updates on drop. + pub(crate) fn time_participation_pipeline(&self) -> Option { + self.0.as_ref().map(|metrics| metrics.participation_pipeline_durations.start_timer()) + } + /// Set the priority_queue_size metric pub fn report_priority_queue_size(&self, size: u64) { if let Some(metrics) = &self.0 { @@ -197,6 +206,15 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + participation_pipeline_durations: prometheus::register( + prometheus::Histogram::with_opts( + prometheus::HistogramOpts::new( + "polkadot_parachain_dispute_participation_pipeline_durations", + "Measures the duration of the full participation pipeline: From when a participation request is first queued to when participation in the requested dispute is complete.", + ) + )?, + registry, + )?, priority_queue_size: prometheus::register( prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", "Number of disputes waiting for local participation in the priority queue.")?, diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 90e6a2bed9ed..22cc43e9a8cf 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -47,6 +47,7 @@ use queues::Queues; pub use queues::{ParticipationPriority, ParticipationRequest, QueueError}; use crate::metrics::Metrics; +use polkadot_node_subsystem_util::metrics::prometheus::prometheus; /// How many participation processes do we want to run in parallel the most. /// @@ -157,6 +158,8 @@ impl Participation { req: ParticipationRequest, metrics: &Metrics, ) -> Result<()> { + let request_timer = metrics.time_participation_pipeline(); + // Participation already running - we can ignore that request: if self.running_participations.contains(req.candidate_hash()) { return Ok(()) @@ -164,12 +167,12 @@ impl Participation { // Available capacity - participate right away (if we already have a recent block): if let Some((_, h)) = self.recent_block { if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - self.fork_participation(ctx, req, h, metrics)?; + self.fork_participation(ctx, req, h, request_timer, metrics)?; return Ok(()) } } // Out of capacity/no recent block yet - queue: - self.queue.queue(ctx.sender(), priority, req, metrics).await + self.queue.queue(ctx.sender(), priority, req, request_timer, metrics).await } /// Message from a worker task was received - get the outcome. @@ -242,8 +245,9 @@ impl Participation { metrics: &Metrics, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - if let Some(req) = self.queue.dequeue(metrics) { - self.fork_participation(ctx, req, recent_head, metrics)?; + let (maybe_req, maybe_timer) = self.queue.dequeue(metrics); + if let Some(req) = maybe_req { + self.fork_participation(ctx, req, recent_head, maybe_timer, metrics)?; } else { break } @@ -257,13 +261,14 @@ impl Participation { ctx: &mut Context, req: ParticipationRequest, recent_head: Hash, + request_timer: Option, metrics: &Metrics, ) -> FatalResult<()> { if self.running_participations.insert(*req.candidate_hash()) { let sender = ctx.sender().clone(); ctx.spawn( "participation-worker", - participate(self.worker_sender.clone(), sender, recent_head, req, metrics.clone()).boxed(), + participate(self.worker_sender.clone(), sender, recent_head, req, request_timer, metrics.clone()).boxed(), ) .map_err(FatalError::SpawnFailed)?; } @@ -276,6 +281,7 @@ async fn participate( mut sender: impl overseer::DisputeCoordinatorSenderTrait, block_hash: Hash, req: ParticipationRequest, + _request_timer: Option, // Stops timer and sends metric data when dropped metrics: Metrics, ) { let _measure_duration = metrics.time_participation(); diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index e1c481b2c5f4..0dee07621a30 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -26,6 +26,7 @@ use crate::{ }; use crate::metrics::Metrics; +use polkadot_node_subsystem_util::metrics::prometheus::prometheus; #[cfg(test)] mod tests; @@ -58,6 +59,10 @@ pub struct Queues { /// Priority queue. priority: BTreeMap, + + /// Timer handle for each participation request. Stored to measure full request + /// completion time. + request_timers: BTreeMap>, } /// A dispute participation request that can be queued. @@ -131,7 +136,7 @@ impl ParticipationRequest { impl Queues { /// Create new `Queues`. pub fn new() -> Self { - Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), } + Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), request_timers: BTreeMap::new(), } } /// Will put message in queue, either priority or best effort depending on priority. @@ -145,24 +150,34 @@ impl Queues { sender: &mut impl overseer::DisputeCoordinatorSenderTrait, priority: ParticipationPriority, req: ParticipationRequest, + timer: Option, metrics: &Metrics, ) -> Result<()> { let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?; - self.queue_with_comparator(comparator, priority, req, metrics)?; + self.queue_with_comparator(comparator, priority, req, timer, metrics)?; Ok(()) } /// Get the next best request for dispute participation if any. /// First the priority queue is considered and then the best effort one. - pub fn dequeue(&mut self, metrics: &Metrics) -> Option { - if let Some(req) = self.pop_priority() { + /// We also get the corresponding request timer, if any. + pub fn dequeue(&mut self, metrics: &Metrics) -> (Option, Option) { + if let Some((comp, req)) = self.pop_priority() { metrics.report_priority_queue_size(self.priority.len() as u64); - return Some(req.1) + if let Some(maybe_timer) = self.request_timers.remove(&comp) { + return (Some(req), maybe_timer); + } + return (Some(req), None); + } + if let Some((comp, req)) = self.pop_best_effort() { + metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + if let Some(maybe_timer) = self.request_timers.remove(&comp) { + return (Some(req), maybe_timer); + } + return (Some(req), None); } - let request = self.pop_best_effort().map(|d| d.1); - metrics.report_best_effort_queue_size(self.best_effort.len() as u64); - request + (None, None) } /// Reprioritizes any participation requests pertaining to the @@ -200,6 +215,7 @@ impl Queues { comparator: CandidateComparator, priority: ParticipationPriority, req: ParticipationRequest, + timer: Option, metrics: &Metrics, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { @@ -207,7 +223,10 @@ impl Queues { return Err(QueueError::PriorityFull) } // Remove any best effort entry: - self.best_effort.remove(&comparator); + if let None = self.best_effort.remove(&comparator) { + // Only insert new timer if request wasn't in either queue + self.request_timers.insert(comparator, timer); + } self.priority.insert(comparator, req); metrics.report_priority_queue_size(self.priority.len() as u64); metrics.report_best_effort_queue_size(self.best_effort.len() as u64); @@ -221,6 +240,7 @@ impl Queues { return Err(QueueError::BestEffortFull) } self.best_effort.insert(comparator, req); + self.request_timers.insert(comparator, timer); metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) From b6cb2c79d9e43437bfa4dc37ca6e59bf9662955d Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 7 Mar 2023 15:14:18 -0800 Subject: [PATCH 03/22] Tests pass --- .../dispute-coordinator/src/initialized.rs | 20 ++++-- node/core/dispute-coordinator/src/metrics.rs | 16 +++-- .../src/participation/mod.rs | 16 +++-- .../src/participation/queues/mod.rs | 22 ++++-- .../src/participation/queues/tests.rs | 70 +++++++++++++++---- .../src/participation/tests.rs | 51 +++++++++++--- 6 files changed, 152 insertions(+), 43 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 0985a4b67506..08d17bca7fa1 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -164,7 +164,9 @@ impl Initialized { B: Backend, { for (priority, request) in participations.drain(..) { - self.participation.queue_participation(ctx, priority, request, &self.metrics).await?; + self.participation + .queue_participation(ctx, priority, request, &self.metrics) + .await?; } { @@ -190,7 +192,11 @@ impl Initialized { if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation - .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf), &self.metrics) + .process_active_leaves_update( + ctx, + &ActiveLeavesUpdate::start_work(first_leaf), + &self.metrics, + ) .await?; } @@ -276,10 +282,16 @@ impl Initialized { self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; log_error( self.participation - .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts, &self.metrics) + .bump_to_priority_for_candidates( + ctx, + &scraped_updates.included_receipts, + &self.metrics, + ) .await, )?; - self.participation.process_active_leaves_update(ctx, &update, &self.metrics).await?; + self.participation + .process_active_leaves_update(ctx, &update, &self.metrics) + .await?; if let Some(new_leaf) = update.activated { match self diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index d1dab6359a4d..1587e9120c8e 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -34,8 +34,8 @@ struct MetricsInner { refrained_participations: prometheus::Counter, /// Distribution of participation durations. participation_durations: prometheus::Histogram, - /// Measures the duration of the full participation pipeline: From when - /// a participation request is first queued to when participation in the + /// Measures the duration of the full participation pipeline: From when + /// a participation request is first queued to when participation in the /// requested dispute is complete. participation_pipeline_durations: prometheus::Histogram, /// Size of participation priority queue @@ -108,13 +108,19 @@ impl Metrics { } /// Provide a timer for participation durations which updates on drop. - pub(crate) fn time_participation(&self) -> Option { + pub(crate) fn time_participation( + &self, + ) -> Option { self.0.as_ref().map(|metrics| metrics.participation_durations.start_timer()) } /// Provide a timer for participation pipeline durations which updates on drop. - pub(crate) fn time_participation_pipeline(&self) -> Option { - self.0.as_ref().map(|metrics| metrics.participation_pipeline_durations.start_timer()) + pub(crate) fn time_participation_pipeline( + &self, + ) -> Option { + self.0 + .as_ref() + .map(|metrics| metrics.participation_pipeline_durations.start_timer()) } /// Set the priority_queue_size metric diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 22cc43e9a8cf..c11f9cc4b01d 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -264,11 +264,20 @@ impl Participation { request_timer: Option, metrics: &Metrics, ) -> FatalResult<()> { + let participation_timer = metrics.time_participation(); if self.running_participations.insert(*req.candidate_hash()) { let sender = ctx.sender().clone(); ctx.spawn( "participation-worker", - participate(self.worker_sender.clone(), sender, recent_head, req, request_timer, metrics.clone()).boxed(), + participate( + self.worker_sender.clone(), + sender, + recent_head, + req, + request_timer, + participation_timer, + ) + .boxed(), ) .map_err(FatalError::SpawnFailed)?; } @@ -281,10 +290,9 @@ async fn participate( mut sender: impl overseer::DisputeCoordinatorSenderTrait, block_hash: Hash, req: ParticipationRequest, - _request_timer: Option, // Stops timer and sends metric data when dropped - metrics: Metrics, + _request_timer: Option, // Sends metric data when dropped + _participation_timer: Option, // Sends metric data when dropped ) { - let _measure_duration = metrics.time_participation(); #[cfg(test)] // Hack for tests, so we get recovery messages not too early. Delay::new(Duration::from_millis(100)).await; diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 0dee07621a30..1b50d770db97 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -61,7 +61,8 @@ pub struct Queues { priority: BTreeMap, /// Timer handle for each participation request. Stored to measure full request - /// completion time. + /// completion time. Optimally these would have been stored in the participation + /// request itself, but HistogramTimer doesn't implement the Clone trait. request_timers: BTreeMap>, } @@ -136,7 +137,11 @@ impl ParticipationRequest { impl Queues { /// Create new `Queues`. pub fn new() -> Self { - Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), request_timers: BTreeMap::new(), } + Self { + best_effort: BTreeMap::new(), + priority: BTreeMap::new(), + request_timers: BTreeMap::new(), + } } /// Will put message in queue, either priority or best effort depending on priority. @@ -162,20 +167,23 @@ impl Queues { /// Get the next best request for dispute participation if any. /// First the priority queue is considered and then the best effort one. /// We also get the corresponding request timer, if any. - pub fn dequeue(&mut self, metrics: &Metrics) -> (Option, Option) { + pub fn dequeue( + &mut self, + metrics: &Metrics, + ) -> (Option, Option) { if let Some((comp, req)) = self.pop_priority() { metrics.report_priority_queue_size(self.priority.len() as u64); if let Some(maybe_timer) = self.request_timers.remove(&comp) { - return (Some(req), maybe_timer); + return (Some(req), maybe_timer) } - return (Some(req), None); + return (Some(req), None) } if let Some((comp, req)) = self.pop_best_effort() { metrics.report_best_effort_queue_size(self.best_effort.len() as u64); if let Some(maybe_timer) = self.request_timers.remove(&comp) { - return (Some(req), maybe_timer); + return (Some(req), maybe_timer) } - return (Some(req), None); + return (Some(req), None) } (None, None) } diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 164e7b3f011b..de91dd488c22 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use crate::ParticipationPriority; +use crate::{metrics::Metrics, ParticipationPriority}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; use polkadot_primitives::{BlockNumber, Hash}; @@ -52,11 +52,14 @@ fn ordering_works_as_expected() { let req5_unknown_parent = make_participation_request(Hash::repeat_byte(0x05)); let req_full = make_participation_request(Hash::repeat_byte(0x06)); let req_prio_full = make_participation_request(Hash::repeat_byte(0x07)); + let metrics = Metrics::default(); queue .queue_with_comparator( make_dummy_comparator(&req1, Some(1)), ParticipationPriority::BestEffort, req1.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); queue @@ -64,6 +67,8 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); queue @@ -71,6 +76,8 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req3, Some(2)), ParticipationPriority::BestEffort, req3.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); queue @@ -78,6 +85,8 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_prio_2, Some(2)), ParticipationPriority::Priority, req_prio_2.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); queue @@ -85,13 +94,17 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req5_unknown_parent, None), ParticipationPriority::BestEffort, req5_unknown_parent.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); assert_matches!( queue.queue_with_comparator( make_dummy_comparator(&req_prio_full, Some(3)), ParticipationPriority::Priority, - req_prio_full + req_prio_full, + metrics.time_participation_pipeline(), + &metrics, ), Err(QueueError::PriorityFull) ); @@ -99,20 +112,26 @@ fn ordering_works_as_expected() { queue.queue_with_comparator( make_dummy_comparator(&req_full, Some(3)), ParticipationPriority::BestEffort, - req_full + req_full, + metrics.time_participation_pipeline(), + &metrics, ), Err(QueueError::BestEffortFull) ); + // Timers have been stored for each request + assert_eq!(queue.request_timers.len(), 5); // Prioritized queue is ordered correctly - assert_eq!(queue.dequeue(), Some(req_prio)); - assert_eq!(queue.dequeue(), Some(req_prio_2)); + assert_eq!(queue.dequeue(&metrics).0, Some(req_prio)); + assert_eq!(queue.dequeue(&metrics).0, Some(req_prio_2)); // So is the best-effort - assert_eq!(queue.dequeue(), Some(req1)); - assert_eq!(queue.dequeue(), Some(req3)); - assert_eq!(queue.dequeue(), Some(req5_unknown_parent)); + assert_eq!(queue.dequeue(&metrics).0, Some(req1)); + assert_eq!(queue.dequeue(&metrics).0, Some(req3)); + assert_eq!(queue.dequeue(&metrics).0, Some(req5_unknown_parent)); - assert_matches!(queue.dequeue(), None); + assert_matches!(queue.dequeue(&metrics).0, None); + // Timers have been removed from storage along with requests + assert_eq!(queue.request_timers.len(), 0); } /// No matter how often a candidate gets queued, it should only ever get dequeued once. @@ -123,12 +142,15 @@ fn candidate_is_only_dequeued_once() { let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); + let metrics = Metrics::default(); queue .queue_with_comparator( make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, req1.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); queue @@ -136,6 +158,8 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); // Insert same best effort again: @@ -144,6 +168,8 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, req1.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); // insert same prio again: @@ -152,6 +178,8 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); @@ -161,6 +189,8 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::BestEffort, req_best_effort_then_prio.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); // Then as prio: @@ -169,11 +199,13 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::Priority, req_best_effort_then_prio.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); // Make space in prio: - assert_eq!(queue.dequeue(), Some(req_prio)); + assert_eq!(queue.dequeue(&metrics).0, Some(req_prio)); // Insert first as prio: queue @@ -181,6 +213,8 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::Priority, req_prio_then_best_effort.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); // Then as best effort: @@ -189,11 +223,19 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::BestEffort, req_prio_then_best_effort.clone(), + metrics.time_participation_pipeline(), + &metrics, ) .unwrap(); - assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); - assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); - assert_eq!(queue.dequeue(), Some(req1)); - assert_eq!(queue.dequeue(), None); + // Timers have been stored for each request + assert_eq!(queue.request_timers.len(), 3); + + assert_eq!(queue.dequeue(&metrics).0, Some(req_best_effort_then_prio)); + assert_eq!(queue.dequeue(&metrics).0, Some(req_prio_then_best_effort)); + assert_eq!(queue.dequeue(&metrics).0, Some(req1)); + assert_eq!(queue.dequeue(&metrics).0, None); + + // Timers have been removed from storage along with requests + assert_eq!(queue.request_timers.len(), 0); } diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 273c27261081..6b1fed4d9d86 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -75,7 +75,7 @@ async fn participate_with_commitments_hash( let req = ParticipationRequest::new(candidate_receipt, session); participation - .queue_participation(ctx, ParticipationPriority::BestEffort, req) + .queue_participation(ctx, ParticipationPriority::BestEffort, req, &Metrics::default()) .await } @@ -104,6 +104,7 @@ async fn activate_leaf( number: block_number, status: LeafStatus::Fresh, }), + &Metrics::default(), ) .await } @@ -207,7 +208,11 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); @@ -243,7 +248,11 @@ fn reqs_get_queued_when_out_of_capacity() { for _ in 0..MAX_PARALLEL_PARTICIPATIONS + 1 { let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -356,7 +365,11 @@ fn cannot_participate_if_cannot_recover_available_data() { "overseer did not receive recover available data message", ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -393,7 +406,11 @@ fn cannot_participate_if_cannot_recover_validation_code() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -424,7 +441,11 @@ fn cast_invalid_vote_if_available_data_is_invalid() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -461,7 +482,11 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -498,7 +523,11 @@ fn cast_invalid_vote_if_commitments_dont_match() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( @@ -535,7 +564,11 @@ fn cast_valid_vote_if_validation_passes() { ); let result = participation - .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) + .get_participation_result( + &mut ctx, + worker_receiver.next().await.unwrap(), + &Metrics::default(), + ) .await .unwrap(); assert_matches!( From bc7049e5e1de3b0732096f3851016e0ded2fa33e Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 8 Mar 2023 11:43:19 -0800 Subject: [PATCH 04/22] Changed Metrics to field on participation + queues --- .../dispute-coordinator/src/initialized.rs | 11 ++--- .../src/participation/mod.rs | 31 ++++++------- .../src/participation/queues/mod.rs | 29 ++++++------ .../src/participation/queues/tests.rs | 45 +++++++------------ .../src/participation/tests.rs | 29 +++++------- 5 files changed, 57 insertions(+), 88 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 74c0ff378a1a..d98e481ec66d 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -96,7 +96,7 @@ impl Initialized { let DisputeCoordinatorSubsystem { config: _, store: _, keystore, metrics } = subsystem; let (participation_sender, participation_receiver) = mpsc::channel(1); - let participation = Participation::new(participation_sender); + let participation = Participation::new(participation_sender, metrics.clone()); let highest_session = rolling_session_window.latest_session(); Self { @@ -165,7 +165,7 @@ impl Initialized { { for (priority, request) in participations.drain(..) { self.participation - .queue_participation(ctx, priority, request, &self.metrics) + .queue_participation(ctx, priority, request) .await?; } @@ -195,7 +195,6 @@ impl Initialized { .process_active_leaves_update( ctx, &ActiveLeavesUpdate::start_work(first_leaf), - &self.metrics, ) .await?; } @@ -214,7 +213,7 @@ impl Initialized { candidate_hash, candidate_receipt, outcome, - } = self.participation.get_participation_result(ctx, msg, &self.metrics).await?; + } = self.participation.get_participation_result(ctx, msg).await?; if let Some(valid) = outcome.validity() { gum::trace!( target: LOG_TARGET, @@ -286,12 +285,11 @@ impl Initialized { .bump_to_priority_for_candidates( ctx, &scraped_updates.included_receipts, - &self.metrics, ) .await, )?; self.participation - .process_active_leaves_update(ctx, &update, &self.metrics) + .process_active_leaves_update(ctx, &update) .await?; if let Some(new_leaf) = update.activated { @@ -934,7 +932,6 @@ impl Initialized { ctx, priority, ParticipationRequest::new(new_state.candidate_receipt().clone(), session), - &self.metrics, ) .await; log_error(r)?; diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index c11f9cc4b01d..d2586de8db3a 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -72,6 +72,8 @@ pub struct Participation { worker_sender: WorkerMessageSender, /// Some recent block for retrieving validation code from chain. recent_block: Option<(BlockNumber, Hash)>, + /// Metrics handle cloned from Initialized + metrics: Metrics, } /// Message from worker tasks. @@ -136,12 +138,13 @@ impl Participation { /// The passed in sender will be used by background workers to communicate back their results. /// The calling context should make sure to call `Participation::on_worker_message()` for the /// received messages. - pub fn new(sender: WorkerMessageSender) -> Self { + pub fn new(sender: WorkerMessageSender, metrics: Metrics) -> Self { Self { running_participations: HashSet::new(), - queue: Queues::new(), + queue: Queues::new(metrics.clone()), worker_sender: sender, recent_block: None, + metrics, } } @@ -156,9 +159,8 @@ impl Participation { ctx: &mut Context, priority: ParticipationPriority, req: ParticipationRequest, - metrics: &Metrics, ) -> Result<()> { - let request_timer = metrics.time_participation_pipeline(); + let request_timer = self.metrics.time_participation_pipeline(); // Participation already running - we can ignore that request: if self.running_participations.contains(req.candidate_hash()) { @@ -167,12 +169,12 @@ impl Participation { // Available capacity - participate right away (if we already have a recent block): if let Some((_, h)) = self.recent_block { if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - self.fork_participation(ctx, req, h, request_timer, metrics)?; + self.fork_participation(ctx, req, h, request_timer)?; return Ok(()) } } // Out of capacity/no recent block yet - queue: - self.queue.queue(ctx.sender(), priority, req, request_timer, metrics).await + self.queue.queue(ctx.sender(), priority, req, request_timer).await } /// Message from a worker task was received - get the outcome. @@ -188,12 +190,11 @@ impl Participation { &mut self, ctx: &mut Context, msg: WorkerMessage, - metrics: &Metrics, ) -> FatalResult { let WorkerMessage(statement) = msg; self.running_participations.remove(&statement.candidate_hash); let recent_block = self.recent_block.expect("We never ever reset recent_block to `None` and we already received a result, so it must have been set before. qed."); - self.dequeue_until_capacity(ctx, recent_block.1, metrics).await?; + self.dequeue_until_capacity(ctx, recent_block.1).await?; Ok(statement) } @@ -205,14 +206,13 @@ impl Participation { &mut self, ctx: &mut Context, update: &ActiveLeavesUpdate, - metrics: &Metrics, ) -> FatalResult<()> { if let Some(activated) = &update.activated { match self.recent_block { None => { self.recent_block = Some((activated.number, activated.hash)); // Work got potentially unblocked: - self.dequeue_until_capacity(ctx, activated.hash, metrics).await?; + self.dequeue_until_capacity(ctx, activated.hash).await?; }, Some((number, _)) if activated.number > number => { self.recent_block = Some((activated.number, activated.hash)); @@ -229,10 +229,9 @@ impl Participation { &mut self, ctx: &mut Context, included_receipts: &Vec, - metrics: &Metrics, ) -> Result<()> { for receipt in included_receipts { - self.queue.prioritize_if_present(ctx.sender(), receipt, metrics).await?; + self.queue.prioritize_if_present(ctx.sender(), receipt).await?; } Ok(()) } @@ -242,12 +241,11 @@ impl Participation { &mut self, ctx: &mut Context, recent_head: Hash, - metrics: &Metrics, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - let (maybe_req, maybe_timer) = self.queue.dequeue(metrics); + let (maybe_req, maybe_timer) = self.queue.dequeue(); if let Some(req) = maybe_req { - self.fork_participation(ctx, req, recent_head, maybe_timer, metrics)?; + self.fork_participation(ctx, req, recent_head, maybe_timer)?; } else { break } @@ -262,9 +260,8 @@ impl Participation { req: ParticipationRequest, recent_head: Hash, request_timer: Option, - metrics: &Metrics, ) -> FatalResult<()> { - let participation_timer = metrics.time_participation(); + let participation_timer = self.metrics.time_participation(); if self.running_participations.insert(*req.candidate_hash()) { let sender = ctx.sender().clone(); ctx.spawn( diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 1b50d770db97..0c3011101a8a 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -64,6 +64,9 @@ pub struct Queues { /// completion time. Optimally these would have been stored in the participation /// request itself, but HistogramTimer doesn't implement the Clone trait. request_timers: BTreeMap>, + + /// Handle for recording queues data in metrics + metrics: Metrics, } /// A dispute participation request that can be queued. @@ -136,11 +139,12 @@ impl ParticipationRequest { impl Queues { /// Create new `Queues`. - pub fn new() -> Self { + pub fn new(metrics: Metrics) -> Self { Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), request_timers: BTreeMap::new(), + metrics, } } @@ -156,11 +160,10 @@ impl Queues { priority: ParticipationPriority, req: ParticipationRequest, timer: Option, - metrics: &Metrics, ) -> Result<()> { let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?; - self.queue_with_comparator(comparator, priority, req, timer, metrics)?; + self.queue_with_comparator(comparator, priority, req, timer)?; Ok(()) } @@ -169,17 +172,16 @@ impl Queues { /// We also get the corresponding request timer, if any. pub fn dequeue( &mut self, - metrics: &Metrics, ) -> (Option, Option) { if let Some((comp, req)) = self.pop_priority() { - metrics.report_priority_queue_size(self.priority.len() as u64); + self.metrics.report_priority_queue_size(self.priority.len() as u64); if let Some(maybe_timer) = self.request_timers.remove(&comp) { return (Some(req), maybe_timer) } return (Some(req), None) } if let Some((comp, req)) = self.pop_best_effort() { - metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); if let Some(maybe_timer) = self.request_timers.remove(&comp) { return (Some(req), maybe_timer) } @@ -194,17 +196,15 @@ impl Queues { &mut self, sender: &mut impl overseer::DisputeCoordinatorSenderTrait, receipt: &CandidateReceipt, - metrics: &Metrics, ) -> Result<()> { let comparator = CandidateComparator::new(sender, receipt).await?; - self.prioritize_with_comparator(comparator, metrics)?; + self.prioritize_with_comparator(comparator)?; Ok(()) } fn prioritize_with_comparator( &mut self, comparator: CandidateComparator, - metrics: &Metrics, ) -> std::result::Result<(), QueueError> { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) @@ -212,8 +212,8 @@ impl Queues { if let Some(request) = self.best_effort.remove(&comparator) { self.priority.insert(comparator, request); // Report changes to both queue sizes - metrics.report_priority_queue_size(self.priority.len() as u64); - metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + self.metrics.report_priority_queue_size(self.priority.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } @@ -224,7 +224,6 @@ impl Queues { priority: ParticipationPriority, req: ParticipationRequest, timer: Option, - metrics: &Metrics, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { if self.priority.len() >= PRIORITY_QUEUE_SIZE { @@ -236,8 +235,8 @@ impl Queues { self.request_timers.insert(comparator, timer); } self.priority.insert(comparator, req); - metrics.report_priority_queue_size(self.priority.len() as u64); - metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + self.metrics.report_priority_queue_size(self.priority.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } else { if self.priority.contains_key(&comparator) { // The candidate is already in priority queue - don't @@ -249,7 +248,7 @@ impl Queues { } self.best_effort.insert(comparator, req); self.request_timers.insert(comparator, timer); - metrics.report_best_effort_queue_size(self.best_effort.len() as u64); + self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) } diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index de91dd488c22..527e6d4e4ebf 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -44,7 +44,8 @@ fn make_dummy_comparator( /// block number should be treated with lowest priority. #[test] fn ordering_works_as_expected() { - let mut queue = Queues::new(); + let metrics = Metrics::default(); + let mut queue = Queues::new(metrics.clone()); let req1 = make_participation_request(Hash::repeat_byte(0x01)); let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req3 = make_participation_request(Hash::repeat_byte(0x03)); @@ -52,14 +53,12 @@ fn ordering_works_as_expected() { let req5_unknown_parent = make_participation_request(Hash::repeat_byte(0x05)); let req_full = make_participation_request(Hash::repeat_byte(0x06)); let req_prio_full = make_participation_request(Hash::repeat_byte(0x07)); - let metrics = Metrics::default(); queue .queue_with_comparator( make_dummy_comparator(&req1, Some(1)), ParticipationPriority::BestEffort, req1.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); queue @@ -68,7 +67,6 @@ fn ordering_works_as_expected() { ParticipationPriority::Priority, req_prio.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); queue @@ -77,7 +75,6 @@ fn ordering_works_as_expected() { ParticipationPriority::BestEffort, req3.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); queue @@ -86,7 +83,6 @@ fn ordering_works_as_expected() { ParticipationPriority::Priority, req_prio_2.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); queue @@ -95,7 +91,6 @@ fn ordering_works_as_expected() { ParticipationPriority::BestEffort, req5_unknown_parent.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); assert_matches!( @@ -104,7 +99,6 @@ fn ordering_works_as_expected() { ParticipationPriority::Priority, req_prio_full, metrics.time_participation_pipeline(), - &metrics, ), Err(QueueError::PriorityFull) ); @@ -114,7 +108,6 @@ fn ordering_works_as_expected() { ParticipationPriority::BestEffort, req_full, metrics.time_participation_pipeline(), - &metrics, ), Err(QueueError::BestEffortFull) ); @@ -122,14 +115,14 @@ fn ordering_works_as_expected() { // Timers have been stored for each request assert_eq!(queue.request_timers.len(), 5); // Prioritized queue is ordered correctly - assert_eq!(queue.dequeue(&metrics).0, Some(req_prio)); - assert_eq!(queue.dequeue(&metrics).0, Some(req_prio_2)); + assert_eq!(queue.dequeue().0, Some(req_prio)); + assert_eq!(queue.dequeue().0, Some(req_prio_2)); // So is the best-effort - assert_eq!(queue.dequeue(&metrics).0, Some(req1)); - assert_eq!(queue.dequeue(&metrics).0, Some(req3)); - assert_eq!(queue.dequeue(&metrics).0, Some(req5_unknown_parent)); + assert_eq!(queue.dequeue().0, Some(req1)); + assert_eq!(queue.dequeue().0, Some(req3)); + assert_eq!(queue.dequeue().0, Some(req5_unknown_parent)); - assert_matches!(queue.dequeue(&metrics).0, None); + assert_matches!(queue.dequeue().0, None); // Timers have been removed from storage along with requests assert_eq!(queue.request_timers.len(), 0); } @@ -137,12 +130,12 @@ fn ordering_works_as_expected() { /// No matter how often a candidate gets queued, it should only ever get dequeued once. #[test] fn candidate_is_only_dequeued_once() { - let mut queue = Queues::new(); + let metrics = Metrics::default(); + let mut queue = Queues::new(metrics.clone()); let req1 = make_participation_request(Hash::repeat_byte(0x01)); let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); - let metrics = Metrics::default(); queue .queue_with_comparator( @@ -150,7 +143,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::BestEffort, req1.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); queue @@ -159,7 +151,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::Priority, req_prio.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // Insert same best effort again: @@ -169,7 +160,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::BestEffort, req1.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // insert same prio again: @@ -179,7 +169,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::Priority, req_prio.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); @@ -190,7 +179,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::BestEffort, req_best_effort_then_prio.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // Then as prio: @@ -200,12 +188,11 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::Priority, req_best_effort_then_prio.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // Make space in prio: - assert_eq!(queue.dequeue(&metrics).0, Some(req_prio)); + assert_eq!(queue.dequeue().0, Some(req_prio)); // Insert first as prio: queue @@ -214,7 +201,6 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::Priority, req_prio_then_best_effort.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // Then as best effort: @@ -224,17 +210,16 @@ fn candidate_is_only_dequeued_once() { ParticipationPriority::BestEffort, req_prio_then_best_effort.clone(), metrics.time_participation_pipeline(), - &metrics, ) .unwrap(); // Timers have been stored for each request assert_eq!(queue.request_timers.len(), 3); - assert_eq!(queue.dequeue(&metrics).0, Some(req_best_effort_then_prio)); - assert_eq!(queue.dequeue(&metrics).0, Some(req_prio_then_best_effort)); - assert_eq!(queue.dequeue(&metrics).0, Some(req1)); - assert_eq!(queue.dequeue(&metrics).0, None); + assert_eq!(queue.dequeue().0, Some(req_best_effort_then_prio)); + assert_eq!(queue.dequeue().0, Some(req_prio_then_best_effort)); + assert_eq!(queue.dequeue().0, Some(req1)); + assert_eq!(queue.dequeue().0, None); // Timers have been removed from storage along with requests assert_eq!(queue.request_timers.len(), 0); diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 6b1fed4d9d86..313ca30b9318 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -75,7 +75,7 @@ async fn participate_with_commitments_hash( let req = ParticipationRequest::new(candidate_receipt, session); participation - .queue_participation(ctx, ParticipationPriority::BestEffort, req, &Metrics::default()) + .queue_participation(ctx, ParticipationPriority::BestEffort, req) .await } @@ -104,7 +104,6 @@ async fn activate_leaf( number: block_number, status: LeafStatus::Fresh, }), - &Metrics::default(), ) .await } @@ -190,7 +189,7 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); for _ in 0..MAX_PARALLEL_PARTICIPATIONS { @@ -211,7 +210,6 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -233,7 +231,7 @@ fn reqs_get_queued_when_out_of_capacity() { let test = async { let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); for i in 0..MAX_PARALLEL_PARTICIPATIONS { @@ -251,7 +249,6 @@ fn reqs_get_queued_when_out_of_capacity() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -301,7 +298,7 @@ fn reqs_get_queued_on_no_recent_block() { let (mut unblock_test, mut wait_for_verification) = mpsc::channel(0); let test = async { let (sender, _worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); participate(&mut ctx, &mut participation).await.unwrap(); // We have initiated participation but we'll block `active_leaf` so that we can check that @@ -351,7 +348,7 @@ fn cannot_participate_if_cannot_recover_available_data() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -368,7 +365,6 @@ fn cannot_participate_if_cannot_recover_available_data() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -385,7 +381,7 @@ fn cannot_participate_if_cannot_recover_validation_code() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -409,7 +405,6 @@ fn cannot_participate_if_cannot_recover_validation_code() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -426,7 +421,7 @@ fn cast_invalid_vote_if_available_data_is_invalid() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -444,7 +439,6 @@ fn cast_invalid_vote_if_available_data_is_invalid() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -461,7 +455,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -485,7 +479,6 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -502,7 +495,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -526,7 +519,6 @@ fn cast_invalid_vote_if_commitments_dont_match() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); @@ -543,7 +535,7 @@ fn cast_valid_vote_if_validation_passes() { let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); let (sender, mut worker_receiver) = mpsc::channel(1); - let mut participation = Participation::new(sender); + let mut participation = Participation::new(sender, Metrics::default()); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); participate(&mut ctx, &mut participation).await.unwrap(); @@ -567,7 +559,6 @@ fn cast_valid_vote_if_validation_passes() { .get_participation_result( &mut ctx, worker_receiver.next().await.unwrap(), - &Metrics::default(), ) .await .unwrap(); From 7b0bfc0af553d7eefab0b7ba818b6d84a47c171c Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 8 Mar 2023 11:46:27 -0800 Subject: [PATCH 05/22] fmt --- .../dispute-coordinator/src/initialized.rs | 18 ++------- .../src/participation/tests.rs | 40 ++++--------------- 2 files changed, 12 insertions(+), 46 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index d98e481ec66d..cfb40660a412 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -164,9 +164,7 @@ impl Initialized { B: Backend, { for (priority, request) in participations.drain(..) { - self.participation - .queue_participation(ctx, priority, request) - .await?; + self.participation.queue_participation(ctx, priority, request).await?; } { @@ -192,10 +190,7 @@ impl Initialized { if let Some(first_leaf) = first_leaf.take() { // Also provide first leaf to participation for good measure. self.participation - .process_active_leaves_update( - ctx, - &ActiveLeavesUpdate::start_work(first_leaf), - ) + .process_active_leaves_update(ctx, &ActiveLeavesUpdate::start_work(first_leaf)) .await?; } @@ -282,15 +277,10 @@ impl Initialized { self.scraper.process_active_leaves_update(ctx.sender(), &update).await?; log_error( self.participation - .bump_to_priority_for_candidates( - ctx, - &scraped_updates.included_receipts, - ) + .bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts) .await, )?; - self.participation - .process_active_leaves_update(ctx, &update) - .await?; + self.participation.process_active_leaves_update(ctx, &update).await?; if let Some(new_leaf) = update.activated { match self diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 313ca30b9318..1480a1e8768b 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -207,10 +207,7 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); @@ -246,10 +243,7 @@ fn reqs_get_queued_when_out_of_capacity() { for _ in 0..MAX_PARALLEL_PARTICIPATIONS + 1 { let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -362,10 +356,7 @@ fn cannot_participate_if_cannot_recover_available_data() { "overseer did not receive recover available data message", ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -402,10 +393,7 @@ fn cannot_participate_if_cannot_recover_validation_code() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -436,10 +424,7 @@ fn cast_invalid_vote_if_available_data_is_invalid() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -476,10 +461,7 @@ fn cast_invalid_vote_if_validation_fails_or_is_invalid() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -516,10 +498,7 @@ fn cast_invalid_vote_if_commitments_dont_match() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( @@ -556,10 +535,7 @@ fn cast_valid_vote_if_validation_passes() { ); let result = participation - .get_participation_result( - &mut ctx, - worker_receiver.next().await.unwrap(), - ) + .get_participation_result(&mut ctx, worker_receiver.next().await.unwrap()) .await .unwrap(); assert_matches!( From 56e7723b67401fd49c2604f6648ed94846645380 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 9 Mar 2023 09:00:51 -0800 Subject: [PATCH 06/22] Improving naming --- node/core/dispute-coordinator/src/metrics.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 1587e9120c8e..0b538a12ff44 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -39,9 +39,9 @@ struct MetricsInner { /// requested dispute is complete. participation_pipeline_durations: prometheus::Histogram, /// Size of participation priority queue - priority_queue_size: prometheus::Gauge, + participation_priority_queue_size: prometheus::Gauge, /// Size of participation best effort queue - best_effort_queue_size: prometheus::Gauge, + participation_best_effort_queue_size: prometheus::Gauge, } /// Candidate validation metrics. @@ -126,14 +126,14 @@ impl Metrics { /// Set the priority_queue_size metric pub fn report_priority_queue_size(&self, size: u64) { if let Some(metrics) = &self.0 { - metrics.priority_queue_size.set(size); + metrics.participation_priority_queue_size.set(size); } } /// Set the best_effort_queue_size metric pub fn report_best_effort_queue_size(&self, size: u64) { if let Some(metrics) = &self.0 { - metrics.best_effort_queue_size.set(size); + metrics.participation_best_effort_queue_size.set(size); } } } @@ -221,12 +221,12 @@ impl metrics::Metrics for Metrics { )?, registry, )?, - priority_queue_size: prometheus::register( + participation_priority_queue_size: prometheus::register( prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", "Number of disputes waiting for local participation in the priority queue.")?, registry, )?, - best_effort_queue_size: prometheus::register( + participation_best_effort_queue_size: prometheus::register( prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size", "Number of disputes waiting for local participation in the best effort queue.")?, registry, From 4185d4d55b1a035d535d5d5154bd5c9f0bd1ec47 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 9 Mar 2023 12:55:35 -0800 Subject: [PATCH 07/22] Refactor, placing timer in ParticipationRequest --- .../dispute-coordinator/src/initialized.rs | 3 +- node/core/dispute-coordinator/src/lib.rs | 2 + .../src/participation/mod.rs | 15 ++--- .../src/participation/queues/mod.rs | 58 +++++++++---------- .../src/participation/queues/tests.rs | 51 +++++----------- .../src/participation/tests.rs | 3 +- 6 files changed, 52 insertions(+), 80 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index cfb40660a412..db58edf23faf 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -916,12 +916,13 @@ impl Initialized { } else { self.metrics.on_queued_best_effort_participation(); } + let request_timer = Arc::new(self.metrics.time_participation_pipeline()); let r = self .participation .queue_participation( ctx, priority, - ParticipationRequest::new(new_state.candidate_receipt().clone(), session), + ParticipationRequest::new(new_state.candidate_receipt().clone(), session, request_timer), ) .await; log_error(r)?; diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index 1c66c6c6099c..de155c1e8f42 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -347,11 +347,13 @@ impl DisputeCoordinatorSubsystem { ?candidate_hash, "Found valid dispute, with no vote from us on startup - participating." ); + let request_timer = Arc::new(self.metrics.time_participation_pipeline()); participation_requests.push(( ParticipationPriority::with_priority_if(is_included), ParticipationRequest::new( vote_state.votes().candidate_receipt.clone(), session, + request_timer, ), )); } diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index d2586de8db3a..0fb897975e24 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -160,8 +160,6 @@ impl Participation { priority: ParticipationPriority, req: ParticipationRequest, ) -> Result<()> { - let request_timer = self.metrics.time_participation_pipeline(); - // Participation already running - we can ignore that request: if self.running_participations.contains(req.candidate_hash()) { return Ok(()) @@ -169,12 +167,12 @@ impl Participation { // Available capacity - participate right away (if we already have a recent block): if let Some((_, h)) = self.recent_block { if self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - self.fork_participation(ctx, req, h, request_timer)?; + self.fork_participation(ctx, req, h)?; return Ok(()) } } // Out of capacity/no recent block yet - queue: - self.queue.queue(ctx.sender(), priority, req, request_timer).await + self.queue.queue(ctx.sender(), priority, req).await } /// Message from a worker task was received - get the outcome. @@ -243,9 +241,9 @@ impl Participation { recent_head: Hash, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - let (maybe_req, maybe_timer) = self.queue.dequeue(); + let maybe_req = self.queue.dequeue(); if let Some(req) = maybe_req { - self.fork_participation(ctx, req, recent_head, maybe_timer)?; + self.fork_participation(ctx, req, recent_head)?; } else { break } @@ -259,7 +257,6 @@ impl Participation { ctx: &mut Context, req: ParticipationRequest, recent_head: Hash, - request_timer: Option, ) -> FatalResult<()> { let participation_timer = self.metrics.time_participation(); if self.running_participations.insert(*req.candidate_hash()) { @@ -271,7 +268,6 @@ impl Participation { sender, recent_head, req, - request_timer, participation_timer, ) .boxed(), @@ -286,8 +282,7 @@ async fn participate( mut result_sender: WorkerMessageSender, mut sender: impl overseer::DisputeCoordinatorSenderTrait, block_hash: Hash, - req: ParticipationRequest, - _request_timer: Option, // Sends metric data when dropped + req: ParticipationRequest, // Sends metric data via request_timer field when dropped _participation_timer: Option, // Sends metric data when dropped ) { #[cfg(test)] diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 0c3011101a8a..e3e336619b6c 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{cmp::Ordering, collections::BTreeMap}; +use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -60,21 +60,17 @@ pub struct Queues { /// Priority queue. priority: BTreeMap, - /// Timer handle for each participation request. Stored to measure full request - /// completion time. Optimally these would have been stored in the participation - /// request itself, but HistogramTimer doesn't implement the Clone trait. - request_timers: BTreeMap>, - /// Handle for recording queues data in metrics metrics: Metrics, } /// A dispute participation request that can be queued. -#[derive(Debug, PartialEq, Eq, Clone)] +#[derive(Debug, Clone)] pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, + _request_timer: Arc> // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -118,8 +114,8 @@ pub enum QueueError { impl ParticipationRequest { /// Create a new `ParticipationRequest` to be queued. - pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex) -> Self { - Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session } + pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex, request_timer: Arc>) -> Self { + Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, _request_timer: request_timer } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -135,6 +131,20 @@ impl ParticipationRequest { let Self { candidate_hash, candidate_receipt, .. } = self; (candidate_hash, candidate_receipt) } + // For tests we want to check whether requests are equal, but the + // request_timer field of ParticipationRequest doesn't implement + // eq. This helper checks whether all other fields are equal, + // which is sufficient. + #[cfg(test)] + pub fn functionally_equal(&self, other: ParticipationRequest) -> bool { + if &self.candidate_receipt == other.candidate_receipt() && + &self.candidate_hash == other.candidate_hash() && + self.session == other.session() + { + return true; + } + false + } } impl Queues { @@ -143,7 +153,6 @@ impl Queues { Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), - request_timers: BTreeMap::new(), metrics, } } @@ -159,35 +168,27 @@ impl Queues { sender: &mut impl overseer::DisputeCoordinatorSenderTrait, priority: ParticipationPriority, req: ParticipationRequest, - timer: Option, ) -> Result<()> { let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?; - self.queue_with_comparator(comparator, priority, req, timer)?; + self.queue_with_comparator(comparator, priority, req)?; Ok(()) } /// Get the next best request for dispute participation if any. /// First the priority queue is considered and then the best effort one. - /// We also get the corresponding request timer, if any. pub fn dequeue( &mut self, - ) -> (Option, Option) { - if let Some((comp, req)) = self.pop_priority() { + ) -> Option { + if let Some(req) = self.pop_priority() { self.metrics.report_priority_queue_size(self.priority.len() as u64); - if let Some(maybe_timer) = self.request_timers.remove(&comp) { - return (Some(req), maybe_timer) - } - return (Some(req), None) + return Some(req.1) } - if let Some((comp, req)) = self.pop_best_effort() { + if let Some(req) = self.pop_best_effort() { self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); - if let Some(maybe_timer) = self.request_timers.remove(&comp) { - return (Some(req), maybe_timer) - } - return (Some(req), None) + return Some(req.1) } - (None, None) + None } /// Reprioritizes any participation requests pertaining to the @@ -223,17 +224,13 @@ impl Queues { comparator: CandidateComparator, priority: ParticipationPriority, req: ParticipationRequest, - timer: Option, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } // Remove any best effort entry: - if let None = self.best_effort.remove(&comparator) { - // Only insert new timer if request wasn't in either queue - self.request_timers.insert(comparator, timer); - } + self.best_effort.remove(&comparator); self.priority.insert(comparator, req); self.metrics.report_priority_queue_size(self.priority.len() as u64); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); @@ -247,7 +244,6 @@ impl Queues { return Err(QueueError::BestEffortFull) } self.best_effort.insert(comparator, req); - self.request_timers.insert(comparator, timer); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 527e6d4e4ebf..d70de8ca7fbb 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use std::sync::Arc; use crate::{metrics::Metrics, ParticipationPriority}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; @@ -26,7 +27,8 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest { let mut receipt = dummy_candidate_receipt(dummy_hash()); // make it differ: receipt.commitments_hash = hash; - ParticipationRequest::new(receipt, 1) + let request_timer = Arc::new(Metrics::default().time_participation_pipeline()); + ParticipationRequest::new(receipt, 1, request_timer) } /// Make dummy comparator for request, based on the given block number. @@ -58,7 +60,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req1, Some(1)), ParticipationPriority::BestEffort, req1.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); queue @@ -66,7 +67,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); queue @@ -74,7 +74,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req3, Some(2)), ParticipationPriority::BestEffort, req3.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); queue @@ -82,7 +81,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_prio_2, Some(2)), ParticipationPriority::Priority, req_prio_2.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); queue @@ -90,7 +88,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req5_unknown_parent, None), ParticipationPriority::BestEffort, req5_unknown_parent.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); assert_matches!( @@ -98,7 +95,6 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_prio_full, Some(3)), ParticipationPriority::Priority, req_prio_full, - metrics.time_participation_pipeline(), ), Err(QueueError::PriorityFull) ); @@ -107,24 +103,19 @@ fn ordering_works_as_expected() { make_dummy_comparator(&req_full, Some(3)), ParticipationPriority::BestEffort, req_full, - metrics.time_participation_pipeline(), ), Err(QueueError::BestEffortFull) ); - // Timers have been stored for each request - assert_eq!(queue.request_timers.len(), 5); // Prioritized queue is ordered correctly - assert_eq!(queue.dequeue().0, Some(req_prio)); - assert_eq!(queue.dequeue().0, Some(req_prio_2)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio_2)); // So is the best-effort - assert_eq!(queue.dequeue().0, Some(req1)); - assert_eq!(queue.dequeue().0, Some(req3)); - assert_eq!(queue.dequeue().0, Some(req5_unknown_parent)); + assert!(queue.dequeue().unwrap().functionally_equal(req1)); + assert!(queue.dequeue().unwrap().functionally_equal(req3)); + assert!(queue.dequeue().unwrap().functionally_equal(req5_unknown_parent)); - assert_matches!(queue.dequeue().0, None); - // Timers have been removed from storage along with requests - assert_eq!(queue.request_timers.len(), 0); + assert_matches!(queue.dequeue(), None); } /// No matter how often a candidate gets queued, it should only ever get dequeued once. @@ -142,7 +133,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, req1.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); queue @@ -150,7 +140,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); // Insert same best effort again: @@ -159,7 +148,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, req1.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); // insert same prio again: @@ -168,7 +156,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, req_prio.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); @@ -178,7 +165,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::BestEffort, req_best_effort_then_prio.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); // Then as prio: @@ -187,12 +173,11 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::Priority, req_best_effort_then_prio.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); // Make space in prio: - assert_eq!(queue.dequeue().0, Some(req_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); // Insert first as prio: queue @@ -200,7 +185,6 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::Priority, req_prio_then_best_effort.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); // Then as best effort: @@ -209,18 +193,11 @@ fn candidate_is_only_dequeued_once() { make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::BestEffort, req_prio_then_best_effort.clone(), - metrics.time_participation_pipeline(), ) .unwrap(); - // Timers have been stored for each request - assert_eq!(queue.request_timers.len(), 3); - - assert_eq!(queue.dequeue().0, Some(req_best_effort_then_prio)); - assert_eq!(queue.dequeue().0, Some(req_prio_then_best_effort)); - assert_eq!(queue.dequeue().0, Some(req1)); - assert_eq!(queue.dequeue().0, None); - - // Timers have been removed from storage along with requests - assert_eq!(queue.request_timers.len(), 0); + assert!(queue.dequeue().unwrap().functionally_equal(req_best_effort_then_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio_then_best_effort)); + assert!(queue.dequeue().unwrap().functionally_equal(req1)); + assert_matches!(queue.dequeue(), None); } diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 1480a1e8768b..8bb78726ed45 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -72,7 +72,8 @@ async fn participate_with_commitments_hash( }; let session = 1; - let req = ParticipationRequest::new(candidate_receipt, session); + let request_timer = Arc::new(participation.metrics.time_participation_pipeline()); + let req = ParticipationRequest::new(candidate_receipt, session, request_timer); participation .queue_participation(ctx, ParticipationPriority::BestEffort, req) From 20ceda9ebd462c6756b2648d3ae414919dbba575 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 9 Mar 2023 12:58:26 -0800 Subject: [PATCH 08/22] fmt --- .../dispute-coordinator/src/initialized.rs | 6 +++- .../src/participation/queues/mod.rs | 31 ++++++++++--------- .../src/participation/queues/tests.rs | 2 +- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index db58edf23faf..620c58fbb7e6 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -922,7 +922,11 @@ impl Initialized { .queue_participation( ctx, priority, - ParticipationRequest::new(new_state.candidate_receipt().clone(), session, request_timer), + ParticipationRequest::new( + new_state.candidate_receipt().clone(), + session, + request_timer, + ), ) .await; log_error(r)?; diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index e3e336619b6c..e5ad0a9bcc0e 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -70,7 +70,7 @@ pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, - _request_timer: Arc> // Sends metric data when request is dropped + _request_timer: Arc>, // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -114,8 +114,17 @@ pub enum QueueError { impl ParticipationRequest { /// Create a new `ParticipationRequest` to be queued. - pub fn new(candidate_receipt: CandidateReceipt, session: SessionIndex, request_timer: Arc>) -> Self { - Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, _request_timer: request_timer } + pub fn new( + candidate_receipt: CandidateReceipt, + session: SessionIndex, + request_timer: Arc>, + ) -> Self { + Self { + candidate_hash: candidate_receipt.hash(), + candidate_receipt, + session, + _request_timer: request_timer, + } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -132,16 +141,16 @@ impl ParticipationRequest { (candidate_hash, candidate_receipt) } // For tests we want to check whether requests are equal, but the - // request_timer field of ParticipationRequest doesn't implement + // request_timer field of ParticipationRequest doesn't implement // eq. This helper checks whether all other fields are equal, // which is sufficient. #[cfg(test)] pub fn functionally_equal(&self, other: ParticipationRequest) -> bool { - if &self.candidate_receipt == other.candidate_receipt() && + if &self.candidate_receipt == other.candidate_receipt() && &self.candidate_hash == other.candidate_hash() && self.session == other.session() { - return true; + return true } false } @@ -150,11 +159,7 @@ impl ParticipationRequest { impl Queues { /// Create new `Queues`. pub fn new(metrics: Metrics) -> Self { - Self { - best_effort: BTreeMap::new(), - priority: BTreeMap::new(), - metrics, - } + Self { best_effort: BTreeMap::new(), priority: BTreeMap::new(), metrics } } /// Will put message in queue, either priority or best effort depending on priority. @@ -177,9 +182,7 @@ impl Queues { /// Get the next best request for dispute participation if any. /// First the priority queue is considered and then the best effort one. - pub fn dequeue( - &mut self, - ) -> Option { + pub fn dequeue(&mut self) -> Option { if let Some(req) = self.pop_priority() { self.metrics.report_priority_queue_size(self.priority.len() as u64); return Some(req.1) diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index d70de8ca7fbb..914d98c070e9 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -14,11 +14,11 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::sync::Arc; use crate::{metrics::Metrics, ParticipationPriority}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; use polkadot_primitives::{BlockNumber, Hash}; +use std::sync::Arc; use super::{CandidateComparator, ParticipationRequest, QueueError, Queues}; From 02e5608df64b2e0f7810905e4508673b2037d351 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 10 Mar 2023 10:06:45 -0800 Subject: [PATCH 09/22] Final cleanup --- node/core/dispute-coordinator/src/metrics.rs | 4 +-- .../src/participation/mod.rs | 3 +-- .../src/participation/queues/mod.rs | 25 ++++++++++--------- .../src/participation/queues/tests.rs | 18 ++++++------- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 0b538a12ff44..977f5cc700f6 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -222,12 +222,12 @@ impl metrics::Metrics for Metrics { registry, )?, participation_priority_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size", "Number of disputes waiting for local participation in the priority queue.")?, registry, )?, participation_best_effort_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size", "Number of disputes waiting for local participation in the best effort queue.")?, registry, )?, diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 0fb897975e24..af8ed6d1905c 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -241,8 +241,7 @@ impl Participation { recent_head: Hash, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - let maybe_req = self.queue.dequeue(); - if let Some(req) = maybe_req { + if let Some(req) = self.queue.dequeue() { self.fork_participation(ctx, req, recent_head)?; } else { break diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index e5ad0a9bcc0e..7c571d8327e8 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -140,21 +140,22 @@ impl ParticipationRequest { let Self { candidate_hash, candidate_receipt, .. } = self; (candidate_hash, candidate_receipt) } - // For tests we want to check whether requests are equal, but the - // request_timer field of ParticipationRequest doesn't implement - // eq. This helper checks whether all other fields are equal, - // which is sufficient. - #[cfg(test)] - pub fn functionally_equal(&self, other: ParticipationRequest) -> bool { - if &self.candidate_receipt == other.candidate_receipt() && - &self.candidate_hash == other.candidate_hash() && +} + +impl PartialEq for ParticipationRequest { + fn eq(&self, other: &Self) -> bool { + let ParticipationRequest { + candidate_receipt, + candidate_hash, + session: _session, + _request_timer, + } = self; + candidate_receipt == other.candidate_receipt() && + candidate_hash == other.candidate_hash() && self.session == other.session() - { - return true - } - false } } +impl Eq for ParticipationRequest {} impl Queues { /// Create new `Queues`. diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 914d98c070e9..e4a0a8fc6980 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -108,12 +108,12 @@ fn ordering_works_as_expected() { ); // Prioritized queue is ordered correctly - assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); - assert!(queue.dequeue().unwrap().functionally_equal(req_prio_2)); + assert_eq!(queue.dequeue(), Some(req_prio)); + assert_eq!(queue.dequeue(), Some(req_prio_2)); // So is the best-effort - assert!(queue.dequeue().unwrap().functionally_equal(req1)); - assert!(queue.dequeue().unwrap().functionally_equal(req3)); - assert!(queue.dequeue().unwrap().functionally_equal(req5_unknown_parent)); + assert_eq!(queue.dequeue(), Some(req1)); + assert_eq!(queue.dequeue(), Some(req3)); + assert_eq!(queue.dequeue(), Some(req5_unknown_parent)); assert_matches!(queue.dequeue(), None); } @@ -177,7 +177,7 @@ fn candidate_is_only_dequeued_once() { .unwrap(); // Make space in prio: - assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); + assert_eq!(queue.dequeue(), Some(req_prio)); // Insert first as prio: queue @@ -196,8 +196,8 @@ fn candidate_is_only_dequeued_once() { ) .unwrap(); - assert!(queue.dequeue().unwrap().functionally_equal(req_best_effort_then_prio)); - assert!(queue.dequeue().unwrap().functionally_equal(req_prio_then_best_effort)); - assert!(queue.dequeue().unwrap().functionally_equal(req1)); + assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); + assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); + assert_eq!(queue.dequeue(), Some(req1)); assert_matches!(queue.dequeue(), None); } From 008063fce6663337d16cfc6e182c015bc866ed43 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 10 Mar 2023 12:48:44 -0800 Subject: [PATCH 10/22] Revert "Final cleanup" This reverts commit 02e5608df64b2e0f7810905e4508673b2037d351. --- node/core/dispute-coordinator/src/metrics.rs | 4 +-- .../src/participation/mod.rs | 3 ++- .../src/participation/queues/mod.rs | 25 +++++++++---------- .../src/participation/queues/tests.rs | 18 ++++++------- 4 files changed, 25 insertions(+), 25 deletions(-) diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 977f5cc700f6..0b538a12ff44 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -222,12 +222,12 @@ impl metrics::Metrics for Metrics { registry, )?, participation_priority_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", "Number of disputes waiting for local participation in the priority queue.")?, registry, )?, participation_best_effort_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size", "Number of disputes waiting for local participation in the best effort queue.")?, registry, )?, diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index e366adc5facb..75db0405548f 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -243,7 +243,8 @@ impl Participation { recent_head: Hash, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - if let Some(req) = self.queue.dequeue() { + let maybe_req = self.queue.dequeue(); + if let Some(req) = maybe_req { self.fork_participation(ctx, req, recent_head)?; } else { break diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 7c571d8327e8..e5ad0a9bcc0e 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -140,22 +140,21 @@ impl ParticipationRequest { let Self { candidate_hash, candidate_receipt, .. } = self; (candidate_hash, candidate_receipt) } -} - -impl PartialEq for ParticipationRequest { - fn eq(&self, other: &Self) -> bool { - let ParticipationRequest { - candidate_receipt, - candidate_hash, - session: _session, - _request_timer, - } = self; - candidate_receipt == other.candidate_receipt() && - candidate_hash == other.candidate_hash() && + // For tests we want to check whether requests are equal, but the + // request_timer field of ParticipationRequest doesn't implement + // eq. This helper checks whether all other fields are equal, + // which is sufficient. + #[cfg(test)] + pub fn functionally_equal(&self, other: ParticipationRequest) -> bool { + if &self.candidate_receipt == other.candidate_receipt() && + &self.candidate_hash == other.candidate_hash() && self.session == other.session() + { + return true + } + false } } -impl Eq for ParticipationRequest {} impl Queues { /// Create new `Queues`. diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index e4a0a8fc6980..914d98c070e9 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -108,12 +108,12 @@ fn ordering_works_as_expected() { ); // Prioritized queue is ordered correctly - assert_eq!(queue.dequeue(), Some(req_prio)); - assert_eq!(queue.dequeue(), Some(req_prio_2)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio_2)); // So is the best-effort - assert_eq!(queue.dequeue(), Some(req1)); - assert_eq!(queue.dequeue(), Some(req3)); - assert_eq!(queue.dequeue(), Some(req5_unknown_parent)); + assert!(queue.dequeue().unwrap().functionally_equal(req1)); + assert!(queue.dequeue().unwrap().functionally_equal(req3)); + assert!(queue.dequeue().unwrap().functionally_equal(req5_unknown_parent)); assert_matches!(queue.dequeue(), None); } @@ -177,7 +177,7 @@ fn candidate_is_only_dequeued_once() { .unwrap(); // Make space in prio: - assert_eq!(queue.dequeue(), Some(req_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); // Insert first as prio: queue @@ -196,8 +196,8 @@ fn candidate_is_only_dequeued_once() { ) .unwrap(); - assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); - assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); - assert_eq!(queue.dequeue(), Some(req1)); + assert!(queue.dequeue().unwrap().functionally_equal(req_best_effort_then_prio)); + assert!(queue.dequeue().unwrap().functionally_equal(req_prio_then_best_effort)); + assert!(queue.dequeue().unwrap().functionally_equal(req1)); assert_matches!(queue.dequeue(), None); } From 207bb2a6e712828c467dcbbdb2a4fdbeb46f873a Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 10 Mar 2023 15:06:36 -0800 Subject: [PATCH 11/22] Changing metric names --- node/core/dispute-coordinator/src/metrics.rs | 4 ++-- node/core/dispute-coordinator/src/participation/mod.rs | 3 +-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/metrics.rs b/node/core/dispute-coordinator/src/metrics.rs index 0b538a12ff44..977f5cc700f6 100644 --- a/node/core/dispute-coordinator/src/metrics.rs +++ b/node/core/dispute-coordinator/src/metrics.rs @@ -222,12 +222,12 @@ impl metrics::Metrics for Metrics { registry, )?, participation_priority_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_priority_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_participation_priority_queue_size", "Number of disputes waiting for local participation in the priority queue.")?, registry, )?, participation_best_effort_queue_size: prometheus::register( - prometheus::Gauge::new("polkadot_parachain_dispute_best_effort_queue_size", + prometheus::Gauge::new("polkadot_parachain_dispute_participation_best_effort_queue_size", "Number of disputes waiting for local participation in the best effort queue.")?, registry, )?, diff --git a/node/core/dispute-coordinator/src/participation/mod.rs b/node/core/dispute-coordinator/src/participation/mod.rs index 75db0405548f..e366adc5facb 100644 --- a/node/core/dispute-coordinator/src/participation/mod.rs +++ b/node/core/dispute-coordinator/src/participation/mod.rs @@ -243,8 +243,7 @@ impl Participation { recent_head: Hash, ) -> FatalResult<()> { while self.running_participations.len() < MAX_PARALLEL_PARTICIPATIONS { - let maybe_req = self.queue.dequeue(); - if let Some(req) = maybe_req { + if let Some(req) = self.queue.dequeue() { self.fork_participation(ctx, req, recent_head)?; } else { break From 3fcb639ab2cca3e5ed2115f41cfbe90f23668e6a Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 10 Mar 2023 15:40:01 -0800 Subject: [PATCH 12/22] Implementing Eq only for unit tests --- .../src/participation/queues/mod.rs | 29 +++++++++++-------- .../src/participation/queues/tests.rs | 21 +++++++------- 2 files changed, 27 insertions(+), 23 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index e5ad0a9bcc0e..cbfb71e20b42 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -140,21 +140,26 @@ impl ParticipationRequest { let Self { candidate_hash, candidate_receipt, .. } = self; (candidate_hash, candidate_receipt) } - // For tests we want to check whether requests are equal, but the - // request_timer field of ParticipationRequest doesn't implement - // eq. This helper checks whether all other fields are equal, - // which is sufficient. - #[cfg(test)] - pub fn functionally_equal(&self, other: ParticipationRequest) -> bool { - if &self.candidate_receipt == other.candidate_receipt() && - &self.candidate_hash == other.candidate_hash() && +} + +// We want to compare participation requests in unit tests, so we +// only implement Eq for tests. +#[cfg(test)] +impl PartialEq for ParticipationRequest { + fn eq(&self, other: &Self) -> bool { + let ParticipationRequest { + candidate_receipt, + candidate_hash, + session: _session, + _request_timer, + } = self; + candidate_receipt == other.candidate_receipt() && + candidate_hash == other.candidate_hash() && self.session == other.session() - { - return true - } - false } } +#[cfg(test)] +impl Eq for ParticipationRequest {} impl Queues { /// Create new `Queues`. diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 914d98c070e9..f8cf9f8341d4 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -108,12 +108,12 @@ fn ordering_works_as_expected() { ); // Prioritized queue is ordered correctly - assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); - assert!(queue.dequeue().unwrap().functionally_equal(req_prio_2)); + assert_eq!(queue.dequeue(), Some(req_prio)); + assert_eq!(queue.dequeue(), Some(req_prio_2)); // So is the best-effort - assert!(queue.dequeue().unwrap().functionally_equal(req1)); - assert!(queue.dequeue().unwrap().functionally_equal(req3)); - assert!(queue.dequeue().unwrap().functionally_equal(req5_unknown_parent)); + assert_eq!(queue.dequeue(), Some(req1)); + assert_eq!(queue.dequeue(), Some(req3)); + assert_eq!(queue.dequeue(), Some(req5_unknown_parent)); assert_matches!(queue.dequeue(), None); } @@ -127,7 +127,7 @@ fn candidate_is_only_dequeued_once() { let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); - + queue .queue_with_comparator( make_dummy_comparator(&req1, None), @@ -158,7 +158,6 @@ fn candidate_is_only_dequeued_once() { req_prio.clone(), ) .unwrap(); - // Insert first as best effort: queue .queue_with_comparator( @@ -177,7 +176,7 @@ fn candidate_is_only_dequeued_once() { .unwrap(); // Make space in prio: - assert!(queue.dequeue().unwrap().functionally_equal(req_prio)); + assert_eq!(queue.dequeue(), Some(req_prio)); // Insert first as prio: queue @@ -196,8 +195,8 @@ fn candidate_is_only_dequeued_once() { ) .unwrap(); - assert!(queue.dequeue().unwrap().functionally_equal(req_best_effort_then_prio)); - assert!(queue.dequeue().unwrap().functionally_equal(req_prio_then_best_effort)); - assert!(queue.dequeue().unwrap().functionally_equal(req1)); + assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); + assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); + assert_eq!(queue.dequeue(), Some(req1)); assert_matches!(queue.dequeue(), None); } From ef611aa550c61a9066c4740544161e52147b443f Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 10 Mar 2023 15:43:16 -0800 Subject: [PATCH 13/22] fmt --- node/core/dispute-coordinator/src/participation/queues/tests.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index f8cf9f8341d4..63df0d0a11ef 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -127,7 +127,7 @@ fn candidate_is_only_dequeued_once() { let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); - + queue .queue_with_comparator( make_dummy_comparator(&req1, None), From 1a4eb45e943a9cc169e418703d90a08b67fc9393 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 14 Mar 2023 16:06:39 -0700 Subject: [PATCH 14/22] Removing Clone trait from ParticipationRequest --- .../dispute-coordinator/src/initialized.rs | 2 +- node/core/dispute-coordinator/src/lib.rs | 2 +- .../src/participation/queues/mod.rs | 30 +++++++++++++++---- .../src/participation/queues/tests.rs | 3 +- .../src/participation/tests.rs | 2 +- 5 files changed, 28 insertions(+), 11 deletions(-) diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 620c58fbb7e6..2dbba5be328a 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -916,7 +916,7 @@ impl Initialized { } else { self.metrics.on_queued_best_effort_participation(); } - let request_timer = Arc::new(self.metrics.time_participation_pipeline()); + let request_timer = self.metrics.time_participation_pipeline(); let r = self .participation .queue_participation( diff --git a/node/core/dispute-coordinator/src/lib.rs b/node/core/dispute-coordinator/src/lib.rs index de155c1e8f42..d82c3a06c65e 100644 --- a/node/core/dispute-coordinator/src/lib.rs +++ b/node/core/dispute-coordinator/src/lib.rs @@ -347,7 +347,7 @@ impl DisputeCoordinatorSubsystem { ?candidate_hash, "Found valid dispute, with no vote from us on startup - participating." ); - let request_timer = Arc::new(self.metrics.time_participation_pipeline()); + let request_timer = self.metrics.time_participation_pipeline(); participation_requests.push(( ParticipationPriority::with_priority_if(is_included), ParticipationRequest::new( diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index cbfb71e20b42..87533626b9ab 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{cmp::Ordering, collections::BTreeMap, sync::Arc}; +use std::{cmp::Ordering, collections::BTreeMap}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -65,12 +65,12 @@ pub struct Queues { } /// A dispute participation request that can be queued. -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, - _request_timer: Arc>, // Sends metric data when request is dropped + _request_timer: Option, // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -117,7 +117,7 @@ impl ParticipationRequest { pub fn new( candidate_receipt: CandidateReceipt, session: SessionIndex, - request_timer: Arc>, + request_timer: Option, ) -> Self { Self { candidate_hash: candidate_receipt.hash(), @@ -142,8 +142,8 @@ impl ParticipationRequest { } } -// We want to compare participation requests in unit tests, so we -// only implement Eq for tests. +// We want to compare and clone participation requests in unit tests, so we +// only implement Eq and Clone for tests. #[cfg(test)] impl PartialEq for ParticipationRequest { fn eq(&self, other: &Self) -> bool { @@ -160,6 +160,24 @@ impl PartialEq for ParticipationRequest { } #[cfg(test)] impl Eq for ParticipationRequest {} +#[cfg(test)] +impl Clone for ParticipationRequest { + fn clone(&self) -> Self { + ParticipationRequest { + candidate_receipt: self.candidate_receipt.clone(), + candidate_hash: self.candidate_hash.clone(), + session: self.session, + _request_timer: None, + } + } + + fn clone_from(&mut self, source: &Self) + { + *self = source.clone(); + } +} + + impl Queues { /// Create new `Queues`. diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index 63df0d0a11ef..efcaa3594e28 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -18,7 +18,6 @@ use crate::{metrics::Metrics, ParticipationPriority}; use ::test_helpers::{dummy_candidate_receipt, dummy_hash}; use assert_matches::assert_matches; use polkadot_primitives::{BlockNumber, Hash}; -use std::sync::Arc; use super::{CandidateComparator, ParticipationRequest, QueueError, Queues}; @@ -27,7 +26,7 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest { let mut receipt = dummy_candidate_receipt(dummy_hash()); // make it differ: receipt.commitments_hash = hash; - let request_timer = Arc::new(Metrics::default().time_participation_pipeline()); + let request_timer = Metrics::default().time_participation_pipeline(); ParticipationRequest::new(receipt, 1, request_timer) } diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index a6e5f86616d7..a5fbe2d6a5dd 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -72,7 +72,7 @@ async fn participate_with_commitments_hash( }; let session = 1; - let request_timer = Arc::new(participation.metrics.time_participation_pipeline()); + let request_timer = participation.metrics.time_participation_pipeline(); let req = ParticipationRequest::new(candidate_receipt, session, request_timer); participation From 27fd59fe6ef2c3fc6206a83fa97573e3acadc373 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Tue, 14 Mar 2023 16:33:04 -0700 Subject: [PATCH 15/22] fmt --- .../core/dispute-coordinator/src/participation/queues/mod.rs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 87533626b9ab..48db966c29c2 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -171,14 +171,11 @@ impl Clone for ParticipationRequest { } } - fn clone_from(&mut self, source: &Self) - { + fn clone_from(&mut self, source: &Self) { *self = source.clone(); } } - - impl Queues { /// Create new `Queues`. pub fn new(metrics: Metrics) -> Self { From d8e838f1e0b4add5a696841e92215e0c8804f0eb Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 15 Mar 2023 11:36:45 -0700 Subject: [PATCH 16/22] Moved clone functionality to tests helper --- .../src/participation/queues/mod.rs | 15 -------- .../src/participation/queues/tests.rs | 38 ++++++++++++------- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 48db966c29c2..ebe742ac8f37 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -160,21 +160,6 @@ impl PartialEq for ParticipationRequest { } #[cfg(test)] impl Eq for ParticipationRequest {} -#[cfg(test)] -impl Clone for ParticipationRequest { - fn clone(&self) -> Self { - ParticipationRequest { - candidate_receipt: self.candidate_receipt.clone(), - candidate_hash: self.candidate_hash.clone(), - session: self.session, - _request_timer: None, - } - } - - fn clone_from(&mut self, source: &Self) { - *self = source.clone(); - } -} impl Queues { /// Create new `Queues`. diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index efcaa3594e28..f4623e76389c 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -38,6 +38,18 @@ fn make_dummy_comparator( CandidateComparator::new_dummy(relay_parent, *req.candidate_hash()) } +/// Make a partial clone of the given ParticipationRequest, just missing +/// the request_timer field. We prefer this helper to implementing Clone +/// for ParticipationRequest, since we only clone requests in tests. +fn clone_request(request: &ParticipationRequest) -> ParticipationRequest { + ParticipationRequest { + candidate_receipt: request.candidate_receipt.clone(), + candidate_hash: request.candidate_hash.clone(), + session: request.session, + _request_timer: None, + } +} + /// Check that dequeuing acknowledges order. /// /// Any priority item will be dequeued before any best effort items, priority and best effort with @@ -58,35 +70,35 @@ fn ordering_works_as_expected() { .queue_with_comparator( make_dummy_comparator(&req1, Some(1)), ParticipationPriority::BestEffort, - req1.clone(), + clone_request(&req1), ) .unwrap(); queue .queue_with_comparator( make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, - req_prio.clone(), + clone_request(&req_prio), ) .unwrap(); queue .queue_with_comparator( make_dummy_comparator(&req3, Some(2)), ParticipationPriority::BestEffort, - req3.clone(), + clone_request(&req3), ) .unwrap(); queue .queue_with_comparator( make_dummy_comparator(&req_prio_2, Some(2)), ParticipationPriority::Priority, - req_prio_2.clone(), + clone_request(&req_prio_2), ) .unwrap(); queue .queue_with_comparator( make_dummy_comparator(&req5_unknown_parent, None), ParticipationPriority::BestEffort, - req5_unknown_parent.clone(), + clone_request(&req5_unknown_parent), ) .unwrap(); assert_matches!( @@ -131,14 +143,14 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, - req1.clone(), + clone_request(&req1), ) .unwrap(); queue .queue_with_comparator( make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, - req_prio.clone(), + clone_request(&req_prio), ) .unwrap(); // Insert same best effort again: @@ -146,7 +158,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req1, None), ParticipationPriority::BestEffort, - req1.clone(), + clone_request(&req1), ) .unwrap(); // insert same prio again: @@ -154,7 +166,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req_prio, Some(1)), ParticipationPriority::Priority, - req_prio.clone(), + clone_request(&req_prio), ) .unwrap(); // Insert first as best effort: @@ -162,7 +174,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::BestEffort, - req_best_effort_then_prio.clone(), + clone_request(&req_best_effort_then_prio), ) .unwrap(); // Then as prio: @@ -170,7 +182,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req_best_effort_then_prio, Some(2)), ParticipationPriority::Priority, - req_best_effort_then_prio.clone(), + clone_request(&req_best_effort_then_prio), ) .unwrap(); @@ -182,7 +194,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::Priority, - req_prio_then_best_effort.clone(), + clone_request(&req_prio_then_best_effort), ) .unwrap(); // Then as best effort: @@ -190,7 +202,7 @@ fn candidate_is_only_dequeued_once() { .queue_with_comparator( make_dummy_comparator(&req_prio_then_best_effort, Some(3)), ParticipationPriority::BestEffort, - req_prio_then_best_effort.clone(), + clone_request(&req_prio_then_best_effort), ) .unwrap(); From 9bd4dcf41d3dcec35a9e4ce72b8c4d5583120fce Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Wed, 15 Mar 2023 11:43:51 -0700 Subject: [PATCH 17/22] fmt --- node/core/dispute-coordinator/src/participation/queues/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index ebe742ac8f37..0f81fc99652f 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -142,8 +142,8 @@ impl ParticipationRequest { } } -// We want to compare and clone participation requests in unit tests, so we -// only implement Eq and Clone for tests. +// We want to compare participation requests in unit tests, so we +// only implement Eq for tests. #[cfg(test)] impl PartialEq for ParticipationRequest { fn eq(&self, other: &Self) -> bool { From fea1997c8dadaa3563b806ab65ceafcd28bbd1f7 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 16 Mar 2023 12:31:31 -0700 Subject: [PATCH 18/22] Fixing dropped timers on repeat requests --- .../src/participation/queues/mod.rs | 42 +++++++++++++------ .../src/participation/queues/tests.rs | 2 +- 2 files changed, 31 insertions(+), 13 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 0f81fc99652f..73ab15276cd4 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -70,7 +70,7 @@ pub struct ParticipationRequest { candidate_hash: CandidateHash, candidate_receipt: CandidateReceipt, session: SessionIndex, - _request_timer: Option, // Sends metric data when request is dropped + request_timer: Option, // Sends metric data when request is dropped } /// Whether a `ParticipationRequest` should be put on best-effort or the priority queue. @@ -119,12 +119,7 @@ impl ParticipationRequest { session: SessionIndex, request_timer: Option, ) -> Self { - Self { - candidate_hash: candidate_receipt.hash(), - candidate_receipt, - session, - _request_timer: request_timer, - } + Self { candidate_hash: candidate_receipt.hash(), candidate_receipt, session, request_timer } } pub fn candidate_receipt(&'_ self) -> &'_ CandidateReceipt { @@ -151,7 +146,7 @@ impl PartialEq for ParticipationRequest { candidate_receipt, candidate_hash, session: _session, - _request_timer, + request_timer: _, } = self; candidate_receipt == other.candidate_receipt() && candidate_hash == other.candidate_hash() && @@ -237,9 +232,23 @@ impl Queues { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } - // Remove any best effort entry: - self.best_effort.remove(&comparator); - self.priority.insert(comparator, req); + // Remove any best effort entry. If there is a timer, stop + // it without sending metric data. + if let Some(discarded_request) = self.best_effort.remove(&comparator) { + if let Some(timer) = discarded_request.request_timer { + timer.stop_and_discard(); + } + } + // Keeping old request if any. This prevents request timers + // from resetting on each new request at the same priority + // level for the same candidate. + if self.priority.contains_key(&comparator) { + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + } + } else { + self.priority.insert(comparator, req); + } self.metrics.report_priority_queue_size(self.priority.len() as u64); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } else { @@ -251,7 +260,16 @@ impl Queues { if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { return Err(QueueError::BestEffortFull) } - self.best_effort.insert(comparator, req); + // Keeping old request if any. This prevents request timers + // from resetting on each new request at the same priority + // level for the same candidate. + if self.best_effort.contains_key(&comparator) { + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + } + } else { + self.best_effort.insert(comparator, req); + } self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } Ok(()) diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs index f4623e76389c..e4ccd0de8d9f 100644 --- a/node/core/dispute-coordinator/src/participation/queues/tests.rs +++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs @@ -46,7 +46,7 @@ fn clone_request(request: &ParticipationRequest) -> ParticipationRequest { candidate_receipt: request.candidate_receipt.clone(), candidate_hash: request.candidate_hash.clone(), session: request.session, - _request_timer: None, + request_timer: None, } } From 0ac7f5bb4dec00a602d0677d3422219b4b798d53 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Thu, 16 Mar 2023 13:07:20 -0700 Subject: [PATCH 19/22] Keep older best effort timers --- .../src/participation/queues/mod.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 3b5a3aabdef4..f2a31bdf36f0 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -226,18 +226,20 @@ impl Queues { &mut self, comparator: CandidateComparator, priority: ParticipationPriority, - req: ParticipationRequest, + mut req: ParticipationRequest, ) -> std::result::Result<(), QueueError> { if priority.is_priority() { if self.priority.len() >= PRIORITY_QUEUE_SIZE { return Err(QueueError::PriorityFull) } - // Remove any best effort entry. If there is a timer, stop - // it without sending metric data. - if let Some(discarded_request) = self.best_effort.remove(&comparator) { - if let Some(timer) = discarded_request.request_timer { + // Remove any best effort entry, using it to replace our new + // request. This preserves the original request timer from + // the older request. + if let Some(older_request) = self.best_effort.remove(&comparator) { + if let Some(timer) = req.request_timer { timer.stop_and_discard(); } + req = older_request; } // Keeping old request if any. This prevents request timers // from resetting on each new request at the same priority From e7f8112f887f19199f0260ab67a3200e7f2f203e Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Fri, 17 Mar 2023 12:37:11 -0700 Subject: [PATCH 20/22] Removing comment redundency and explaining better --- .../src/participation/queues/mod.rs | 33 ++++++++++--------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index f2a31bdf36f0..849b8c37771d 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -142,15 +142,11 @@ impl ParticipationRequest { #[cfg(test)] impl PartialEq for ParticipationRequest { fn eq(&self, other: &Self) -> bool { - let ParticipationRequest { - candidate_receipt, - candidate_hash, - session: _session, - request_timer: _, - } = self; + let ParticipationRequest { candidate_receipt, candidate_hash, session, request_timer: _ } = + self; candidate_receipt == other.candidate_receipt() && candidate_hash == other.candidate_hash() && - self.session == other.session() + *session == other.session() } } #[cfg(test)] @@ -222,6 +218,18 @@ impl Queues { Ok(()) } + /// Will put message in queue, either priority or best effort depending on priority. + /// + /// If the message was already previously present on best effort, it will be moved to priority + /// if it is considered priority now. + /// + /// Returns error in case a queue was found full already. + /// + /// # Request timers + /// + /// [`ParticipationRequest`]s contain request timers. + /// Where an old request would be replaced by a new one, we keep the old request. + /// This prevents request timers from resetting on each new request. fn queue_with_comparator( &mut self, comparator: CandidateComparator, @@ -233,17 +241,14 @@ impl Queues { return Err(QueueError::PriorityFull) } // Remove any best effort entry, using it to replace our new - // request. This preserves the original request timer from - // the older request. + // request. if let Some(older_request) = self.best_effort.remove(&comparator) { if let Some(timer) = req.request_timer { timer.stop_and_discard(); } req = older_request; } - // Keeping old request if any. This prevents request timers - // from resetting on each new request at the same priority - // level for the same candidate. + // Keeping old request if any. if self.priority.contains_key(&comparator) { if let Some(timer) = req.request_timer { timer.stop_and_discard(); @@ -262,9 +267,7 @@ impl Queues { if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE { return Err(QueueError::BestEffortFull) } - // Keeping old request if any. This prevents request timers - // from resetting on each new request at the same priority - // level for the same candidate. + // Keeping old request if any. if self.best_effort.contains_key(&comparator) { if let Some(timer) = req.request_timer { timer.stop_and_discard(); From 70b21f3e5f571734ece588bdd6a569c325f190e0 Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 20 Mar 2023 09:59:08 -0700 Subject: [PATCH 21/22] Updating queue() to use single mem read --- .../src/participation/queues/mod.rs | 28 +++++++++++-------- 1 file changed, 17 insertions(+), 11 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index 849b8c37771d..d33b52cd0c7e 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{cmp::Ordering, collections::BTreeMap}; +use std::{cmp::Ordering, collections::BTreeMap, collections::btree_map::Entry}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -249,12 +249,15 @@ impl Queues { req = older_request; } // Keeping old request if any. - if self.priority.contains_key(&comparator) { - if let Some(timer) = req.request_timer { - timer.stop_and_discard(); + match self.priority.entry(comparator) { + Entry::Occupied(_) => { + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + } + }, + Entry::Vacant(vac) => { + vac.insert(req); } - } else { - self.priority.insert(comparator, req); } self.metrics.report_priority_queue_size(self.priority.len() as u64); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); @@ -268,12 +271,15 @@ impl Queues { return Err(QueueError::BestEffortFull) } // Keeping old request if any. - if self.best_effort.contains_key(&comparator) { - if let Some(timer) = req.request_timer { - timer.stop_and_discard(); + match self.best_effort.entry(comparator) { + Entry::Occupied(_) => { + if let Some(timer) = req.request_timer { + timer.stop_and_discard(); + } + }, + Entry::Vacant(vac) => { + vac.insert(req); } - } else { - self.best_effort.insert(comparator, req); } self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); } From 81de5d3d75053ce7655c94d1179567ff8a2e5c9f Mon Sep 17 00:00:00 2001 From: BradleyOlson64 Date: Mon, 20 Mar 2023 10:10:38 -0700 Subject: [PATCH 22/22] fmt --- .../src/participation/queues/mod.rs | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs index d33b52cd0c7e..a5a5ab962f5a 100644 --- a/node/core/dispute-coordinator/src/participation/queues/mod.rs +++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs @@ -14,7 +14,10 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::{cmp::Ordering, collections::BTreeMap, collections::btree_map::Entry}; +use std::{ + cmp::Ordering, + collections::{btree_map::Entry, BTreeMap}, +}; use futures::channel::oneshot; use polkadot_node_subsystem::{messages::ChainApiMessage, overseer}; @@ -250,14 +253,13 @@ impl Queues { } // Keeping old request if any. match self.priority.entry(comparator) { - Entry::Occupied(_) => { + Entry::Occupied(_) => if let Some(timer) = req.request_timer { timer.stop_and_discard(); - } - }, + }, Entry::Vacant(vac) => { vac.insert(req); - } + }, } self.metrics.report_priority_queue_size(self.priority.len() as u64); self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); @@ -272,14 +274,13 @@ impl Queues { } // Keeping old request if any. match self.best_effort.entry(comparator) { - Entry::Occupied(_) => { + Entry::Occupied(_) => if let Some(timer) = req.request_timer { timer.stop_and_discard(); - } - }, + }, Entry::Vacant(vac) => { vac.insert(req); - } + }, } self.metrics.report_best_effort_queue_size(self.best_effort.len() as u64); }