Commit
Create listeners in each tokio runtime. This may have fixed one runtime handling all the interrupts, which is terrible for performance.
Icelk committed Nov 19, 2023
1 parent b1eabf8 commit 1cef1c7
Showing 2 changed files with 120 additions and 71 deletions.
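The heart of the change: instead of creating all listeners up front on one runtime and handing them to worker runtimes, each worker runtime now builds its own listener. A minimal sketch of the resulting shape, assuming tokio-uring; the names and the direct `bind` call are illustrative, not Kvarn's actual API:

```rust
use std::net::SocketAddr;

// Every thread starts its own tokio-uring runtime and constructs its
// listener *inside* it, so each runtime services its own socket's
// interrupts instead of one runtime handling them all.
fn spawn_instances(instances: usize, addr: SocketAddr) {
    let mut handles = Vec::new();
    for _ in 0..instances {
        handles.push(std::thread::spawn(move || {
            tokio_uring::start(async move {
                // Created on this runtime. With more than one instance the
                // socket would also need SO_REUSEPORT; the socket2-based
                // setup in this diff is where such options are configured.
                let listener = tokio_uring::net::TcpListener::bind(addr)
                    .expect("failed to bind");
                loop {
                    let (stream, _peer) =
                        listener.accept().await.expect("accept failed");
                    drop(stream); // ...handle the connection here...
                }
            });
        }));
    }
    for handle in handles {
        handle.join().unwrap();
    }
}
```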
158 changes: 95 additions & 63 deletions src/lib.rs
@@ -207,6 +207,7 @@ impl RunConfig {
/// shutdown_manager.wait().await;
/// # };
/// ```
#[allow(clippy::type_complexity)]
pub async fn execute(self) -> Arc<shutdown::Manager> {
#[cfg(feature = "async-networking")]
use socket2::{Domain, Protocol, Type};
@@ -223,7 +224,8 @@ impl RunConfig {
info!("Starting server on {} ports.", ports.len());

let len = ports.len();
let mut shutdown_manager = shutdown::Manager::new(len);
// * 8 for some buffer, since unsafe writes could happen otherwise
let mut shutdown_manager = unsafe { shutdown::Manager::new(len * 8) };

#[cfg(feature = "handover")]
let ports_clone = Arc::new(ports.clone());
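The `unsafe` constructor and the `* 8` headroom make sense together with the waker-list changes in `src/shutdown.rs` below: wakers are later written through raw pointers, so the list's backing storage must never reallocate. A tiny self-contained illustration of that constraint, assuming the list is `Vec`-backed (as `WakerList` appears to be):

```rust
fn main() {
    let len = 4; // number of ports, as in the hunk above
    // Reserve every slot we will ever need up front.
    let mut wakers: Vec<Option<std::task::Waker>> = Vec::with_capacity(len * 8);
    let before = wakers.as_ptr();
    for _ in 0..len * 8 {
        wakers.push(None); // stays within capacity: the storage never moves
    }
    assert_eq!(before, wakers.as_ptr());
    // One more push would exceed the capacity, reallocate, and invalidate
    // any raw pointers into the old storage -- the "unsafe writes" the
    // comment above guards against.
}
```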
@@ -239,20 +241,31 @@ impl RunConfig {
let instances = 1;

// artefact from ↑. When not uring, the outer vec is 1 in length
let mut all_listeners: Vec<Vec<(AcceptManager, Arc<PortDescriptor>)>> =
Vec::with_capacity(instances);
let mut all_listeners: Vec<
Vec<(Box<dyn Fn() -> AcceptManager + Send + Sync>, Arc<PortDescriptor>)>,
> = Vec::with_capacity(instances);

let ports: Vec<_> = ports.into_iter().map(Arc::new).collect();

#[cfg(feature = "handover")]
let handover_path = ctl_path.unwrap_or_else(ctl::socket_path);

#[cfg(feature = "graceful-shutdown")]
{
shutdown_manager.handover_socket_path = Some(handover_path.clone());
}

let shutdown_manager = shutdown_manager.build();

for _ in 0..instances {
let mut listeners = Vec::new();
let mut listeners: Vec<(Box<dyn Fn() -> AcceptManager + Send + Sync>, _)> = Vec::new();
#[cfg(feature = "async-networking")]
for descriptor in &ports {
fn create_listener(
create_socket: impl Fn() -> socket2::Socket,
#[allow(unused_variables)] tcp: bool,
address: SocketAddr,
shutdown_manager: &mut shutdown::Manager,
shutdown_manager: &shutdown::Manager,
#[allow(unused_variables)] descriptor: &PortDescriptor,
) -> AcceptManager {
let socket = create_socket();
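The listener collections change from holding `AcceptManager`s to holding factories, `Box<dyn Fn() -> AcceptManager + Send + Sync>`: a factory can be carried across a thread boundary and invoked inside whichever runtime ends up owning the listener. A simplified sketch of the same shape, with std types standing in for Kvarn's:

```rust
use std::net::TcpListener;

// Stand-in for `Box<dyn Fn() -> AcceptManager + Send + Sync>`: the box
// erases the concrete closure type, and the Send + Sync bounds let the
// factory be handed to another thread before it is called.
type ListenerFactory = Box<dyn Fn() -> TcpListener + Send + Sync>;

fn make_factory(port: u16) -> ListenerFactory {
    Box::new(move || {
        // The real code goes through socket2 so options such as
        // SO_REUSEPORT can be set before binding.
        TcpListener::bind(("0.0.0.0", port)).expect("failed to bind")
    })
}

fn demo() {
    let factory = make_factory(8080);
    std::thread::spawn(move || {
        let _listener = factory(); // created on the receiving thread
    });
}
```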
@@ -322,64 +335,90 @@ impl RunConfig {
}

if matches!(descriptor.version, BindIpVersion::V4 | BindIpVersion::Both) {
let listener = create_listener(
|| {
socket2::Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV4,
Type::STREAM,
Some(Protocol::TCP),
)
.expect("Failed to create a new IPv4 socket configuration")
},
true,
SocketAddr::new(IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), descriptor.port),
&mut shutdown_manager,
descriptor,
);
listeners.push((listener, Arc::clone(descriptor)));
},
true,
SocketAddr::new(IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
if matches!(descriptor.version, BindIpVersion::V6 | BindIpVersion::Both) {
let listener = create_listener(
|| {
socket2::Socket::new(Domain::IPV6, Type::STREAM, Some(Protocol::TCP))
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV6,
Type::STREAM,
Some(Protocol::TCP),
)
.expect("Failed to create a new IPv6 socket configuration")
},
true,
SocketAddr::new(IpAddr::V6(net::Ipv6Addr::UNSPECIFIED), descriptor.port),
&mut shutdown_manager,
descriptor,
);
listeners.push((listener, Arc::clone(descriptor)));
},
true,
SocketAddr::new(IpAddr::V6(net::Ipv6Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
#[cfg(feature = "http3")]
if descriptor.server_config.is_some() {
if matches!(descriptor.version, BindIpVersion::V4 | BindIpVersion::Both) {
let listener = create_listener(
|| {
socket2::Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV4,
Type::DGRAM,
Some(Protocol::UDP),
)
.expect("Failed to create a new IPv4 socket configuration")
},
false,
SocketAddr::new(
IpAddr::V4(net::Ipv4Addr::UNSPECIFIED),
descriptor.port,
),
&mut shutdown_manager,
descriptor,
);
listeners.push((listener, Arc::clone(descriptor)));
},
false,
SocketAddr::new(IpAddr::V4(net::Ipv4Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
if matches!(descriptor.version, BindIpVersion::V6 | BindIpVersion::Both) {
let listener = create_listener(
|| {
socket2::Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
let mgr = shutdown_manager.clone();
let d = descriptor.clone();
let listener = move || {
create_listener(
|| {
socket2::Socket::new(
Domain::IPV6,
Type::DGRAM,
Some(Protocol::UDP),
)
.expect("Failed to create a new IPv6 socket configuration")
},
false,
SocketAddr::new(
IpAddr::V6(net::Ipv6Addr::UNSPECIFIED),
descriptor.port,
),
&mut shutdown_manager,
descriptor,
);
listeners.push((listener, Arc::clone(descriptor)));
},
false,
SocketAddr::new(IpAddr::V6(net::Ipv6Addr::UNSPECIFIED), d.port),
&mgr,
&d,
)
};
listeners.push((Box::new(listener), Arc::clone(descriptor)));
}
}
}
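Every branch above follows the same recipe: clone the `Arc` handles (`mgr`, `d`) before the `move` closure so the closure owns its captures and the factory stays `Send + Sync + 'static`. Reduced to its essentials, with illustrative types:

```rust
use std::sync::Arc;

struct PortDescriptor { port: u16 }

// Clone the shared handles first, then let `move` transfer those clones
// into the closure, leaving the originals usable by the next branch.
fn make_listener_factory(
    shutdown_manager: &Arc<()>, // stand-in for Arc<shutdown::Manager>
    descriptor: &Arc<PortDescriptor>,
) -> Box<dyn Fn() -> u16 + Send + Sync> {
    let mgr = Arc::clone(shutdown_manager); // cheap: just a refcount bump
    let d = Arc::clone(descriptor);
    Box::new(move || {
        let _ = &mgr; // the closure owns `mgr` and `d`, not borrows
        d.port
    })
}
```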
@@ -411,16 +450,6 @@ impl RunConfig {
all_listeners.push(listeners);
}

#[cfg(feature = "handover")]
let handover_path = ctl_path.unwrap_or_else(ctl::socket_path);

#[cfg(feature = "graceful-shutdown")]
{
shutdown_manager.handover_socket_path = Some(handover_path.clone());
}

let shutdown_manager = shutdown_manager.build();

#[cfg(feature = "handover")]
if ctl {
// make sure we shut down before listening
@@ -446,7 +475,10 @@ impl RunConfig {
let shutdown_manager = Arc::clone(&shutdown_manager);
std::thread::spawn(move || {
tokio_uring::start(async move {
accept(listener, descriptor, &shutdown_manager, n == 0)
// `TODO`: seems like tokio-uring sometimes segfaults when exiting. Maybe
// this has to do with sockets being moved between runtimes?
// Is a socket registered on another runtime also slowing things down?
accept(listener(), descriptor, &shutdown_manager, n == 0)
.await
.expect("failed to accept message");
shutdown_manager.wait().await;
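Compare with the previous code, which passed an already-created `listener` in: now `listener()` is called inside `tokio_uring::start`, so the socket is registered with the runtime that will actually poll it. A reduced sketch of this spawn loop, with the Kvarn-specific types elided:

```rust
// The factory runs *inside* tokio_uring::start, so the listener is
// registered with -- and its interrupts handled by -- this thread's
// own runtime.
fn spawn_accept_thread<F>(make_listener: F)
where
    F: Fn() -> std::net::TcpListener + Send + Sync + 'static,
{
    std::thread::spawn(move || {
        tokio_uring::start(async move {
            let listener = make_listener(); // created on this runtime
            // ...accept loop, then wait for shutdown...
            let _ = listener;
        });
    });
}
```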
@@ -460,7 +492,7 @@ impl RunConfig {
for (listener, descriptor) in listeners {
let shutdown_manager = Arc::clone(&shutdown_manager);
let future = async move {
accept(listener, descriptor, &shutdown_manager, true)
accept(listener(), descriptor, &shutdown_manager, true)
.await
.expect("Failed to accept message!");
};
33 changes: 25 additions & 8 deletions src/shutdown.rs
@@ -75,7 +75,7 @@ pub struct Manager {
received: AtomicBool,

#[cfg(feature = "graceful-shutdown")]
wakers: WakerList,
wakers: std::cell::UnsafeCell<std::sync::Mutex<WakerList>>,

#[cfg(feature = "graceful-shutdown")]
inititate_channel: (WatchSender<()>, WatchReceiver<()>),
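`UnsafeCell<T>` is `!Sync`, so wrapping the `Mutex` in one strips `Manager`'s auto-derived `Send`/`Sync`; the `unsafe impl`s in the next hunk restore them by hand, with the commit effectively asserting that all access follows the locking and single-writer rules documented further down. A minimal model of the same shape:

```rust
use std::cell::UnsafeCell;
use std::sync::Mutex;

// Minimal model of the new `wakers` field: the UnsafeCell allows raw,
// lock-free access on the hot path (set_waker/remove_waker), while the
// Mutex still serializes structural changes (add_listener, notify).
struct Shared {
    wakers: UnsafeCell<Mutex<Vec<Option<std::task::Waker>>>>,
}

// SAFETY: sound only if every access follows that protocol -- structural
// mutation under the Mutex, plus at most one writer per slot on the raw
// path. UnsafeCell is !Sync, so these impls must be written by hand.
unsafe impl Send for Shared {}
unsafe impl Sync for Shared {}
```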
@@ -92,9 +92,15 @@ pub struct Manager {
#[cfg(feature = "graceful-shutdown")]
pub(crate) handover_socket_path: Option<PathBuf>,
}
unsafe impl Send for Manager {}
unsafe impl Sync for Manager {}
impl Manager {
/// Creates a new shutdown manager with the capacity of the list of wakers set to `_capacity`.
pub fn new(_capacity: usize) -> Self {
///
/// # Safety
///
/// `_capacity >= number of add_listener calls`
pub unsafe fn new(_capacity: usize) -> Self {
#[cfg(feature = "graceful-shutdown")]
{
let channel = watch_channel(());
@@ -105,7 +111,9 @@ impl Manager {
connections: AtomicIsize::new(0),
received: AtomicBool::new(false),

wakers: WakerList::new(_capacity),
wakers: std::cell::UnsafeCell::new(std::sync::Mutex::new(WakerList::new(
_capacity,
))),

inititate_channel: watch_channel(()),
finished_channel: (Arc::new(channel.0), channel.1),
@@ -123,11 +131,12 @@ impl Manager {
/// Adds a listener to this manager.
///
/// This is used so the `accept` future resolves immediately when the shutdown is triggered.
pub(crate) fn add_listener(&mut self, listener: Listener) -> AcceptManager {
pub(crate) fn add_listener(&self, listener: Listener) -> AcceptManager {
AcceptManager {
#[cfg(feature = "graceful-shutdown")]
index: {
let wakers = self.wakers.get_mut();
let mut lock = unsafe { &*self.wakers.get() }.lock().unwrap();
let wakers = lock.get_mut();
let len = wakers.len();
wakers.push(None);
WakerIndex(len)
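`add_listener` dropping `&mut self` for `&self` is what lets the boxed factories call it after `shutdown_manager.build()` has produced a shared `Arc`. Registration still locks the `Mutex`; a sketch of that path, under the same assumptions as the model above:

```rust
use std::cell::UnsafeCell;
use std::sync::Mutex;
use std::task::Waker;

// Push under the Mutex and hand back the slot's index. The capacity
// reserved in `new` is what keeps this push from reallocating the list
// out from under the raw-pointer readers elsewhere.
fn register(wakers: &UnsafeCell<Mutex<Vec<Option<Waker>>>>) -> usize {
    // SAFETY: only a shared reference is taken through the cell here;
    // the mutation itself is serialized by the Mutex.
    let mutex = unsafe { &*wakers.get() };
    let mut list = mutex.lock().unwrap();
    let index = list.len();
    list.push(None);
    index
}
```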
@@ -206,17 +215,25 @@ impl Manager {
/// it does not matter which comes first. Also, only one thread should write to this
/// with the same `index`; this is not a problem since only the Kvarn crate has access to this.
/// This is also upheld by [`WakerIndex`].
///
/// Also, the list never decreases in length, so the index will always be valid.
/// Unless it's extended when this is running. But since we initiate with the necessary
/// capacity (and not less), it **should** never expand.
#[cfg(all(feature = "graceful-shutdown", feature = "async-networking"))]
pub(crate) fn set_waker(&self, index: WakerIndex, waker: Waker) {
let wakers = unsafe { &mut *self.wakers.get() };
let inner = unsafe { &mut *self.wakers.get() };
let inner = inner.get_mut().unwrap();
let wakers = unsafe { &mut *inner.get() };
wakers[index.0] = Some(waker);
}
/// # Safety
///
/// See [`Self::set_waker`].
#[cfg(all(feature = "graceful-shutdown", feature = "async-networking"))]
pub(crate) fn remove_waker(&self, index: WakerIndex) {
let wakers = unsafe { &mut *self.wakers.get() };
let inner = unsafe { &mut *self.wakers.get() };
let inner = inner.get_mut().unwrap();
let wakers = unsafe { &mut *inner.get() };
wakers[index.0] = None;
}
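`set_waker` and `remove_waker` skip the lock entirely: `Mutex::get_mut` needs `&mut Mutex`, which the `UnsafeCell` conjures from `&self`. That is only defensible under the comment's single-writer-per-index rule and the no-reallocation guarantee. The same move in isolation, as a hedged sketch rather than a soundness endorsement:

```rust
use std::cell::UnsafeCell;
use std::sync::Mutex;
use std::task::Waker;

/// # Safety
/// Callers must guarantee that `index` came from registration, that no
/// other thread touches this index concurrently, and that the Vec never
/// reallocates after setup (hence the up-front capacity).
unsafe fn set_slot(
    wakers: &UnsafeCell<Mutex<Vec<Option<Waker>>>>,
    index: usize,
    waker: Waker,
) {
    // `get_mut` bypasses the lock: it needs `&mut Mutex`, which the
    // UnsafeCell lets us fabricate from a shared reference.
    let list = (&mut *wakers.get()).get_mut().unwrap();
    list[index] = Some(waker);
}
```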

@@ -264,7 +281,7 @@ impl Manager {

// we stop listening immediately
info!("Notifying wakers.");
self.wakers.notify();
unsafe { &*self.wakers.get() }.lock().unwrap().notify();
}
#[cfg(feature = "graceful-shutdown")]
fn _shutdown(&self) {
