Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Revert "remove connected disconnected state only" #3896

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,6 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandler), Error>
where
Expand Down Expand Up @@ -214,7 +213,7 @@ impl OverseerGen for BehaveMaleficient {
),
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
.map_err(|e| e.into())

// A builder pattern will simplify this further
Expand Down
5 changes: 2 additions & 3 deletions node/malus/src/variant-a.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use polkadot_cli::{
use polkadot_node_core_candidate_validation::CandidateValidationSubsystem;
use polkadot_node_subsystem::{
messages::{AllMessages, CandidateValidationMessage},
overseer::{self, OverseerConnector, OverseerHandle},
overseer::{self, OverseerHandle},
FromOverseer,
};

Expand Down Expand Up @@ -86,7 +86,6 @@ struct BehaveMaleficient;
impl OverseerGen for BehaveMaleficient {
fn generate<'a, Spawner, RuntimeClient>(
&self,
connector: OverseerConnector,
args: OverseerGenArgs<'a, Spawner, RuntimeClient>,
) -> Result<(Overseer<Spawner, Arc<RuntimeClient>>, OverseerHandle), Error>
where
Expand Down Expand Up @@ -114,7 +113,7 @@ impl OverseerGen for BehaveMaleficient {
},
);

Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner, connector)
Overseer::new(leaves, all_subsystems, registry, runtime_client, spawner)
.map_err(|e| e.into())
}
}
Expand Down
14 changes: 3 additions & 11 deletions node/overseer/examples/minimal-example.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@ use polkadot_node_subsystem_types::messages::{
use polkadot_overseer::{
self as overseer,
gen::{FromOverseer, SpawnedSubsystem},
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerConnector,
OverseerSignal, SubsystemError,
AllMessages, AllSubsystems, HeadSupportsParachains, Overseer, OverseerSignal, SubsystemError,
};
use polkadot_primitives::v1::Hash;

Expand Down Expand Up @@ -174,15 +173,8 @@ fn main() {
.replace_candidate_validation(|_| Subsystem2)
.replace_candidate_backing(|orig| orig);

let (overseer, _handle) = Overseer::new(
vec![],
all_subsystems,
None,
AlwaysSupportsParachains,
spawner,
OverseerConnector::default(),
)
.unwrap();
let (overseer, _handle) =
Overseer::new(vec![], all_subsystems, None, AlwaysSupportsParachains, spawner).unwrap();
let overseer_fut = overseer.run().fuse();
let timer_stream = timer_stream;

Expand Down
2 changes: 1 addition & 1 deletion node/overseer/overseer-gen/proc-macro/src/impl_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ pub(crate) fn impl_builder(info: &OverseerInfo) -> proc_macro2::TokenStream {
&mut self.handle
}
/// Obtain access to the overseer handle.
pub fn as_handle(&self) -> &#handle {
pub fn as_handle(&mut self) -> &#handle {
&self.handle
}
}
Expand Down
54 changes: 0 additions & 54 deletions node/overseer/src/dummy.rs

This file was deleted.

