Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
client/network-gossip: Integrate GossipEngine tasks into Future impl (#…
Browse files Browse the repository at this point in the history
…4767)

`GossipEngine` spawns two tasks, one for a periodic tick, one to forward
messages from the network to subscribers. These tasks hold an `Arc` to a
`GossipEngineInner`.

To reduce the amount of shared ownership (locking) this patch integrates
the two tasks into a `Future` implementation on the `GossipEngine`
struct. This `Future` implementation can now be called from a single
owner, e.g. the `finality-grandpa` `NetworkBridge`.

As a side effect this removes the requirement on the `network-gossip`
crate to spawn tasks and thereby removes the requirement on the
`finality-grandpa` crate to spawn any tasks.

This is part of a greater effort to reduce the number of owners of
components within `finality-grandpa`, `network` and `network-gossip` as
well as to reduce the amount of unbounded channels. For details see
d4fbb89, f0c1852 and 5afc777.
  • Loading branch information
mxinden authored Feb 12, 2020
1 parent 2290645 commit e1668c2
Show file tree
Hide file tree
Showing 9 changed files with 121 additions and 155 deletions.
2 changes: 0 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ pub fn new_full(config: Configuration<GenesisConfig>)
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
Expand All @@ -172,7 +171,6 @@ pub fn new_full(config: Configuration<GenesisConfig>)
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};

