Skip to content

Commit

Permalink
feat(swarm): Make executor for connection tasks explicit (#3097)
Browse files Browse the repository at this point in the history
Previously, the executor for connection tasks silently defaulted to a `futures::executor::ThreadPool`. This causes issues such as #2230.

With this patch, we force the user to choose, which executor they want to run the connection tasks on which results in overall simpler API with less footguns.

Closes #3068.
  • Loading branch information
umgefahren authored Nov 15, 2022
1 parent d8fe7bf commit d5ea93d
Show file tree
Hide file tree
Showing 41 changed files with 384 additions and 181 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ full = [
"websocket",
"yamux",
]
async-std = ["libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"]

async-std = ["libp2p-swarm/async-std", "libp2p-mdns?/async-io", "libp2p-tcp?/async-io", "libp2p-dns?/async-std", "libp2p-quic?/async-std"]
autonat = ["dep:libp2p-autonat"]
dcutr = ["dep:libp2p-dcutr", "libp2p-metrics?/dcutr"]
deflate = ["dep:libp2p-deflate"]
Expand All @@ -74,7 +75,7 @@ rsa = ["libp2p-core/rsa"]
secp256k1 = ["libp2p-core/secp256k1"]
serde = ["libp2p-core/serde", "libp2p-kad?/serde", "libp2p-gossipsub?/serde"]
tcp = ["dep:libp2p-tcp"]
tokio = ["libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"]
tokio = ["libp2p-swarm/tokio", "libp2p-mdns?/tokio", "libp2p-tcp?/tokio", "libp2p-dns?/tokio", "libp2p-quic?/tokio"]
uds = ["dep:libp2p-uds"]
wasm-bindgen = ["futures-timer/wasm-bindgen", "instant/wasm-bindgen", "getrandom/js"]
wasm-ext = ["dep:libp2p-wasm-ext"]
Expand Down
3 changes: 3 additions & 0 deletions core/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@

- Hide `prost::Error` from public API in `FromEnvelopeError::InvalidPeerRecord` and `signed_envelope::DecodingError`. See [PR 3058].

- Move `Executor` to `libp2p-swarm`. See [PR 3097].

[PR 3031]: https://github.com/libp2p/rust-libp2p/pull/3031
[PR 3058]: https://github.com/libp2p/rust-libp2p/pull/3058
[PR 3097]: https://github.com/libp2p/rust-libp2p/pull/3097

# 0.37.0

Expand Down
20 changes: 0 additions & 20 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,23 +83,3 @@ pub use upgrade::{InboundUpgrade, OutboundUpgrade, ProtocolName, UpgradeError, U
#[derive(thiserror::Error, Debug)]
#[error(transparent)]
pub struct DecodeError(prost::DecodeError);

use std::{future::Future, pin::Pin};

/// Implemented on objects that can run a `Future` in the background.
///
/// > **Note**: While it may be tempting to implement this trait on types such as
/// > [`futures::stream::FuturesUnordered`], please note that passing an `Executor` is
/// > optional, and that `FuturesUnordered` (or a similar struct) will automatically
/// > be used as fallback by libp2p. The `Executor` trait should therefore only be
/// > about running `Future`s in the background.
pub trait Executor {
/// Run the given future in the background until it ends.
fn exec(&self, future: Pin<Box<dyn Future<Output = ()> + Send>>);
}

impl<F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>)> Executor for F {
fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
self(f)
}
}
23 changes: 6 additions & 17 deletions examples/chat-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ use libp2p::{
TokioMdns,
},
mplex, noise,
swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent},
swarm::{NetworkBehaviour, SwarmEvent},
tcp, Multiaddr, PeerId, Transport,
};
use std::error::Error;
Expand Down Expand Up @@ -97,23 +97,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
}

// Create a Swarm to manage peers and events.
let mut swarm = {
let mdns = TokioMdns::new(Default::default())?;
let mut behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
};

behaviour.floodsub.subscribe(floodsub_topic.clone());

SwarmBuilder::new(transport, behaviour, peer_id)
// We want the connection background tasks to be spawned
// onto the tokio runtime.
.executor(Box::new(|fut| {
tokio::spawn(fut);
}))
.build()
let mdns = TokioMdns::new(Default::default())?;
let behaviour = MyBehaviour {
floodsub: Floodsub::new(peer_id),
mdns,
};
let mut swarm = libp2p_swarm::Swarm::with_tokio_executor(transport, behaviour, peer_id);

// Reach out to another node if specified
if let Some(to_dial) = std::env::args().nth(1) {
Expand Down
2 changes: 1 addition & 1 deletion examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
};

behaviour.floodsub.subscribe(floodsub_topic.clone());
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_threadpool_executor(transport, behaviour, local_peer_id)
};

