Skip to content

Commit

Permalink
sync-server: Fix infinite loop caused by accept error
Browse files Browse the repository at this point in the history
Also Add the method set_accept_retry_interval.

Since poll is level-triggered, an uncorrected error can lead to an infinite loop,
so we sleep for a while and wait for the error to be corrected.

Fixes: #270

Signed-off-by: Tim Zhang <tim@hyper.sh>
  • Loading branch information
Tim-Zhang committed Jan 9, 2025
1 parent 4a493e1 commit a845820
Showing 1 changed file with 13 additions and 0 deletions.
13 changes: 13 additions & 0 deletions src/sync/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#[cfg(unix)]
use std::os::unix::io::{AsRawFd, FromRawFd, RawFd};
use std::time::Duration;

use protobuf::{CodedInputStream, Message};
use std::collections::HashMap;
Expand All @@ -40,6 +41,7 @@ use crate::{MethodHandler, TtrpcContext};
const DEFAULT_WAIT_THREAD_COUNT_DEFAULT: usize = 3;
const DEFAULT_WAIT_THREAD_COUNT_MIN: usize = 1;
const DEFAULT_WAIT_THREAD_COUNT_MAX: usize = 5;
const DEFAULT_ACCEPT_RETRY_INTERVAL: Duration = Duration::from_secs(10);

type MessageSender = Sender<(MessageHeader, Vec<u8>)>;
type MessageReceiver = Receiver<(MessageHeader, Vec<u8>)>;
Expand All @@ -57,6 +59,7 @@ pub struct Server {
thread_count_default: usize,
thread_count_min: usize,
thread_count_max: usize,
accept_retry_interval: Duration,
}

struct Connection {
Expand Down Expand Up @@ -244,6 +247,7 @@ impl Default for Server {
thread_count_default: DEFAULT_WAIT_THREAD_COUNT_DEFAULT,
thread_count_min: DEFAULT_WAIT_THREAD_COUNT_MIN,
thread_count_max: DEFAULT_WAIT_THREAD_COUNT_MAX,
accept_retry_interval: DEFAULT_ACCEPT_RETRY_INTERVAL,
}
}
}
Expand Down Expand Up @@ -305,6 +309,11 @@ impl Server {
self
}

pub fn set_accept_retry_interval(mut self, interval: Duration) -> Server {
self.accept_retry_interval = interval;
self
}

pub fn start_listen(&mut self) -> Result<()> {
let connections = self.connections.clone();

Expand All @@ -320,6 +329,7 @@ impl Server {
let min = self.thread_count_min;
let max = self.thread_count_max;
let listener_quit_flag = self.listener_quit_flag.clone();
let accept_retry_interval = self.accept_retry_interval.clone();

let reaper_tx = match self.reaper.take() {
None => {
Expand Down Expand Up @@ -373,6 +383,9 @@ impl Server {
}
Err(e) => {
error!("listener accept got {:?}", e);
// Since poll is level-triggered, an uncorrected error can lead to an infinite loop,
// so we sleep for a while and wait for the error to be corrected.
thread::sleep(accept_retry_interval);
continue;
}
};
Expand Down

0 comments on commit a845820

Please sign in to comment.