// the GRANDPA voter task is considered infallible, i.e.
Expand Down
2 changes: 0 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,6 @@ macro_rules! new_full {
grandpa_link,
service.network(),
service.on_exit(),
service.spawn_task_handle(),
)?);
},
(true, false) => {
Expand All @@ -229,7 +228,6 @@ macro_rules! new_full {
on_exit: service.on_exit(),
telemetry_on_connect: Some(service.telemetry_on_connect_stream()),
voting_rule: grandpa::VotingRulesBuilder::default().build(),
executor: service.spawn_task_handle(),
};
// the GRANDPA voter task is considered infallible, i.e.
// if it fails we take down the service with it.
Expand Down
14 changes: 9 additions & 5 deletions client/finality-grandpa/src/communication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,15 +178,14 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
service: N,
config: crate::Config,
set_state: crate::environment::SharedVoterSetState<B>,
executor: &impl futures::task::Spawn,
) -> Self {
let (validator, report_stream) = GossipValidator::new(
config,
set_state.clone(),
);

let validator = Arc::new(validator);
let gossip_engine = GossipEngine::new(service.clone(), executor, GRANDPA_ENGINE_ID, validator.clone());
let gossip_engine = GossipEngine::new(service.clone(), GRANDPA_ENGINE_ID, validator.clone());

{
// register all previous votes with the gossip service so that they're
Expand Down Expand Up @@ -374,10 +373,9 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
|to, neighbor| self.neighbor_sender.send(to, neighbor),
);

let service = self.gossip_engine.clone();
let topic = global_topic::<B>(set_id.0);
let incoming = incoming_global(
service,
self.gossip_engine.clone(),
topic,
voters,
self.validator.clone(),
Expand Down Expand Up @@ -419,7 +417,7 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
loop {
match self.neighbor_packet_worker.lock().poll_next_unpin(cx) {
Poll::Ready(Some((to, packet))) => {
Expand All @@ -444,6 +442,12 @@ impl<B: BlockT, N: Network<B>> Future for NetworkBridge<B, N> {
}
}

match self.gossip_engine.poll_unpin(cx) {
// The gossip engine future finished. We should do the same.
Poll::Ready(()) => return Poll::Ready(Ok(())),
Poll::Pending => {},
}

Poll::Pending
}
}
Expand Down
32 changes: 19 additions & 13 deletions client/finality-grandpa/src/communication/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ fn voter_set_state() -> SharedVoterSetState<Block> {
}

// needs to run in a tokio runtime.
pub(crate) fn make_test_network(executor: &impl futures::task::Spawn) -> (
pub(crate) fn make_test_network() -> (
impl Future<Output = Tester>,
TestNetwork,
) {
Expand All @@ -187,7 +187,6 @@ pub(crate) fn make_test_network(executor: &impl futures::task::Spawn) -> (
net.clone(),
config(),
voter_set_state(),
executor,
);

(
Expand Down Expand Up @@ -261,8 +260,7 @@ fn good_commit_leads_to_relay() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
let test = make_test_network().0
.then(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Expand All @@ -281,6 +279,7 @@ fn good_commit_leads_to_relay() {
}

let commit_to_send = encoded_commit.clone();
let network_bridge = tester.net_handle.clone();

// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
Expand Down Expand Up @@ -325,15 +324,19 @@ fn good_commit_leads_to_relay() {

// once the message is sent and commit is "handled" we should have
// a repropagation event coming from the network.
future::join(send_message, handle_commit).then(move |(tester, ())| {
let fut = future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::WriteNotification(_, data) => {
data == encoded_commit
}
_ => false,
})
})
.map(|_| ())
.map(|_| ());

// Poll both the future sending and handling the commit, as well as the underlying
// NetworkBridge. Complete once the former completes.
future::select(fut, network_bridge)
});

futures::executor::block_on(test);
Expand Down Expand Up @@ -385,8 +388,7 @@ fn bad_commit_leads_to_report() {
let id = sc_network::PeerId::random();
let global_topic = super::global_topic::<Block>(set_id);

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let test = make_test_network(&threads_pool).0
let test = make_test_network().0
.map(move |tester| {
// register a peer.
tester.gossip_validator.new_peer(&mut NoopContext, &id, sc_network::config::Roles::FULL);
Expand All @@ -405,6 +407,7 @@ fn bad_commit_leads_to_report() {
}

let commit_to_send = encoded_commit.clone();
let network_bridge = tester.net_handle.clone();

// asking for global communication will cause the test network
// to send us an event asking us for a stream. use it to
Expand All @@ -427,7 +430,7 @@ fn bad_commit_leads_to_report() {
_ => false,
});

// when the commit comes in, we'll tell the callback it was good.
// when the commit comes in, we'll tell the callback it was bad.
let handle_commit = commits_in.into_future()
.map(|(item, _)| {
match item.unwrap() {
Expand All @@ -440,15 +443,19 @@ fn bad_commit_leads_to_report() {

// once the message is sent and commit is "handled" we should have
// a report event coming from the network.
future::join(send_message, handle_commit).then(move |(tester, ())| {
let fut = future::join(send_message, handle_commit).then(move |(tester, ())| {
tester.filter_network_events(move |event| match event {
Event::Report(who, cost_benefit) => {
who == id && cost_benefit == super::cost::INVALID_COMMIT
}
_ => false,
})
})
.map(|_| ())
.map(|_| ());

// Poll both the future sending and handling the commit, as well as the underlying
// NetworkBridge. Complete once the former completes.
future::select(fut, network_bridge)
});

futures::executor::block_on(test);
Expand All @@ -458,8 +465,7 @@ fn bad_commit_leads_to_report() {
fn peer_with_higher_view_leads_to_catch_up_request() {
let id = sc_network::PeerId::random();

let threads_pool = futures::executor::ThreadPool::new().unwrap();
let (tester, mut net) = make_test_network(&threads_pool);
let (tester, mut net) = make_test_network();
let test = tester
.map(move |tester| {
// register a peer with authority role.
Expand Down
16 changes: 5 additions & 11 deletions client/finality-grandpa/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -516,7 +516,7 @@ fn register_finality_tracker_inherent_data_provider<B, E, Block: BlockT, RA>(
}

/// Parameters used to run Grandpa.
pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X> {
/// Configuration for the GRANDPA service.
pub config: Config,
/// A link to the block import worker.
Expand All @@ -531,14 +531,12 @@ pub struct GrandpaParams<B, E, Block: BlockT, N, RA, SC, VR, X, Sp> {
pub telemetry_on_connect: Option<futures::channel::mpsc::UnboundedReceiver<()>>,
/// A voting rule used to potentially restrict target votes.
pub voting_rule: VR,
/// How to spawn background tasks.
pub executor: Sp,
}

/// Run a GRANDPA voter as a task. Provide configuration and a link to a
/// block import worker that has already been instantiated with `block_import`.
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
Expand All @@ -551,7 +549,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
RA: Send + Sync + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures::task::Spawn + 'static,
{
let GrandpaParams {
config,
Expand All @@ -561,7 +558,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
on_exit,
telemetry_on_connect,
voting_rule,
executor,
} = grandpa_params;

let LinkHalf {
Expand All @@ -575,7 +571,6 @@ pub fn run_grandpa_voter<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
);

register_finality_tracker_inherent_data_provider(client.clone(), &inherent_data_providers)?;
Expand Down Expand Up @@ -863,8 +858,8 @@ where
}

#[deprecated(since = "1.1.0", note = "Please switch to run_grandpa_voter.")]
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X, Sp>,
pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X>(
grandpa_params: GrandpaParams<B, E, Block, N, RA, SC, VR, X>,
) -> sp_blockchain::Result<impl Future<Output=()> + Send + 'static> where
Block::Hash: Ord,
B: Backend<Block> + 'static,
Expand All @@ -877,7 +872,6 @@ pub fn run_grandpa<B, E, Block: BlockT, N, RA, SC, VR, X, Sp>(
VR: VotingRule<Block, Client<B, E, Block, RA>> + Clone + 'static,
X: futures::Future<Output=()> + Clone + Send + Unpin + 'static,
Client<B, E, Block, RA>: AuxStore,
Sp: futures::task::Spawn + 'static,
{
run_grandpa_voter(grandpa_params)
}
Expand Down
9 changes: 2 additions & 7 deletions client/finality-grandpa/src/observer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,20 +150,18 @@ fn grandpa_observer<B, E, Block: BlockT, RA, S, F>(
/// listening for and validating GRANDPA commits instead of following the full
/// protocol. Provide configuration and a link to a block import worker that has
/// already been instantiated with `block_import`.
pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC>(
config: Config,
link: LinkHalf<B, E, Block, RA, SC>,
network: N,
on_exit: impl futures::Future<Output=()> + Clone + Send + Unpin + 'static,
executor: Sp,
) -> sp_blockchain::Result<impl Future<Output = ()> + Unpin + Send + 'static> where
B: Backend<Block> + 'static,
E: CallExecutor<Block> + Send + Sync + 'static,
N: NetworkT<Block> + Send + Clone + 'static,
SC: SelectChain<Block> + 'static,
NumberFor<Block>: BlockNumberOps,
RA: Send + Sync + 'static,
Sp: futures::task::Spawn + 'static,
Client<B, E, Block, RA>: AuxStore,
{
let LinkHalf {
Expand All @@ -177,7 +175,6 @@ pub fn run_grandpa_observer<B, E, Block: BlockT, N, RA, SC, Sp>(
network,
config.clone(),
persistent_data.set_state.clone(),
&executor,
);

let observer_work = ObserverWork::new(
Expand Down Expand Up @@ -392,10 +389,8 @@ mod tests {
/// network.
#[test]
fn observer_work_polls_underlying_network_bridge() {
let thread_pool = ThreadPool::new().unwrap();

// Create a test network.
let (tester_fut, _network) = make_test_network(&thread_pool);
let (tester_fut, _network) = make_test_network();
let mut tester = executor::block_on(tester_fut);

// Create an observer.
Expand Down
Loading

0 comments on commit e1668c2

Please sign in to comment.