87 changes: 68 additions & 19 deletions node/overseer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ use std::{

use futures::{channel::oneshot, future::BoxFuture, select, Future, FutureExt, StreamExt};
use lru::LruCache;
use parking_lot::RwLock;

use client::{BlockImportNotification, BlockchainEvents, FinalityNotification};
use polkadot_primitives::v1::{Block, BlockId, BlockNumber, Hash, ParachainHost};
Expand All @@ -90,17 +91,12 @@ pub use polkadot_node_subsystem_types::{
jaeger, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, OverseerSignal,
};

/// Test helper supplements.
pub mod dummy;
pub use self::dummy::DummySubsystem;

// TODO legacy, to be deleted, left for easier integration
// TODO https://github.com/paritytech/polkadot/issues/3427
mod subsystems;
pub use self::subsystems::AllSubsystems;
pub use self::subsystems::{AllSubsystems, DummySubsystem};

/// Metrics re-exports of `polkadot-metrics`.
pub mod metrics;
mod metrics;
use self::metrics::Metrics;

use polkadot_node_metrics::{
Expand All @@ -119,7 +115,7 @@ pub use polkadot_overseer_gen::{

/// Store 2 days worth of blocks, not accounting for forks,
/// in the LRU cache. Assumes a 6-second block time.
pub const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;
const KNOWN_LEAVES_CACHE_SIZE: usize = 2 * 24 * 3600 / 6;

#[cfg(test)]
mod tests;
Expand All @@ -145,12 +141,18 @@ where
///
/// [`Overseer`]: struct.Overseer.html
#[derive(Clone)]
pub struct Handle(pub OverseerHandle);
pub enum Handle {
/// Used only at initialization to break the cyclic dependency.
// TODO: refactor in https://github.com/paritytech/polkadot/issues/3427
Disconnected(Arc<RwLock<Option<OverseerHandle>>>),
/// A handle to the overseer.
Connected(OverseerHandle),
}

impl Handle {
/// Create a new [`Handle`].
pub fn new(raw: OverseerHandle) -> Self {
Self(raw)
/// Create a new disconnected [`Handle`].
pub fn new_disconnected() -> Self {
Self::Disconnected(Arc::new(RwLock::new(None)))
}

/// Inform the `Overseer` that that some block was imported.
Expand Down Expand Up @@ -199,8 +201,58 @@ impl Handle {

/// Most basic operation, to stop a server.
async fn send_and_log_error(&mut self, event: Event) {
if self.0.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
self.try_connect();
if let Self::Connected(ref mut handle) = self {
if handle.send(event).await.is_err() {
tracing::info!(target: LOG_TARGET, "Failed to send an event to Overseer");
}
} else {
tracing::warn!(target: LOG_TARGET, "Using a disconnected Handle to send to Overseer");
}
}

/// Whether the handle is disconnected.
pub fn is_disconnected(&self) -> bool {
match self {
Self::Disconnected(ref x) => x.read().is_none(),
_ => false,
}
}

/// Connect this handle and all disconnected clones of it to the overseer.
pub fn connect_to_overseer(&mut self, handle: OverseerHandle) {
match self {
Self::Disconnected(ref mut x) => {
let mut maybe_handle = x.write();
if maybe_handle.is_none() {
tracing::info!(target: LOG_TARGET, "🖇️ Connecting all Handles to Overseer");
*maybe_handle = Some(handle);
} else {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect a clone of a connected Handle",
);
}
},
_ => {
tracing::warn!(
target: LOG_TARGET,
"Attempting to connect an already connected Handle",
);
},
}
}

/// Try upgrading from `Self::Disconnected` to `Self::Connected` state
/// after calling `connect_to_overseer` on `self` or a clone of `self`.
fn try_connect(&mut self) {
if let Self::Disconnected(ref mut x) = self {
let guard = x.write();
if let Some(ref h) = *guard {
let handle = h.clone();
drop(guard);
*self = Self::Connected(handle);
}
}
}
}
Expand Down Expand Up @@ -438,13 +490,12 @@ where
/// # use polkadot_primitives::v1::Hash;
/// # use polkadot_overseer::{
/// # self as overseer,
/// # Overseer,
/// # OverseerSignal,
/// # OverseerConnector,
/// # SubsystemSender as _,
/// # AllMessages,
/// # AllSubsystems,
/// # HeadSupportsParachains,
/// # Overseer,
/// # SubsystemError,
/// # gen::{
/// # SubsystemContext,
Expand Down Expand Up @@ -498,7 +549,6 @@ where
/// None,
/// AlwaysSupportsParachains,
/// spawner,
/// OverseerConnector::default(),
/// ).unwrap();
///
/// let timer = Delay::new(Duration::from_millis(50)).fuse();
Expand Down Expand Up @@ -565,7 +615,6 @@ where
prometheus_registry: Option<&prometheus::Registry>,
supports_parachains: SupportsParachains,
s: S,
connector: OverseerConnector,
) -> SubsystemResult<(Self, OverseerHandle)>
where
CV: Subsystem<OverseerSubsystemContext<CandidateValidationMessage>, SubsystemError> + Send,
Expand Down Expand Up @@ -630,7 +679,7 @@ where
.supports_parachains(supports_parachains)
.metrics(metrics.clone())
.spawner(s)
.build_with_connector(connector)?;
.build()?;

// spawn the metrics metronome task
{
Expand Down
4 changes: 2 additions & 2 deletions node/overseer/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
//! Prometheus metrics related to the overseer and its channels.

use super::*;
pub use polkadot_node_metrics::metrics::{self, prometheus, Metrics as MetricsTrait};
use polkadot_node_metrics::metrics::{self, prometheus};

use parity_util_mem::MemoryAllocationSnapshot;

Expand Down Expand Up @@ -110,7 +110,7 @@ impl Metrics {
}
}

impl MetricsTrait for Metrics {
impl metrics::Metrics for Metrics {
fn try_register(registry: &prometheus::Registry) -> Result<Self, prometheus::PrometheusError> {
let metrics = MetricsInner {
activated_heads_total: prometheus::register(
Expand Down
42 changes: 40 additions & 2 deletions node/overseer/src/subsystems.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,47 @@
//! In the future, everything should be set up using the generated
//! overseer builder pattern instead.

use crate::dummy::DummySubsystem;
use crate::{AllMessages, OverseerSignal};
use polkadot_node_subsystem_types::errors::SubsystemError;
use polkadot_overseer_all_subsystems_gen::AllSubsystemsGen;
use polkadot_overseer_gen::MapSubsystem;
use polkadot_overseer_gen::{
FromOverseer, MapSubsystem, SpawnedSubsystem, Subsystem, SubsystemContext,
};

/// A dummy subsystem that implements [`Subsystem`] for all
/// types of messages. Used for tests or as a placeholder.
#[derive(Clone, Copy, Debug)]
pub struct DummySubsystem;

impl<Context> Subsystem<Context, SubsystemError> for DummySubsystem
where
Context: SubsystemContext<
Signal = OverseerSignal,
Error = SubsystemError,
AllMessages = AllMessages,
>,
{
fn start(self, mut ctx: Context) -> SpawnedSubsystem<SubsystemError> {
let future = Box::pin(async move {
loop {
match ctx.recv().await {
Err(_) => return Ok(()),
Ok(FromOverseer::Signal(OverseerSignal::Conclude)) => return Ok(()),
Ok(overseer_msg) => {
tracing::debug!(
target: "dummy-subsystem",
"Discarding a message sent from overseer {:?}",
overseer_msg
);
continue
},
}
}
});

SpawnedSubsystem { name: "dummy-subsystem", future }
}
}

/// This struct is passed as an argument to create a new instance of an [`Overseer`].
///
Expand Down
Loading