Skip to content

Commit

Permalink
transport: Allow manual setting of keep-alive time
Browse files Browse the repository at this point in the history
Allow configure time duration of keep-alive-timeouts in `TransportService`.
For `RequestResponseProtocol`, use its timeout configuration.
For other protocols, use the original default 5 seconds.
  • Loading branch information
Ma233 committed Jun 15, 2024
1 parent 0778a02 commit 08d5bfb
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 5 deletions.
16 changes: 13 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ use multihash::Multihash;
use transport::Endpoint;
use types::ConnectionId;

use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, sync::Arc, time::Duration};

pub use bandwidth::BandwidthSink;
pub use error::Error;
Expand Down Expand Up @@ -164,6 +164,7 @@ impl Litep2p {
protocol,
config.fallback_names.clone(),
config.codec,
Duration::from_secs(5),
);
let executor = Arc::clone(&litep2p_config.executor);
litep2p_config.executor.run(Box::pin(async move {
Expand All @@ -183,6 +184,7 @@ impl Litep2p {
protocol,
config.fallback_names.clone(),
config.codec,
config.timeout,
);
litep2p_config.executor.run(Box::pin(async move {
RequestResponseProtocol::new(service, config).run().await
Expand All @@ -193,8 +195,12 @@ impl Litep2p {
for (protocol_name, protocol) in litep2p_config.user_protocols.into_iter() {
tracing::debug!(target: LOG_TARGET, protocol = ?protocol_name, "enable user protocol");

let service =
transport_manager.register_protocol(protocol_name, Vec::new(), protocol.codec());
let service = transport_manager.register_protocol(
protocol_name,
Vec::new(),
protocol.codec(),
Duration::from_secs(5),
);
litep2p_config.executor.run(Box::pin(async move {
let _ = protocol.run(service).await;
}));
Expand All @@ -212,6 +218,7 @@ impl Litep2p {
ping_config.protocol.clone(),
Vec::new(),
ping_config.codec,
Duration::from_secs(5),
);
litep2p_config.executor.run(Box::pin(async move {
Ping::new(service, ping_config).run().await
Expand All @@ -234,6 +241,7 @@ impl Litep2p {
main_protocol.clone(),
fallback_names,
kademlia_config.codec,
Duration::from_secs(5),
);
litep2p_config.executor.run(Box::pin(async move {
let _ = Kademlia::new(service, kademlia_config).run().await;
Expand All @@ -254,6 +262,7 @@ impl Litep2p {
identify_config.protocol.clone(),
Vec::new(),
identify_config.codec,
Duration::from_secs(5),
);
identify_config.public = Some(litep2p_config.keypair.public().into());

Expand All @@ -273,6 +282,7 @@ impl Litep2p {
bitswap_config.protocol.clone(),
Vec::new(),
bitswap_config.codec,
Duration::from_secs(5),
);
litep2p_config.executor.run(Box::pin(async move {
Bitswap::new(service, bitswap_config).run().await
Expand Down
1 change: 1 addition & 0 deletions src/protocol/libp2p/kademlia/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ mod tests {
Vec::new(),
Default::default(),
handle,
std::time::Duration::from_secs(5),
);
let (event_tx, event_rx) = channel(64);
let (_cmd_tx, cmd_rx) = channel(64);
Expand Down
1 change: 1 addition & 0 deletions src/protocol/notification/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ fn make_notification_protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
);
let (config, handle) = NotificationConfig::new(
ProtocolName::from("/notif/1"),
Expand Down
1 change: 1 addition & 0 deletions src/protocol/request_response/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ fn protocol() -> (
Vec::new(),
std::sync::Arc::new(Default::default()),
handle,
std::time::Duration::from_secs(5),
);
let (config, handle) =
ConfigBuilder::new(ProtocolName::from("/req/1")).with_max_size(1024).build();
Expand Down
11 changes: 9 additions & 2 deletions src/protocol/transport_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,9 @@ pub struct TransportService {
/// Next substream ID.
next_substream_id: Arc<AtomicUsize>,

/// Close the connection if no substreams are open within this time frame.
keep_alive: Duration,

/// Pending keep-alive timeouts.
keep_alive_timeouts: FuturesUnordered<BoxFuture<'static, (PeerId, ConnectionId)>>,
}
Expand All @@ -134,6 +137,7 @@ impl TransportService {
fallback_names: Vec<ProtocolName>,
next_substream_id: Arc<AtomicUsize>,
transport_handle: TransportManagerHandle,
keep_alive: Duration,
) -> (Self, Sender<InnerTransportEvent>) {
let (tx, rx) = channel(DEFAULT_CHANNEL_SIZE);

Expand All @@ -146,6 +150,7 @@ impl TransportService {
transport_handle,
next_substream_id,
connections: HashMap::new(),
keep_alive,
keep_alive_timeouts: FuturesUnordered::new(),
},
tx,
Expand All @@ -168,6 +173,7 @@ impl TransportService {
?connection_id,
"connection established",
);
let keep_alive = self.keep_alive;

match self.connections.get_mut(&peer) {
Some(context) => match context.secondary {
Expand All @@ -183,7 +189,7 @@ impl TransportService {
}
None => {
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(keep_alive).await;
(peer, connection_id)
}));
context.secondary = Some(handle);
Expand All @@ -194,7 +200,7 @@ impl TransportService {
None => {
self.connections.insert(peer, ConnectionContext::new(handle));
self.keep_alive_timeouts.push(Box::pin(async move {
tokio::time::sleep(Duration::from_secs(5)).await;
tokio::time::sleep(keep_alive).await;
(peer, connection_id)
}));

Expand Down Expand Up @@ -439,6 +445,7 @@ mod tests {
Vec::new(),
Arc::new(AtomicUsize::new(0usize)),
handle,
std::time::Duration::from_secs(5),
);

(service, sender, cmd_rx)
Expand Down
9 changes: 9 additions & 0 deletions src/transport/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use std::{
Arc,
},
task::{Context, Poll},
time::Duration,
};

pub use handle::{TransportHandle, TransportManagerHandle};
Expand Down Expand Up @@ -315,6 +316,7 @@ impl TransportManager {
protocol: ProtocolName,
fallback_names: Vec<ProtocolName>,
codec: ProtocolCodec,
keep_alive: Duration,
) -> TransportService {
assert!(!self.protocol_names.contains(&protocol));

Expand All @@ -330,6 +332,7 @@ impl TransportManager {
fallback_names.clone(),
self.next_substream_id.clone(),
self.transport_manager_handle.clone(),
keep_alive,
);

self.protocols.insert(
Expand Down Expand Up @@ -1637,11 +1640,13 @@ mod tests {
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
manager.register_protocol(
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
}

Expand All @@ -1657,6 +1662,7 @@ mod tests {
ProtocolName::from("/notif/1"),
Vec::new(),
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
manager.register_protocol(
ProtocolName::from("/notif/2"),
Expand All @@ -1665,6 +1671,7 @@ mod tests {
ProtocolName::from("/notif/1"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
}

Expand All @@ -1683,6 +1690,7 @@ mod tests {
ProtocolName::from("/notif/1"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
manager.register_protocol(
ProtocolName::from("/notif/2"),
Expand All @@ -1691,6 +1699,7 @@ mod tests {
ProtocolName::from("/notif/1/new"),
],
ProtocolCodec::UnsignedVarint(None),
Duration::from_secs(5),
);
}

Expand Down

0 comments on commit 08d5bfb

Please sign in to comment.