From 458e89430a78181dbcf98b15dfd54113b70553d7 Mon Sep 17 00:00:00 2001 From: jokemanfire Date: Sun, 28 Apr 2024 18:06:54 +0800 Subject: [PATCH] Ensure the fd will be release completely Signed-off-by: jokemanfire --- src/sync/client.rs | 34 +++++++++++++++++++--------------- src/sync/sys/unix/net.rs | 5 +++-- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/src/sync/client.rs b/src/sync/client.rs index dfeb15f..a14f95a 100644 --- a/src/sync/client.rs +++ b/src/sync/client.rs @@ -62,7 +62,7 @@ impl Client { fn new_client(pipe_client: ClientConnection) -> Result { let client = Arc::new(pipe_client); - + let weak_client = Arc::downgrade(&client); let (sender_tx, rx): (Sender, Receiver) = mpsc::channel(); let recver_map_orig = Arc::new(Mutex::new(HashMap::new())); @@ -98,20 +98,26 @@ impl Client { trace!("Sender quit"); }); - //Recver + //Reciver let receiver_connection = connection; - let receiver_client = client.clone(); + //Clone as weak Arc , it will not add the count of client + let receiver_client = weak_client.clone(); thread::spawn(move || { loop { - match receiver_client.ready() { - Ok(None) => { - continue; - } - Ok(_) => {} - Err(e) => { - error!("pipeConnection ready error {:?}", e); - break; + + if let Some(receiver_client) = receiver_client.upgrade(){ + match receiver_client.ready() { + Ok(None) => { + continue; + } + Ok(_) => {} + Err(e) => { + error!("pipeConnection ready error {:?}", e); + break; + } } + }else{ + break; } match read_message(&receiver_connection) { @@ -140,10 +146,6 @@ impl Client { }; } - let _ = receiver_client - .close_receiver() - .map_err(|e| warn!("failed to close with error: {:?}", e)); - trace!("Receiver quit"); }); @@ -191,7 +193,9 @@ impl Client { impl Drop for ClientConnection { fn drop(&mut self) { + //close all fd , make sure all fd have been release self.close().unwrap(); + self.close_receiver().unwrap(); trace!("Client is dropped"); } } diff --git a/src/sync/sys/unix/net.rs b/src/sync/sys/unix/net.rs index 3fdf47b..42b4078 100644 --- a/src/sync/sys/unix/net.rs +++ b/src/sync/sys/unix/net.rs @@ -28,6 +28,7 @@ use crate::common::{self, client_connect, SOCK_CLOEXEC}; use crate::common::set_fd_close_exec; use nix::sys::socket::{self}; +const POLL_MAX_TIME: i32 = 10; pub struct PipeListener { fd: RawFd, monitor_fd: (RawFd, RawFd), @@ -104,7 +105,7 @@ impl PipeListener { libc::poll( pollers as *mut _ as *mut libc::pollfd, pollers.len() as _, - -1, + POLL_MAX_TIME, ) }; @@ -278,7 +279,7 @@ impl ClientConnection { libc::poll( pollers as *mut _ as *mut libc::pollfd, pollers.len() as _, - -1, + POLL_MAX_TIME, ) };