From c351f0ba7fe328fb46a298b89e1c0499a32cbf2e Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Wed, 8 Jan 2025 22:12:55 +0800 Subject: [PATCH] sync-server: Fix infinite loop caused by accept error Also Add the method set_accept_retry_interval. Since poll is level-triggered, an uncorrected error can lead to an infinite loop, so we sleep for a while and wait for the error to be corrected. Fixes: #270 Signed-off-by: Tim Zhang --- src/sync/server.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/src/sync/server.rs b/src/sync/server.rs index 00cfb95b..d19b0558 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -17,6 +17,7 @@ #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::time::Duration; use protobuf::{CodedInputStream, Message}; use std::collections::HashMap; @@ -40,6 +41,7 @@ use crate::{MethodHandler, TtrpcContext}; const DEFAULT_WAIT_THREAD_COUNT_DEFAULT: usize = 3; const DEFAULT_WAIT_THREAD_COUNT_MIN: usize = 1; const DEFAULT_WAIT_THREAD_COUNT_MAX: usize = 5; +const DEFAULT_ACCEPT_RETRY_INTERVAL: Duration = Duration::from_secs(10); type MessageSender = Sender<(MessageHeader, Vec)>; type MessageReceiver = Receiver<(MessageHeader, Vec)>; @@ -57,6 +59,7 @@ pub struct Server { thread_count_default: usize, thread_count_min: usize, thread_count_max: usize, + accept_retry_interval: Duration, } struct Connection { @@ -244,6 +247,7 @@ impl Default for Server { thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT, thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN, thread_count_max: DEFAULT_WAIT_THREAD_COUNT_MAX, + accept_retry_interval: DEFAULT_ACCEPT_RETRY_INTERVAL, } } } @@ -305,6 +309,11 @@ impl Server { self } + pub fn set_accept_retry_interval(mut self, interval: Duration) -> Server { + self.accept_retry_interval = interval; + self + } + pub fn start_listen(&mut self) -> Result<()> { let connections = self.connections.clone(); @@ -320,6 +329,7 @@ impl Server { let min = self.thread_count_min; let max = 
self.thread_count_max; let listener_quit_flag = self.listener_quit_flag.clone(); + let accept_retry_interval = self.accept_retry_interval; let reaper_tx = match self.reaper.take() { None => { @@ -373,6 +383,14 @@ impl Server { } Err(e) => { error!("listener accept got {:?}", e); + + // Resource limit errors can't be recovered in a short time, + // and since poll(2) is level-triggered, an uncorrected error can lead to an infinite loop, + // so we sleep for a while and wait for the error to be corrected. + if is_resource_limit_error(e) { + thread::sleep(accept_retry_interval); + } + continue; } }; @@ -597,3 +615,11 @@ fn quit_connection(quit: Arc, control_tx: SyncSender<()>) { .send(()) .unwrap_or_else(|err| debug!("Failed to send {:?}", err)); } + +fn is_resource_limit_error(e: std::io::Error) -> bool { + if let Some(err) = e.raw_os_error() { + return [libc::EMFILE, libc::ENFILE, libc::ENOBUFS, libc::ENOMEM].contains(&err); + } + + false +}