Skip to content

Commit

Permalink
Don't panic if io-uring thread fails to initialise
Browse files Browse the repository at this point in the history
  • Loading branch information
XAMPPRocky committed Nov 8, 2023
1 parent ca2443d commit 6b53364
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 10 deletions.
15 changes: 9 additions & 6 deletions src/cli/proxy/sessions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,10 @@ use crate::{
pub(crate) mod metrics;

pub type SessionMap = crate::collections::ttl::TtlMap<SessionKey, Session>;
type UpstreamSender = mpsc::UnboundedSender<(Vec<u8>, Option<IpNetEntry>, SocketAddr)>;
type DownstreamSender = async_channel::Sender<(Vec<u8>, Option<IpNetEntry>, SocketAddr)>;
pub type DownstreamReceiver = async_channel::Receiver<(Vec<u8>, Option<IpNetEntry>, SocketAddr)>;
type ChannelData = (Vec<u8>, Option<IpNetEntry>, SocketAddr);
type UpstreamSender = mpsc::UnboundedSender<ChannelData>;
type DownstreamSender = async_channel::Sender<ChannelData>;
pub type DownstreamReceiver = async_channel::Receiver<ChannelData>;

/// A data structure that is responsible for holding sessions, and pooling
/// sockets between them. This means that we only provide new unique sockets
Expand Down Expand Up @@ -100,12 +101,11 @@ impl SessionPool {
.ok_or_else(|| eyre::eyre!("couldn't get socket address from raw socket"))
.map_err(super::PipelineError::Session)?
.port();
let (tx, mut downstream_receiver) = mpsc::unbounded_channel();
self.ports_to_sockets.write().await.insert(port, tx.clone());
let (tx, mut downstream_receiver) = mpsc::unbounded_channel::<ChannelData>();

let pool = self.clone();

uring_spawn!(async move {
let initialised = uring_spawn!(async move {
let mut buf: Vec<u8> = vec![0; 65535];
let mut last_received_at = None;
let mut shutdown_rx = pool.shutdown_rx.clone();
Expand Down Expand Up @@ -183,6 +183,9 @@ impl SessionPool {
}
});

initialised.await.map_err(|error| eyre::eyre!(error))??;

self.ports_to_sockets.write().await.insert(port, tx.clone());
self.create_session_from_existing_socket(key, tx, port, asn_info)
.await
}
Expand Down
21 changes: 17 additions & 4 deletions src/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,30 @@ impl DualStackEpollSocket {

/// On linux spawns a io-uring runtime + thread, everywhere else spawns a regular tokio task.
macro_rules! uring_spawn {
($future:expr) => {
($future:expr) => {{
let (tx, rx) = tokio::sync::oneshot::channel();
cfg_if::cfg_if! {
if #[cfg(target_os = "linux")] {
std::thread::spawn(move || {
tokio_uring::start($future);
match tokio_uring::Runtime::new(&tokio_uring::builder().entries(2048)) {
Ok(runtime) => {
let _ = tx.send(Ok(()));
runtime.block_on($future);
}
Err(error) => {
let _ = tx.send(Err(error));
}
};
});
} else {
tokio::spawn($future);
tokio::spawn(async move {
tx.send(Ok(()));
$future.await
});
}
}
};
rx
}};
}

/// On linux spawns a io-uring task, everywhere else spawns a regular tokio task.
Expand Down

0 comments on commit 6b53364

Please sign in to comment.