The ttrpc client lifetime is so long for Main thread #225

Closed
jokemanfire opened this issue Apr 26, 2024 · 9 comments · Fixed by #226
Comments

@jokemanfire
Contributor

Description of problem

My example client.rs looks like this:

fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);

   
    for i in 0..10 {
        let c = Client::connect(utils::SOCK_ADDR).unwrap();
        let hc = health_ttrpc::HealthClient::new(c.clone());
        let ac = agent_ttrpc::AgentServiceClient::new(c);
    
        let thc = hc.clone();
        let tac = ac.clone();
    
        let now = std::time::Instant::now();
        let t = thread::spawn(move || {
            let req = health::CheckRequest::new();
            println!(
                "OS Thread {:?} - health.check() started: {:?}",
                std::thread::current().id(),
                now.elapsed(),
            );
    
            let rsp = thc.check(default_ctx(), &req);
            match rsp.as_ref() {
                Err(Error::RpcStatus(s)) => {
                    assert_eq!(Code::NOT_FOUND, s.code());
                    assert_eq!("Just for fun".to_string(), s.message())
                }
                Err(e) => {
                    panic!("not expecting an error from the example server: {:?}", e)
                }
                Ok(x) => {
                    panic!("not expecting a OK response from the example server: {:?}", x)
                }
            }
            println!(
                "OS Thread {:?} - health.check() -> {:?} ended: {:?}",
                std::thread::current().id(),
                rsp,
                now.elapsed(),
            );
        });
    
        let t2 = thread::spawn(move || {
            println!(
                "OS Thread {:?} - agent.list_interfaces() started: {:?}",
                std::thread::current().id(),
                now.elapsed(),
            );
    
            let show = match tac.list_interfaces(default_ctx(), &agent::ListInterfacesRequest::new()) {
                Err(e) => {
                    panic!("not expecting an error from the example server: {:?}", e)
                }
                Ok(s) => {
                    assert_eq!("first".to_string(), s.Interfaces[0].name);
                    assert_eq!("second".to_string(), s.Interfaces[1].name);
                    format!("{s:?}")
                }
            };
    
            println!(
                "OS Thread {:?} - agent.list_interfaces() -> {} ended: {:?}",
                std::thread::current().id(),
                show,
                now.elapsed(),
            );
        });

        println!(
            "Main OS Thread - agent.online_cpu_mem() started: {:?}",
            now.elapsed()
        );
        let show = match ac.online_cpu_mem(default_ctx(), &agent::OnlineCPUMemRequest::new()) {
            Err(Error::RpcStatus(s)) => {
                assert_eq!(Code::NOT_FOUND, s.code());
                assert_eq!(
                    "/grpc.AgentService/OnlineCPUMem is not supported".to_string(),
                    s.message()
                );
                format!("{s:?}")
            }
            Err(e) => {
                panic!("not expecting an error from the example server: {:?}", e)
            }
            Ok(s) => {
                panic!("not expecting a OK response from the example server: {:?}", s)
            }
        };
        println!(
            "Main OS Thread - agent.online_cpu_mem() -> {} ended: {:?}",
            show,
            now.elapsed()
        );

        println!("\nsleep 2 seconds ...\n");
        thread::sleep(std::time::Duration::from_secs(2));

        let version = hc.version(default_ctx(), &health::CheckRequest::new());
        assert_eq!("mock.0.1", version.as_ref().unwrap().agent_version.as_str());
        assert_eq!("0.0.1", version.as_ref().unwrap().grpc_version.as_str());
        println!(
            "Main OS Thread - health.version() started: {:?}",
            now.elapsed()
        );
        println!(
            "Main OS Thread - health.version() -> {:?} ended: {:?}",
            version,
            now.elapsed()
        );
        t.join().unwrap();
        t2.join().unwrap();

    }
    // Keep the process alive so the open fds can be inspected.
    loop {}
}

This is the phenomenon: the number of open fds grows without bound.

Expected result

Either of the following:
1. Provide an interface so that I can proactively release the fd.
2. Release the fd automatically once it goes out of scope.

Actual result

1. The fd count keeps growing in the main thread; the fds are never released.

If this is confirmed to be an issue, I will submit a PR.

@lifupan
Collaborator

lifupan commented Apr 28, 2024

Hi @jokemanfire

Why do you think the ttrpc client will live forever? Once the t and t2 threads are joined, hc, thc, ac and tac will be destroyed, right?

@jokemanfire
Contributor Author

Hi @jokemanfire

Why do you think the ttrpc client will live forever? Once the t and t2 threads are joined, hc, thc, ac and tac will be destroyed, right?

Hello, when I write code like 'loop { create a client }' in the main thread (I need to create the ttrpc client in my project, and all of my project's logic runs in the main thread), I found that /proc/{self.pid}/fd grows infinitely, and the process eventually hits its file descriptor limit. The fds are not released in the main thread, even though I use the ttrpc client inside a wrapper function. Is this a usage error caused by my insufficient understanding? Thank you.

@lifupan
Collaborator

lifupan commented Apr 28, 2024

let c = Client::connect(utils::SOCK_ADDR).unwrap();

The key is the client you create with `let c = Client::connect(utils::SOCK_ADDR).unwrap();`: once you are done with the client, you should make sure that `c` and every cloned entity are destroyed.

@jokemanfire
Contributor Author

let c = Client::connect(utils::SOCK_ADDR).unwrap();

The key is the client you create with `let c = Client::connect(utils::SOCK_ADDR).unwrap();`: once you are done with the client, you should make sure that `c` and every cloned entity are destroyed.

Thank you, but I am confused. In the example code, please check whether I am mistaken:

1. In `for i in 0..10`, by the time the next iteration starts, everything created in the previous iteration has already gone out of scope.
2. In the snippet

let c = Client::connect(utils::SOCK_ADDR).unwrap();
let hc = health_ttrpc::HealthClient::new(c.clone());
let ac = agent_ttrpc::AgentServiceClient::new(c.clone());

let thc = hc.clone();
let tac = ac.clone();

the lifetimes of thc and tac only end once the threads have been joined.
3. The lifetimes of hc and ac should end after this iteration. Is there some destroy method for the hc and ac variables?

Thank you!
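
For reference, Rust's explicit destroy is the prelude function drop(); otherwise a value is destroyed when its scope ends. A minimal, self-contained sketch (Handle is a hypothetical stand-in for hc/ac, not a real ttrpc type):

// Sketch only: Handle stands in for the clients; its Drop impl just makes
// destruction visible.
struct Handle(&'static str);

impl Drop for Handle {
    fn drop(&mut self) {
        println!("{} destroyed", self.0);
    }
}

fn main() {
    for i in 0..2 {
        let hc = Handle("hc");
        let _ac = Handle("_ac");
        // Explicit destroy: the prelude function `drop` consumes the value now.
        drop(hc);
        println!("end of iteration {i}");
        // Implicit destroy: `_ac` is dropped here, when the iteration's scope ends.
    }
}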

@jokemanfire
Contributor Author

Could the problem be that the strong count of the client inner's `_connection: Arc` never drops to zero? The thread created in `new_client` would hold it forever.
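
The suspected mechanism can be reproduced with plain std types; a minimal sketch (Conn is a hypothetical stand-in, not the client's real inner type) of how a strong Arc clone captured by a thread that never exits keeps the value alive even after every handle in main has been dropped:

use std::sync::Arc;
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the client's inner connection that owns the fd.
struct Conn;

impl Drop for Conn {
    fn drop(&mut self) {
        println!("connection dropped");
    }
}

fn main() {
    let conn = Arc::new(Conn);

    // The receiver thread captures a strong clone and never exits, so the
    // strong count can never reach zero.
    let held = Arc::clone(&conn);
    let _receiver = thread::spawn(move || loop {
        let _alive = &held;
        thread::sleep(Duration::from_secs(1));
    });

    drop(conn); // the caller-facing handle is gone ...
    println!("main's handle dropped, but Conn::drop never runs");
    thread::sleep(Duration::from_secs(2));
}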

@lifupan
Collaborator

lifupan commented Apr 28, 2024

Since hc's and ac's lifetimes have ended, the connection should be closed automatically.

Could the problem be that the strong count of the client inner's `_connection: Arc` never drops to zero? The thread created in `new_client` would hold it forever.

That could be it, but in your example, after the loop all of the clients and their clones have been dropped, so the count should be reduced to zero, right? Anyway, could you check whether the threads created when the client is constructed exit after all of the clients have been dropped?
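
One way to do that check on Linux is to count the entries under /proc/self/task, much like the fd counter in the next comment; a minimal sketch:

use std::fs::read_dir;

// Each entry under /proc/self/task is one OS thread of this process (Linux only).
fn thread_count() -> usize {
    read_dir("/proc/self/task").map(|dir| dir.count()).unwrap_or(0)
}

fn main() {
    println!("current thread count: {}", thread_count());
}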

@jokemanfire
Contributor Author

jokemanfire commented Apr 28, 2024

Thanks, now I understand this problem clearly. First, the connection held by the receiver thread should become a weak Arc, so it no longer increases the strong reference count. Second, libc::poll must be given a timeout, otherwise it waits forever. I will push a PR and then you can check it. My test code is below, and a minimal sketch of the two changes follows it.
I changed two things to resolve this problem:
1. Use a weak Arc in the thread.
2. Add a timeout to libc::poll.

fn get_describe_numbers(pid: &str) -> i64 {
    let proc_dir = format!("/proc/{}/fd", pid);
    let mut cnt = 0;
    match read_dir(PathBuf::from(proc_dir)) {
        Ok(entries) => {
            for entry in entries {
                match entry {
                    Ok(_) => {
                        cnt += 1;
                    }
                    Err(e) => {
                        eprintln!("Error reading directory entry: {}", e);
                    }
                }
            }
        }
        Err(e) => {
            eprintln!("Error reading directory: {}", e);
        }
    }
    cnt
}
fn main() {
    simple_logging::log_to_stderr(LevelFilter::Trace);
    let pid = process::id().to_string();
    let pre_fds = get_describe_numbers(pid.as_str());
    for _ in 0..1000 {
        let now_fds = get_describe_numbers(pid.as_str());
        if now_fds != pre_fds {
            println!("fd is not released: pre_fd {pre_fds:?}  now_fd {now_fds:?}");
        }
        println!("-------------now fd number is {now_fds:?}---------------------------------------");
        // Create a client and let it drop at the end of the iteration.
        let _c = Client::connect(utils::SOCK_ADDR).unwrap();
    }
    let now_fds = get_describe_numbers(pid.as_str());
    if now_fds != pre_fds {
        println!("fd is not released: pre_fd {pre_fds:?}  now_fd {now_fds:?}");
    }

    println!("over");
}
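
A minimal sketch of the two changes using only the standard library (Conn below is a hypothetical stand-in, not ttrpc's actual internals): the receiver thread holds a Weak reference and periodically re-checks it, where in the real fix the periodic wake-up comes from giving libc::poll a finite timeout instead of blocking forever.

use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

// Hypothetical stand-in for the connection that owns the fd.
struct Conn;

impl Drop for Conn {
    fn drop(&mut self) {
        println!("connection dropped, fd released");
    }
}

fn main() {
    let conn = Arc::new(Conn);

    // Change 1: the receiver thread only holds a Weak reference, so it does
    // not keep the connection alive by itself.
    let weak: Weak<Conn> = Arc::downgrade(&conn);
    let receiver = thread::spawn(move || loop {
        match weak.upgrade() {
            // Change 2 (in the real code): poll with a finite timeout so the
            // loop regularly returns here instead of blocking forever.
            Some(_conn) => thread::sleep(Duration::from_millis(100)),
            None => {
                println!("all strong handles dropped; receiver thread exits");
                break;
            }
        }
    });

    drop(conn); // the last strong handle: Conn::drop runs, the fd is released
    receiver.join().unwrap();
}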

@jokemanfire
Contributor Author

jokemanfire commented Apr 28, 2024

#226

@wllenyj
Collaborator

wllenyj commented May 10, 2024

#226

Fixed