// Reach out to another node if specified
Expand Down
2 changes: 1 addition & 1 deletion examples/distributed-key-value-store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let kademlia = Kademlia::new(local_peer_id, store);
let mdns = Mdns::new(MdnsConfig::default())?;
let behaviour = MyBehaviour { kademlia, mdns };
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
Expand Down
9 changes: 3 additions & 6 deletions examples/file-sharing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,9 +219,7 @@ mod network {
ProtocolSupport, RequestId, RequestResponse, RequestResponseCodec, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::swarm::{
ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmBuilder, SwarmEvent,
};
use libp2p::swarm::{ConnectionHandlerUpgrErr, NetworkBehaviour, Swarm, SwarmEvent};
use std::collections::{hash_map, HashMap, HashSet};
use std::iter;

Expand Down Expand Up @@ -252,7 +250,7 @@ mod network {

// Build the Swarm, connecting the lower layer transport logic with the
// higher layer network behaviour logic.
let swarm = SwarmBuilder::new(
let swarm = Swarm::with_threadpool_executor(
libp2p::development_transport(id_keys).await?,
ComposedBehaviour {
kademlia: Kademlia::new(peer_id, MemoryStore::new(peer_id)),
Expand All @@ -263,8 +261,7 @@ mod network {
),
},
peer_id,
)
.build();
);

let (command_sender, command_receiver) = mpsc::channel(0);
let (event_sender, event_receiver) = mpsc::channel(0);
Expand Down
2 changes: 1 addition & 1 deletion examples/gossipsub-chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let mut swarm = {
let mdns = Mdns::new(MdnsConfig::default())?;
let behaviour = MyBehaviour { gossipsub, mdns };
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Read full lines from stdin
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-kad.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
behaviour.add_address(&PeerId::from_str(peer)?, bootaddr.clone());
}

Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Order Kademlia to search for a peer.
Expand Down
2 changes: 1 addition & 1 deletion examples/ipfs-private.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

println!("Subscribing to {gossipsub_topic:?}");
behaviour.gossipsub.subscribe(&gossipsub_topic).unwrap();
Swarm::new(transport, behaviour, local_peer_id)
Swarm::with_async_std_executor(transport, behaviour, local_peer_id)
};

// Reach out to other nodes if specified
Expand Down
2 changes: 1 addition & 1 deletion examples/mdns-passive-discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// Create a Swarm that establishes connections through the given transport.
// Note that the MDNS behaviour itself will not actually inititiate any connections,
// as it only uses UDP.
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;

loop {
Expand Down
2 changes: 1 addition & 1 deletion examples/ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let transport = libp2p::development_transport(local_key).await?;

let mut swarm = Swarm::new(transport, Behaviour::default(), local_peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, Behaviour::default(), local_peer_id);

// Tell the swarm to listen on all interfaces and a random, OS-assigned
// port.
Expand Down
2 changes: 1 addition & 1 deletion misc/metrics/examples/metrics/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ fn main() -> Result<(), Box<dyn Error>> {
let local_peer_id = PeerId::from(local_key.public());
info!("Local peer id: {:?}", local_peer_id);

let mut swarm = Swarm::new(
let mut swarm = Swarm::without_executor(
block_on(libp2p::development_transport(local_key))?,
Behaviour::default(),
local_peer_id,
Expand Down
6 changes: 4 additions & 2 deletions misc/multistream-select/tests/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,10 @@ fn transport_upgrade() {

let listen_addr = Multiaddr::from(Protocol::Memory(random::<u64>()));

let mut dialer = Swarm::new(dialer_transport, dummy::Behaviour, dialer_id);
let mut listener = Swarm::new(listener_transport, dummy::Behaviour, listener_id);
let mut dialer =
Swarm::with_async_std_executor(dialer_transport, dummy::Behaviour, dialer_id);
let mut listener =
Swarm::with_async_std_executor(listener_transport, dummy::Behaviour, listener_id);

listener.listen_on(listen_addr).unwrap();
let (addr_sender, addr_receiver) = oneshot::channel();
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/examples/autonat_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let behaviour = Behaviour::new(local_key.public());

let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id);
swarm.listen_on(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/examples/autonat_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

let behaviour = Behaviour::new(local_key.public());

let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id);
swarm.listen_on(
Multiaddr::empty()
.with(Protocol::Ip4(Ipv4Addr::UNSPECIFIED))
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/tests/test_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let local_id = PeerId::from_public_key(&keypair.public());
let transport = development_transport(keypair).await.unwrap();
let behaviour = Behaviour::new(local_id, config);
Swarm::new(transport, behaviour, local_id)
Swarm::with_async_std_executor(transport, behaviour, local_id)
}

async fn spawn_server(kill: oneshot::Receiver<()>) -> (PeerId, Multiaddr) {
Expand Down
2 changes: 1 addition & 1 deletion protocols/autonat/tests/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ async fn init_swarm(config: Config) -> Swarm<Behaviour> {
let local_id = PeerId::from_public_key(&keypair.public());
let transport = development_transport(keypair).await.unwrap();
let behaviour = Behaviour::new(local_id, config);
Swarm::new(transport, behaviour, local_id)
Swarm::with_async_std_executor(transport, behaviour, local_id)
}

async fn init_server(config: Option<Config>) -> (Swarm<Behaviour>, PeerId, Multiaddr) {
Expand Down
11 changes: 7 additions & 4 deletions protocols/dcutr/examples/dcutr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// DEALINGS IN THE SOFTWARE.

use clap::Parser;
use futures::executor::block_on;
use futures::executor::{block_on, ThreadPool};
use futures::future::FutureExt;
use futures::stream::StreamExt;
use libp2p::core::multiaddr::{Multiaddr, Protocol};
Expand Down Expand Up @@ -155,9 +155,12 @@ fn main() -> Result<(), Box<dyn Error>> {
dcutr: dcutr::behaviour::Behaviour::new(),
};

let mut swarm = SwarmBuilder::new(transport, behaviour, local_peer_id)
.dial_concurrency_factor(10_u8.try_into().unwrap())
.build();
let mut swarm = match ThreadPool::new() {
Ok(tp) => SwarmBuilder::with_executor(transport, behaviour, local_peer_id, tp),
Err(_) => SwarmBuilder::without_executor(transport, behaviour, local_peer_id),
}
.dial_concurrency_factor(10_u8.try_into().unwrap())
.build();

swarm
.listen_on(
Expand Down
4 changes: 2 additions & 2 deletions protocols/dcutr/tests/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ fn build_relay() -> Swarm<relay::Relay> {

let transport = build_transport(MemoryTransport::default().boxed(), local_public_key);

Swarm::new(
Swarm::with_threadpool_executor(
transport,
relay::Relay::new(
local_peer_id,
Expand All @@ -122,7 +122,7 @@ fn build_client() -> Swarm<Client> {
local_public_key,
);

Swarm::new(
Swarm::with_threadpool_executor(
transport,
Client {
relay: behaviour,
Expand Down
2 changes: 1 addition & 1 deletion protocols/gossipsub/tests/smoke.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ fn build_node() -> (Multiaddr, Swarm<Gossipsub>) {
.build()
.unwrap();
let behaviour = Gossipsub::new(MessageAuthenticity::Author(peer_id), config).unwrap();
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let mut swarm = Swarm::without_executor(transport, behaviour, peer_id);

let port = 1 + random::<u64>();
let mut addr: Multiaddr = Protocol::Memory(port).into();
Expand Down
2 changes: 1 addition & 1 deletion protocols/identify/examples/identify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
local_key.public(),
));

let mut swarm = Swarm::new(transport, behaviour, local_peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, local_peer_id);

// Tell the swarm to listen on all interfaces and a random, OS-assigned
// port.
Expand Down
12 changes: 6 additions & 6 deletions protocols/identify/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ mod tests {
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};

Expand All @@ -593,7 +593,7 @@ mod tests {
let protocol = Behaviour::new(
Config::new("c".to_string(), pubkey.clone()).with_agent_version("d".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};

Expand Down Expand Up @@ -661,7 +661,7 @@ mod tests {
let (mut swarm1, pubkey1) = {
let (pubkey, transport) = transport();
let protocol = Behaviour::new(Config::new("a".to_string(), pubkey.clone()));
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};

Expand All @@ -670,7 +670,7 @@ mod tests {
let protocol = Behaviour::new(
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);
let swarm = Swarm::new(transport, protocol, pubkey.to_peer_id());
let swarm = Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id());
(swarm, pubkey)
};

Expand Down Expand Up @@ -742,7 +742,7 @@ mod tests {
.with_initial_delay(Duration::from_secs(10)),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
};

let mut swarm2 = {
Expand All @@ -751,7 +751,7 @@ mod tests {
Config::new("a".to_string(), pubkey.clone()).with_agent_version("b".to_string()),
);

Swarm::new(transport, protocol, pubkey.to_peer_id())
Swarm::with_async_std_executor(transport, protocol, pubkey.to_peer_id())
};

let swarm1_peer_id = *swarm1.local_peer_id();
Expand Down
2 changes: 1 addition & 1 deletion protocols/kad/src/behaviour/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ fn build_node_with_config(cfg: KademliaConfig) -> (Multiaddr, TestSwarm) {
let store = MemoryStore::new(local_id);
let behaviour = Kademlia::with_config(local_id, store, cfg);

let mut swarm = Swarm::new(transport, behaviour, local_id);
let mut swarm = Swarm::without_executor(transport, behaviour, local_id);

let address: Multiaddr = Protocol::Memory(random::<u64>()).into();
swarm.listen_on(address.clone()).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/tests/use-async-std.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async fn create_swarm(config: MdnsConfig) -> Result<Swarm<Mdns>, Box<dyn Error>>
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p::development_transport(id_keys).await?;
let behaviour = Mdns::new(config)?;
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let mut swarm = Swarm::with_async_std_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
Expand Down
2 changes: 1 addition & 1 deletion protocols/mdns/tests/use-tokio.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn create_swarm(config: MdnsConfig) -> Result<Swarm<TokioMdns>, Box<dyn Er
let peer_id = PeerId::from(id_keys.public());
let transport = libp2p::tokio_development_transport(id_keys)?;
let behaviour = TokioMdns::new(config)?;
let mut swarm = Swarm::new(transport, behaviour, peer_id);
let mut swarm = Swarm::with_tokio_executor(transport, behaviour, peer_id);
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
Ok(swarm)
}
Expand Down
Loading

0 comments on commit d5ea93d

Please sign in to comment.