From 8315fbf0035d96ac922c66f5660eb9c8924b0292 Mon Sep 17 00:00:00 2001 From: Quanwei Zhou Date: Mon, 23 Sep 2024 14:54:34 +0800 Subject: [PATCH] server: fix server exit once a accept failed If the Accept error occurs, an error can be output to ensure that the subsequent connect can be accepted normally. Fixes: #239 Signed-off-by: Quanwei Zhou --- src/sync/server.rs | 8 ++++++-- src/sync/sys/unix/net.rs | 12 +----------- src/sync/sys/windows/net.rs | 17 ++++------------- 3 files changed, 11 insertions(+), 26 deletions(-) diff --git a/src/sync/server.rs b/src/sync/server.rs index b562996..00cfb95 100644 --- a/src/sync/server.rs +++ b/src/sync/server.rs @@ -358,7 +358,11 @@ impl Server { .spawn(move || { loop { trace!("listening..."); - let pipe_connection = match listener.accept(&listener_quit_flag) { + if listener_quit_flag.load(Ordering::SeqCst) { + info!("listener shutdown for quit flag"); + break; + } + let pipe_connection = match listener.accept() { Ok(None) => { continue; } @@ -369,7 +373,7 @@ impl Server { } Err(e) => { error!("listener accept got {:?}", e); - break; + continue; } }; diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 11155d3..bf109ba 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -21,8 +21,6 @@ use std::os::unix::prelude::AsRawFd; use nix::Error; use nix::unistd::*; -use std::sync::{Arc}; -use std::sync::atomic::{AtomicBool, Ordering}; use crate::common::{self, client_connect, SOCK_CLOEXEC}; #[cfg(target_os = "macos")] use crate::common::set_fd_close_exec; @@ -84,11 +82,7 @@ impl PipeListener { // - Ok(Some(PipeConnection)) if a new connection is established // - Ok(None) if spurious wake up with no new connection // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept( &self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { let mut pollers = vec![ libc::pollfd { fd: self.monitor_fd.0, @@ -127,10 +121,6 @@ impl PipeListener { return Ok(None); } - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new(io::ErrorKind::Other, "listener shutdown for quit flag")); - } - #[cfg(any(target_os = "linux", target_os = "android"))] let fd = match accept4(self.fd, SockFlag::SOCK_CLOEXEC) { Ok(fd) => fd, diff --git a/src/sync/sys/windows/net.rs b/src/sync/sys/windows/net.rs index 87fcd00..cbbecb4 100644 --- a/src/sync/sys/windows/net.rs +++ b/src/sync/sys/windows/net.rs @@ -23,7 +23,6 @@ use std::os::windows::ffi::OsStrExt; use std::os::windows::fs::OpenOptionsExt; use std::os::windows::io::{IntoRawHandle}; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::{Arc}; use std::{io}; use windows_sys::Win32::Foundation::{ CloseHandle, ERROR_IO_PENDING, ERROR_PIPE_CONNECTED, INVALID_HANDLE_VALUE }; @@ -75,14 +74,7 @@ impl PipeListener { // accept returns: // - Ok(Some(PipeConnection)) if a new connection is established // - Err(io::Error) if there is an error and listener loop should be shutdown - pub(crate) fn accept(&self, quit_flag: &Arc) -> std::result::Result, io::Error> { - if quit_flag.load(Ordering::SeqCst) { - return Err(io::Error::new( - io::ErrorKind::Other, - "listener shutdown for quit flag", - )); - } - + pub(crate) fn accept(&self) -> std::result::Result, io::Error> { // Create a new pipe instance for every new client let instance = self.new_instance()?; let np = match PipeConnection::new(instance) { @@ -376,6 +368,7 @@ fn handle_windows_error(e: io::Error) -> Error { #[cfg(test)] mod test { use super::*; + use std::sync::Arc; use windows_sys::Win32::Foundation::ERROR_FILE_NOT_FOUND; #[test] @@ -398,8 +391,7 @@ mod test { let listener_server = listener.clone(); let thread = std::thread::spawn(move || { - let quit_flag = Arc::new(AtomicBool::new(false)); - match listener_server.accept(&quit_flag) { + match listener_server.accept() { Ok(Some(_)) => { // pipe is working } @@ -422,8 +414,7 @@ mod test { let listener_server = listener.clone(); let thread = std::thread::spawn(move || { - let quit_flag = Arc::new(AtomicBool::new(false)); - match listener_server.accept(&quit_flag) { + match listener_server.accept() { Ok(_) => { panic!("should not get pipe on close") }