Skip to content

Commit

Permalink
Ensure the fd will be release completely
Browse files Browse the repository at this point in the history
This problem is caused by the reference count not being able to return to 0 properly.

Signed-off-by: jokemanfire <hu.dingyang@zte.com.cn>
  • Loading branch information
jokemanfire committed Apr 29, 2024
1 parent 44b31f7 commit e269739
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
36 changes: 21 additions & 15 deletions src/sync/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Client {

fn new_client(pipe_client: ClientConnection) -> Result<Client> {
let client = Arc::new(pipe_client);

let weak_client = Arc::downgrade(&client);
let (sender_tx, rx): (Sender, Receiver) = mpsc::channel();
let recver_map_orig = Arc::new(Mutex::new(HashMap::new()));

Expand Down Expand Up @@ -98,20 +98,28 @@ impl Client {
trace!("Sender quit");
});

//Recver
//Reciver
let receiver_connection = connection;
let receiver_client = client.clone();
//this thread should use weak arc for ClientConnection, otherwise the thread will occupy a reference count of ClientConnection's arc,
//ClientConnection's drop will be not call until the thread finished. It means if all the external references are finished,
//this thread should be release.
let receiver_client = weak_client.clone();
thread::spawn(move || {
loop {
match receiver_client.ready() {
Ok(None) => {
continue;
}
Ok(_) => {}
Err(e) => {
error!("pipeConnection ready error {:?}", e);
break;
//The count of ClientConnection's Arc will be add one , and back to original value when this code ends.
if let Some(receiver_client) = receiver_client.upgrade(){
match receiver_client.ready() {
Ok(None) => {
continue;
}
Ok(_) => {}
Err(e) => {
error!("pipeConnection ready error {:?}", e);
break;
}
}
}else{
break;
}

match read_message(&receiver_connection) {
Expand Down Expand Up @@ -140,10 +148,6 @@ impl Client {
};
}

let _ = receiver_client
.close_receiver()
.map_err(|e| warn!("failed to close with error: {:?}", e));

trace!("Receiver quit");
});

Expand Down Expand Up @@ -191,7 +195,9 @@ impl Client {

impl Drop for ClientConnection {
fn drop(&mut self) {
//close all fd , make sure all fd have been release
self.close().unwrap();
self.close_receiver().unwrap();
trace!("Client is dropped");
}
}
Expand Down
7 changes: 5 additions & 2 deletions src/sync/sys/unix/net.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ use crate::common::{self, client_connect, SOCK_CLOEXEC};
use crate::common::set_fd_close_exec;
use nix::sys::socket::{self};

//The libc::poll's max wait time
const POLL_MAX_TIME: i32 = 10;

pub struct PipeListener {
fd: RawFd,
monitor_fd: (RawFd, RawFd),
Expand Down Expand Up @@ -104,7 +107,7 @@ impl PipeListener {
libc::poll(
pollers as *mut _ as *mut libc::pollfd,
pollers.len() as _,
-1,
POLL_MAX_TIME,
)
};

Expand Down Expand Up @@ -278,7 +281,7 @@ impl ClientConnection {
libc::poll(
pollers as *mut _ as *mut libc::pollfd,
pollers.len() as _,
-1,
POLL_MAX_TIME,
)
};

Expand Down

0 comments on commit e269739

Please sign in to comment.