Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(comms/messaging): fix possible deadlock in outbound pipeline #4657

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 2 additions & 5 deletions base_layer/p2p/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ pub struct P2pConfig {
/// The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging
/// queue
pub max_concurrent_outbound_tasks: usize,
/// The size of the buffer (channel) which holds pending outbound message requests
pub outbound_buffer_size: usize,
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
/// Configuration for DHT
pub dht: DhtConfig,
/// Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
Expand Down Expand Up @@ -131,9 +129,8 @@ impl Default for P2pConfig {
transport: Default::default(),
datastore_path: PathBuf::from("peer_db"),
peer_database_name: "peers".to_string(),
max_concurrent_inbound_tasks: 50,
max_concurrent_outbound_tasks: 100,
outbound_buffer_size: 100,
max_concurrent_inbound_tasks: 4,
max_concurrent_outbound_tasks: 4,
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
dht: DhtConfig {
database_url: DbConnectionUrl::file("dht.sqlite"),
..Default::default()
Expand Down
4 changes: 1 addition & 3 deletions base_layer/p2p/src/initialization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,6 @@ pub async fn initialize_local_test_comms<P: AsRef<Path>>(
let dht_outbound_layer = dht.outbound_middleware_layer();
let (event_sender, _) = broadcast::channel(100);
let pipeline = pipeline::Builder::new()
.outbound_buffer_size(10)
.with_outbound_pipeline(outbound_rx, |sink| {
ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
})
Expand Down Expand Up @@ -333,7 +332,7 @@ async fn configure_comms_and_dht(
let node_identity = comms.node_identity();
let shutdown_signal = comms.shutdown_signal();
// Create outbound channel
let (outbound_tx, outbound_rx) = mpsc::channel(config.outbound_buffer_size);
let (outbound_tx, outbound_rx) = mpsc::channel(config.dht.outbound_buffer_size);

let mut dht = Dht::builder();
dht.with_config(config.dht.clone()).with_outbound_sender(outbound_tx);
Expand All @@ -350,7 +349,6 @@ async fn configure_comms_and_dht(

// Hook up DHT messaging middlewares
let messaging_pipeline = pipeline::Builder::new()
.outbound_buffer_size(config.outbound_buffer_size)
.with_outbound_pipeline(outbound_rx, |sink| {
ServiceBuilder::new().layer(dht_outbound_layer).service(sink)
})
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet/tests/contacts_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ pub fn setup_contacts_service<T: ContactsBackend + 'static>(
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 100,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
Expand Down
2 changes: 0 additions & 2 deletions base_layer/wallet/tests/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ async fn create_wallet(
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 100,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(1),
auto_join: true,
Expand Down Expand Up @@ -672,7 +671,6 @@ async fn test_import_utxo() {
peer_database_name: random::string(8),
max_concurrent_inbound_tasks: 10,
max_concurrent_outbound_tasks: 10,
outbound_buffer_size: 10,
dht: Default::default(),
allow_test_addresses: true,
listener_liveness_allowlist_cidrs: StringList::new(),
Expand Down
1 change: 0 additions & 1 deletion base_layer/wallet_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3899,7 +3899,6 @@ pub unsafe extern "C" fn comms_config_create(
peer_database_name: database_name_string,
max_concurrent_inbound_tasks: 25,
max_concurrent_outbound_tasks: 50,
outbound_buffer_size: 50,
dht: DhtConfig {
discovery_request_timeout: Duration::from_secs(discovery_timeout_in_secs),
database_url: DbConnectionUrl::File(dht_database_path),
Expand Down
7 changes: 2 additions & 5 deletions common/config/presets/c_base_node.toml
Original file line number Diff line number Diff line change
Expand Up @@ -157,13 +157,10 @@ track_reorgs = true
#peer_database_name = "peers"

# The maximum number of concurrent Inbound tasks allowed before back-pressure is applied to peers
#max_concurrent_inbound_tasks = 50
#max_concurrent_inbound_tasks = 4

# The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging queue
#max_concurrent_outbound_tasks = 100

# The size of the buffer (channel) which holds pending outbound message requests
#outbound_buffer_size = 100
#max_concurrent_outbound_tasks = 4

# Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
# addresses, loopback, local-link (i.e addresses used in local tests) will not be accepted from peers. This
Expand Down
7 changes: 2 additions & 5 deletions common/config/presets/d_console_wallet.toml
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,10 @@ event_channel_size = 3500
#peer_database_name = "peers"

# The maximum number of concurrent Inbound tasks allowed before back-pressure is applied to peers
#max_concurrent_inbound_tasks = 50
#max_concurrent_inbound_tasks = 4

# The maximum number of concurrent outbound tasks allowed before back-pressure is applied to outbound messaging queue
#max_concurrent_outbound_tasks = 100

# The size of the buffer (channel) which holds pending outbound message requests
#outbound_buffer_size = 100
#max_concurrent_outbound_tasks = 4

# Set to true to allow peers to provide test addresses (loopback, memory etc.). If set to false, memory
# addresses, loopback, local-link (i.e addresses used in local tests) will not be accepted from peers. This
Expand Down
16 changes: 3 additions & 13 deletions comms/core/src/pipeline/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,14 @@ use crate::{
};

const DEFAULT_MAX_CONCURRENT_TASKS: usize = 50;
const DEFAULT_OUTBOUND_BUFFER_SIZE: usize = 50;

type OutboundMessageSinkService = SinkService<mpsc::Sender<OutboundMessage>>;
type OutboundMessageSinkService = SinkService<mpsc::UnboundedSender<OutboundMessage>>;

/// Message pipeline builder
#[derive(Default)]
pub struct Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_inbound_tasks: usize,
max_concurrent_outbound_tasks: Option<usize>,
outbound_buffer_size: usize,
inbound: Option<TInSvc>,
outbound_rx: Option<mpsc::Receiver<TOutReq>>,
outbound_pipeline_factory: Option<Box<dyn FnOnce(OutboundMessageSinkService) -> TOutSvc>>,
Expand All @@ -50,7 +48,6 @@ impl Builder<(), (), ()> {
Self {
max_concurrent_inbound_tasks: DEFAULT_MAX_CONCURRENT_TASKS,
max_concurrent_outbound_tasks: None,
outbound_buffer_size: DEFAULT_OUTBOUND_BUFFER_SIZE,
inbound: None,
outbound_rx: None,
outbound_pipeline_factory: None,
Expand All @@ -69,11 +66,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
self
}

pub fn outbound_buffer_size(mut self, buf_size: usize) -> Self {
self.outbound_buffer_size = buf_size;
self
}

pub fn with_outbound_pipeline<F, S, R>(self, receiver: mpsc::Receiver<R>, factory: F) -> Builder<TInSvc, S, R>
where
// Factory function takes in a SinkService and returns a new composed service
Expand All @@ -87,7 +79,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_inbound_tasks: self.max_concurrent_inbound_tasks,
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
inbound: self.inbound,
outbound_buffer_size: self.outbound_buffer_size,
}
}

Expand All @@ -100,7 +91,6 @@ impl<TInSvc, TOutSvc, TOutReq> Builder<TInSvc, TOutSvc, TOutReq> {
max_concurrent_outbound_tasks: self.max_concurrent_outbound_tasks,
outbound_rx: self.outbound_rx,
outbound_pipeline_factory: self.outbound_pipeline_factory,
outbound_buffer_size: self.outbound_buffer_size,
}
}
}
Expand All @@ -111,7 +101,7 @@ where
TInSvc: Service<InboundMessage> + Clone + Send + 'static,
{
fn build_outbound(&mut self) -> Result<OutboundPipelineConfig<TOutReq, TOutSvc>, PipelineBuilderError> {
let (out_sender, out_receiver) = mpsc::channel(self.outbound_buffer_size);
let (out_sender, out_receiver) = mpsc::unbounded_channel();

let in_receiver = self
.outbound_rx
Expand Down Expand Up @@ -157,7 +147,7 @@ pub struct OutboundPipelineConfig<TInItem, TPipeline> {
/// Messages read from this stream are passed to the pipeline
pub in_receiver: mpsc::Receiver<TInItem>,
/// Receiver of `OutboundMessage`s coming from the pipeline
pub out_receiver: mpsc::Receiver<OutboundMessage>,
pub out_receiver: mpsc::UnboundedReceiver<OutboundMessage>,
/// The pipeline (`tower::Service`) to run for each in_stream message
pub pipeline: TPipeline,
}
Expand Down
22 changes: 18 additions & 4 deletions comms/core/src/pipeline/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fmt::Display, time::Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};

use futures::future::FusedFuture;
use log::*;
use tari_shutdown::ShutdownSignal;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tower::{Service, ServiceExt};

use crate::bounded_executor::BoundedExecutor;
Expand Down Expand Up @@ -103,8 +106,19 @@ where
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start inbound pipeline {}", id);
if let Err(err) = service.oneshot(item).await {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
match time::timeout(Duration::from_secs(30), service.oneshot(item)).await {
Ok(Ok(_)) => {},
Ok(Err(err)) => {
warn!(target: LOG_TARGET, "Inbound pipeline returned an error: '{}'", err);
},
Err(_) => {
error!(
target: LOG_TARGET,
"Inbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: there was a \
deadlock or excessive delay in processing this pipeline.",
id
);
},
}
trace!(
target: LOG_TARGET,
Expand Down
31 changes: 22 additions & 9 deletions comms/core/src/pipeline/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{fmt::Display, time::Instant};
use std::{
fmt::Display,
time::{Duration, Instant},
};

use futures::future::Either;
use log::*;
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time};
use tower::{Service, ServiceExt};

use crate::{
Expand Down Expand Up @@ -93,16 +96,26 @@ where
let pipeline = self.config.pipeline.clone();
let id = current_id;
current_id = (current_id + 1) % u64::MAX;

self.executor
.spawn(async move {
let timer = Instant::now();
trace!(target: LOG_TARGET, "Start outbound pipeline {}", id);
if let Err(err) = pipeline.oneshot(msg).await {
error!(
target: LOG_TARGET,
"Outbound pipeline {} returned an error: '{}'", id, err
);
match time::timeout(Duration::from_secs(30), pipeline.oneshot(msg)).await {
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
Ok(Ok(_)) => {},
Ok(Err(err)) => {
error!(
target: LOG_TARGET,
"Outbound pipeline {} returned an error: '{}'", id, err
);
},
Err(_) => {
error!(
target: LOG_TARGET,
"Outbound pipeline {} timed out and was aborted. THIS SHOULD NOT HAPPEN: \
there was a deadlock or excessive delay in processing this pipeline.",
id
);
},
}

trace!(
Expand Down Expand Up @@ -174,7 +187,7 @@ mod test {
)
.await
.unwrap();
let (out_tx, out_rx) = mpsc::channel(NUM_ITEMS);
let (out_tx, out_rx) = mpsc::unbounded_channel();
let (msg_tx, mut msg_rx) = mpsc::channel(NUM_ITEMS);
let executor = Handle::current();

Expand Down
21 changes: 20 additions & 1 deletion comms/core/src/pipeline/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::task::Poll;
use std::{future, task::Poll};

use futures::{future::BoxFuture, task::Context, FutureExt};
use tower::Service;
Expand Down Expand Up @@ -59,3 +59,22 @@ where T: Send + 'static
.boxed()
}
}
// `tower::Service` implementation that forwards every request item into an unbounded tokio mpsc
// channel. Sending on an unbounded channel never waits for capacity, so this service is always
// ready and `call` completes synchronously, returning an already-resolved future.
impl<T> Service<T> for SinkService<tokio::sync::mpsc::UnboundedSender<T>>
where T: Send + 'static
{
    type Error = PipelineError;
    type Future = future::Ready<Result<Self::Response, Self::Error>>;
    type Response = ();

    /// Always reports ready: there is no capacity to wait for on an unbounded sender.
    fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Sends `item` into the channel and returns an immediately-ready future holding the outcome.
    ///
    /// A failed send is reported as a "sink closed in sink service" error; the rejected item is
    /// dropped.
    fn call(&mut self, item: T) -> Self::Future {
        // `UnboundedSender::send` takes `&self`, so the sender does not need to be cloned per call.
        let result = self
            .0
            .send(item)
            .map_err(|_| anyhow::anyhow!("sink closed in sink service"));
        future::ready(result)
    }
}
6 changes: 3 additions & 3 deletions comms/core/src/protocol/messaging/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ use crate::{
runtime::task,
};

/// Buffer size for inbound messages from _all_ peers. This should be large enough to buffer quite a few incoming
/// messages before creating backpressure on peers speaking the messaging protocol.
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 100;
/// Buffer size for inbound messages from _all_ peers. If the message consumer is slow to get through this queue,
/// sending peers will start to experience backpressure (this is a good thing).
pub const INBOUND_MESSAGE_BUFFER_SIZE: usize = 10;
sdbondi marked this conversation as resolved.
Show resolved Hide resolved
/// Buffer size for notifications that a peer wants to speak /tari/messaging. This buffer is used for all peers, but a
/// low value is OK because these events happen once (or less) per connecting peer. For e.g. a value of 10 would allow
/// 10 peers to concurrently request to speak /tari/messaging.
Expand Down
Loading