From a845820522b0ae38f2a29db5dc9ce0a1eaf31750 Mon Sep 17 00:00:00 2001 From: Tim Zhang Date: Wed, 8 Jan 2025 22:12:55 +0800 Subject: [PATCH] sync-server: Fix infinite loop caused by accept error Also Add the method set_accept_retry_interval. Since poll is level-triggered, an uncorrected error can lead to an infinite loop, so we sleep for a while and wait for the error to be corrected. Fixes: #270 Signed-off-by: Tim Zhang --- src/sync/server.rs | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/sync/server.rs b/src/sync/server.rs index 00cfb95..be14140 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -17,6 +17,7 @@ #[cfg(unix)] use std::os::unix::io::{AsRawFd, FromRawFd, RawFd}; +use std::time::Duration; use protobuf::{CodedInputStream, Message}; use std::collections::HashMap; @@ -40,6 +41,7 @@ use crate::{MethodHandler, TtrpcContext}; const DEFAULT_WAIT_THREAD_COUNT_DEFAULT: usize = 3; const DEFAULT_WAIT_THREAD_COUNT_MIN: usize = 1; const DEFAULT_WAIT_THREAD_COUNT_MAX: usize = 5; +const DEFAULT_ACCEPT_RETRY_INTERVAL: Duration = Duration::from_secs(10); type MessageSender = Sender<(MessageHeader, Vec)>; type MessageReceiver = Receiver<(MessageHeader, Vec)>; @@ -57,6 +59,7 @@ pub struct Server { thread_count_default: usize, thread_count_min: usize, thread_count_max: usize, + accept_retry_interval: Duration, } struct Connection { @@ -244,6 +247,7 @@ impl Default for Server { thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT, thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN, thread_count_max: DEFAULT_WAIT_THREAD_COUNT_MAX, + accept_retry_interval: DEFAULT_ACCEPT_RETRY_INTERVAL, } } } @@ -305,6 +309,11 @@ impl Server { self } + pub fn set_accept_retry_interval(mut self, interval: Duration) -> Server { + self.accept_retry_interval = interval; + self + } + pub fn start_listen(&mut self) -> Result<()> { let connections = self.connections.clone(); @@ -320,6 +329,7 @@ impl Server { let min = self.thread_count_min; let max = self.thread_count_max; let listener_quit_flag = self.listener_quit_flag.clone(); + let accept_retry_interval = self.accept_retry_interval.clone(); let reaper_tx = match self.reaper.take() { None => { @@ -373,6 +383,9 @@ impl Server { } Err(e) => { error!("listener accept got {:?}", e); + // Since poll is level-triggered, an uncorrected error can lead to an infinite loop, + // so we sleep for a while and wait for the error to be corrected. + thread::sleep(accept_retry_interval); continue; } };