feat: make TerminatableTask terminate itself when dropped #1151

Merged

YuanYuYuan
Contributor

This PR makes TerminatableTask terminate itself automatically when dropped and resolves the bug in #1054.

@YuanYuYuan
Contributor Author

@wyfo could you please review this? Thanks.

Contributor

@wyfo wyfo left a comment

I see why you changed the receiver type, but I think you should not try to reuse terminate in drop.
Instead, you can use some intermediate function:

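use std::time::Duration;
use tokio::task::JoinHandle;

// Shared by `drop`, `terminate` and `terminate_async`: waits for the task to finish
// and returns false if it is still running when the timeout elapses.
async fn terminate_inner(handle: JoinHandle<()>, timeout: Duration) -> bool {
    if tokio::time::timeout(timeout, handle).await.is_err() {
        tracing::error!("Failed to terminate the task");
        return false;
    }
    true
}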

And use it like

fn drop(&mut self) {
    if let Some(handle) = self.handle.take() {
        ResolveFuture::new(terminate_inner(handle, std::time::Duration::from_secs(10))).res_sync();
    }
}

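fn terminate(mut self, timeout: Duration) -> bool {
    // No trailing semicolon: the resolved boolean is the return value.
    ResolveFuture::new(terminate_inner(self.handle.take().unwrap(), timeout)).res_sync()
}

// Must be declared `async` in order to `.await` the helper.
async fn terminate_async(mut self, timeout: Duration) -> bool {
    terminate_inner(self.handle.take().unwrap(), timeout).await
}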
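
The shape of this suggestion (one helper shared by `drop` and a consuming `terminate`) can be sketched with the standard library alone. This is only an illustration under stated assumptions, not the zenoh implementation: `Worker` is a hypothetical type, an `AtomicBool` stands in for the `CancellationToken`, and a channel receiver stands in for the `JoinHandle<()>` so that waiting with a timeout is possible without tokio. Setting the stop flag inside `terminate` and `drop` is also an assumption of this sketch.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Shared helper: wait until the worker exits, or give up after `timeout`.
fn terminate_inner(done: Receiver<()>, timeout: Duration) -> bool {
    // The worker drops its sender when it exits, which shows up here as
    // `Err(Disconnected)`. Only a timeout counts as a failure.
    !matches!(done.recv_timeout(timeout), Err(mpsc::RecvTimeoutError::Timeout))
}

/// Hypothetical stand-in for a terminatable task.
struct Worker {
    stop: Arc<AtomicBool>,      // stand-in for CancellationToken
    done: Option<Receiver<()>>, // stand-in for JoinHandle<()>
}

impl Worker {
    fn spawn() -> Self {
        let stop = Arc::new(AtomicBool::new(false));
        let (done_tx, done_rx) = mpsc::channel::<()>();
        let flag = Arc::clone(&stop);
        thread::spawn(move || {
            let _done_tx = done_tx; // dropped when the thread exits
            while !flag.load(Ordering::Acquire) {
                thread::sleep(Duration::from_millis(1));
            }
        });
        Worker { stop, done: Some(done_rx) }
    }

    /// Consuming variant: `mut self` lets us `take()` the receiver.
    fn terminate(mut self, timeout: Duration) -> bool {
        self.stop.store(true, Ordering::Release);
        let done = self.done.take().expect("receiver is present until terminate or drop");
        terminate_inner(done, timeout)
    }
}

impl Drop for Worker {
    fn drop(&mut self) {
        // After `terminate`, `done` is already `None`, so nothing happens here.
        if let Some(done) = self.done.take() {
            self.stop.store(true, Ordering::Release);
            terminate_inner(done, Duration::from_secs(10));
        }
    }
}
```

Calling `Worker::spawn().terminate(Duration::from_secs(5))` stops the thread through the helper, and simply dropping a `Worker` goes through the same helper with the fixed 10 second timeout.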

token,
}
}

/// Attempts to terminate the task.
/// Returns true if task completed / aborted within timeout duration, false otherwise.
- pub fn terminate(self, timeout: Duration) -> bool {
+ pub fn terminate(&mut self, timeout: Duration) -> bool {

I don't think you should change the receiver type of the method

ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
}

/// Async version of [`TerminatableTask::terminate()`].
- pub async fn terminate_async(self, timeout: Duration) -> bool {
+ pub async fn terminate_async(&mut self, timeout: Duration) -> bool {

Same remark about the receiver type.

@YuanYuYuan
Contributor Author

If I'm not mistaken, to take out the value inside an Option requires &mut self. So we need to modify terminate and terminate_async in any way. Furthermore, @Mallets and I discussed today that taking ownership is not necessary, since the borrow checker exists only in Rust, and that mixing destruction and undeclaration could cause conflicts. (See Undeclarable for more details.) We could follow the common pattern like https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.close. Closing a socket or receiver simply takes &mut self and hence Drop can coexist. I will open a separate draft PR to follow up on this topic.

In conclusion, I suggest separating the termination of the task from its destruction. @wyfo what do you think?

@wyfo
Contributor

wyfo commented Jun 17, 2024

If I'm not mistaken, to take out the value inside an Option requires &mut self. So we need to modify terminate and terminate_async in any way.

Just use mut self instead of self, and you still get value semantics.

We could follow the common pattern like https://docs.rs/tokio/latest/tokio/sync/mpsc/struct.Receiver.html#method.close.

This is not the same thing here, as terminate returns a boolean. Do you want to store this boolean to return it on following calls?

Closing a socket or receiver simply takes &mut self and hence Drop can coexist.

There is no problem with Drop coexisting with a terminate that takes a self receiver.
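
A small std-only sketch of this point, under stated assumptions: `Task`, `release`, and the `released` counter are hypothetical names used only for illustration. It shows a method that consumes `self` next to a `Drop` impl, with the resource released exactly once because the field is an `Option` that gets taken.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical task holding one resource that must be released exactly once.
struct Task {
    // `Option` lets a method move the resource out through `&mut self`.
    resource: Option<String>,
    // Counts how many times the resource was released (demo only).
    released: Rc<Cell<u32>>,
}

impl Task {
    fn new(released: Rc<Cell<u32>>) -> Self {
        Task { resource: Some("handle".to_string()), released }
    }

    /// Releases the resource if it is still there; returns whether it did.
    fn release(&mut self) -> bool {
        match self.resource.take() {
            Some(_resource) => {
                self.released.set(self.released.get() + 1);
                true
            }
            None => false,
        }
    }

    /// Consumes the task; `mut self` is just a mutable binding of the owned value.
    fn terminate(mut self) -> bool {
        self.release()
        // `self` goes out of scope here, so `Drop::drop` runs and finds `None`.
    }
}

impl Drop for Task {
    fn drop(&mut self) {
        self.release();
    }
}
```

After `terminate` the counter reads 1, not 2, because the `drop` that follows sees an empty `Option`.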

@YuanYuYuan
Contributor Author

Hi @wyfo, thanks for your feedback.

Yes, that's what I meant: we need to make some change either way. Since you wrote this in your first version, I thought you wanted to avoid any modification of self.

fn terminate(self, timeout: Duration) -> bool {
    ResolveFuture::new(terminate_inner(self.handle.take().unwrap(), timeout)).res_sync();
}

And I'm fine with changing it to fn terminate(mut self).

The remaining question is whether we want to enforce destruction here. This is related to the potential conflict with Drop. Let me explain.

Originally, we defined TerminatableTask without an Option:

pub struct TerminatableTask {
    handle: JoinHandle<()>,
    token: CancellationToken,
}

With this definition we cannot implement Drop for it. The conflict comes from moving the handle out of self:

async fn terminate_inner(handle: JoinHandle<()>, timeout: Duration) -> bool {
    if tokio::time::timeout(timeout, handle).await.is_err() {
        tracing::error!("Failed to terminate the task");
        return false;
    }
    true
}
impl TerminatableTask {
    pub async fn terminate_async(self, timeout: Duration) -> bool {
        terminate_inner(self.handle, timeout).await  // ERROR: cannot move out of type `TerminatableTask`, which implements the `Drop` trait
    }
}

Passing self by value into the function can trigger a destruction or a move out of self. The Undeclarable trait has the same issue.
For instance, because we implement the Undeclarable trait for Subscriber, we can no longer implement Drop for it:

impl<R> Drop for Subscriber<'_, R> {
    fn drop(&mut self) {
        todo!()
    }
}

(You can check that this causes lots of "cannot move out ..." errors.)

That's why I need to wrap the JoinHandle in an Option: we take the inner value out of the Option instead of moving the handle field out of self.
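
The contrast can be reproduced with the standard library alone. The following is a sketch under stated assumptions: `Plain` and `Guarded` are hypothetical types, and `std::thread::JoinHandle` stands in for the tokio `JoinHandle`. Without a `Drop` impl a field can be moved out of `self`; with one, the compiler rejects that move (error E0509), and wrapping the field in an `Option` is one way around it.

```rust
use std::thread::{self, JoinHandle};

/// No `Drop` impl: moving a field out of `self` is allowed.
struct Plain {
    handle: JoinHandle<u32>,
}

impl Plain {
    fn finish(self) -> u32 {
        // `join` takes the handle by value, so this moves the field out of `self`.
        self.handle.join().expect("worker panicked")
    }
}

/// With a `Drop` impl, the field is wrapped so it can be taken through `&mut self`.
struct Guarded {
    handle: Option<JoinHandle<u32>>,
}

impl Guarded {
    fn finish(mut self) -> u32 {
        // With a bare `JoinHandle<u32>` field, `self.handle.join()` would fail here:
        // error[E0509]: cannot move out of type `Guarded`, which implements the `Drop` trait
        let handle = self.handle.take().expect("handle already taken");
        handle.join().expect("worker panicked")
    }
}

impl Drop for Guarded {
    fn drop(&mut self) {
        // Only joins if `finish` has not already taken the handle.
        if let Some(handle) = self.handle.take() {
            let _ = handle.join();
        }
    }
}

fn spawn_plain() -> Plain {
    Plain { handle: thread::spawn(|| 1) }
}

fn spawn_guarded() -> Guarded {
    Guarded { handle: Some(thread::spawn(|| 2)) }
}
```

`spawn_plain().finish()` and `spawn_guarded().finish()` both return the thread's result; only `Guarded` also cleans up when it is dropped without calling `finish`.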

Back to this PR: both &mut self and mut self compile. But we should still think twice about whether terminate needs to embed a destruction. As in the example I linked, closing a socket multiple times is acceptable in real use cases. In this PR, a second termination of an already closed task simply does nothing. This is exactly the previous behavior of the zenoh library (before the tokio porting).
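
A minimal sketch of that "second call does nothing" behavior with a `&mut self` receiver, again using only the standard library. `Task` and its `joins` counter are hypothetical, and returning true from the repeated call is an assumption of this sketch rather than something stated above.

```rust
use std::thread::{self, JoinHandle};

/// Hypothetical task whose termination can be requested any number of times.
struct Task {
    handle: Option<JoinHandle<()>>,
    joins: u32, // how many times a join actually happened (demo only)
}

impl Task {
    fn spawn() -> Self {
        Task { handle: Some(thread::spawn(|| {})), joins: 0 }
    }

    /// Returns true if the task is finished when the call returns.
    fn terminate(&mut self) -> bool {
        match self.handle.take() {
            Some(handle) => {
                self.joins += 1;
                handle.join().is_ok()
            }
            // Already terminated: nothing left to do.
            None => true,
        }
    }
}

impl Drop for Task {
    fn drop(&mut self) {
        // Reuses `terminate`; a no-op if it was already called.
        self.terminate();
    }
}
```

Because the receiver is `&mut self`, the value stays usable after `terminate`, and both a repeated call and the final `drop` fall through to the `None` branch.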

Moreover, it avoids any conflict with Drop if one day we implement or modify TerminatableTask as below (a conflict that unfortunately already exists in the case of Undeclarable).

impl TerminatableTask {
    pub async fn terminate_async(mut self, timeout: Duration) -> bool {
        let Self { token, mut handle } = self;  // ERROR: cannot move out of type `TerminatableTask`, which implements the `Drop` trait
        token.cancel();
        if let Some(handle) = handle.take() {
            if tokio::time::timeout(timeout, handle).await.is_err() {
                tracing::error!("Failed to terminate the task");
                return false;
            };
        }
        true
    }
}

BTW, I'm a little confused by your point that we "should not try to reuse terminate in drop". To be honest, I don't care much about the returned boolean, especially since an error is logged whenever termination fails. And it seems that your example doesn't process the boolean result in Drop either. Am I missing something?

@wyfo
Contributor

wyfo commented Jun 20, 2024

I defer to your arguments; it looks good to me.

@Mallets Mallets merged commit 2500e5a into eclipse-zenoh:main Jun 20, 2024
12 checks passed
Mallets added a commit that referenced this pull request Jul 30, 2024
* Add NOTE for LowLatency transport. (#1088)

Signed-off-by: ChenYing Kuo <evshary@gmail.com>

* fix: Improve debug messages in `zenoh-transport` (#1090)

* fix: Improve debug messages for failing RX/TX tasks

* fix: Improve debug message for `accept_link` timeout

* chore: Fix `clippy::redundant_pattern_matching` error

* Improve pipeline backoff (#1097)

* Yield task for backoff

* Improve comments and error handling in backoff

* Simplify pipeline pull

* Consider backoff configuration

* Add typos check to CI (#1065)

* Fix typos

* Add typos check to CI

* Start link tx_task before notifying router (#1098)

* Fix typos (#1110)

* bump quinn & rustls (#1086)

* bump quinn & rustls

* fix ci windows check

* add comments

* Fix interface name scanning when listening on IP unspecified for TCP/TLS/QUIC/WS (#1123)

Co-authored-by: Julien Enoch <julien.e@zettascale.tech>

* Enable releasing from any branch (#1136)

* Fix cargo clippy (#1145)

* Release tables locks before propagating subscribers and queryables declarations to void dead locks (#1150)

* Send simple sub and qabl declarations using a given function

* Send simple sub and qabl declarations after releasing tables lock

* Send simple sub and qabl declarations after releasing tables lock (missing places)

* feat: make `TerminatableTask` terminate itself when dropped (#1151)

* Fix bug in keyexpr::includes leading to call get_unchecked on empty array UB (#1208)

* REST plugin uses unbounded flume channels for queries (#1213)

* fix: typo in selector.rs (#1228)

* fix: zenohd --cfg (#1263)

* fix: zenohd --cfg

* ci: trigger

* Update zenohd/src/main.rs

---------

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* Fix failover brokering bug reacting to linkstate changes (#1272)

* Change missleading log

* Fix failover brokering bug reacting to linkstate changes

* Retrigger CI

---------

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* Code format

* Fix clippy warnings

* Code format

* Fix Clippy errors from Rust 1.80 (#1273)

* Allow unexpected `doc_auto_cfg` flag

* Keep never-constructed logger interceptor

* Ignore interior mutability of `Resource`

* Fix typo

* Resolve `clippy::doc-lazy-continuation` errors

* Upgrade `time@0.3.28` to `time@0.3.36`

See time-rs/time#693

* Update Cargo.toml (#1277)

Updated description to be aligned with what we use everywhere else

* Merge ci.yaml

---------

Signed-off-by: ChenYing Kuo <evshary@gmail.com>
Co-authored-by: ChenYing Kuo (CY) <evshary@gmail.com>
Co-authored-by: Mahmoud Mazouz <mazouz.mahmoud@outlook.com>
Co-authored-by: Tavo Annus <tavo.annus@gmail.com>
Co-authored-by: JLer <jlerxky@live.com>
Co-authored-by: Julien Enoch <julien.e@zettascale.tech>
Co-authored-by: OlivierHecart <olivier@zettascale.tech>
Co-authored-by: Yuyuan Yuan <az6980522@gmail.com>
Co-authored-by: Diogo Matsubara <diogo.matsubara@zettascale.tech>
Co-authored-by: OlivierHecart <olivier.hecart@adlinktech.com>
Co-authored-by: kydos <kydos@protonmail.com>
Mallets added a commit that referenced this pull request Aug 27, 2024
* Add NOTE for LowLatency transport. (#1088)

Signed-off-by: ChenYing Kuo <evshary@gmail.com>

* fix: Improve debug messages in `zenoh-transport` (#1090)

* fix: Improve debug messages for failing RX/TX tasks

* fix: Improve debug message for `accept_link` timeout

* chore: Fix `clippy::redundant_pattern_matching` error

* Improve pipeline backoff (#1097)

* Yield task for backoff

* Improve comments and error handling in backoff

* Simplify pipeline pull

* Consider backoff configuration

* Add typos check to CI (#1065)

* Fix typos

* Add typos check to CI

* Start link tx_task before notifying router (#1098)

* Fix typos (#1110)

* bump quinn & rustls (#1086)

* bump quinn & rustls

* fix ci windows check

* add comments

* Fix interface name scanning when listening on IP unspecified for TCP/TLS/QUIC/WS (#1123)

Co-authored-by: Julien Enoch <julien.e@zettascale.tech>

* Enable releasing from any branch (#1136)

* Fix cargo clippy (#1145)

* Release tables locks before propagating subscribers and queryables declarations to void dead locks (#1150)

* Send simple sub and qabl declarations using a given function

* Send simple sub and qabl declarations after releasing tables lock

* Send simple sub and qabl declarations after releasing tables lock (missing places)

* feat: make `TerminatableTask` terminate itself when dropped (#1151)

* Fix bug in keyexpr::includes leading to call get_unchecked on empty array UB (#1208)

* REST plugin uses unbounded flume channels for queries (#1213)

* fix: typo in selector.rs (#1228)

* fix: zenohd --cfg (#1263)

* fix: zenohd --cfg

* ci: trigger

* Update zenohd/src/main.rs

---------

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* Fix failover brokering bug reacting to linkstate changes (#1272)

* Change missleading log

* Fix failover brokering bug reacting to linkstate changes

* Retrigger CI

---------

Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>

* Code format

* Fix clippy warnings

* Code format

* Fix Clippy errors from Rust 1.80 (#1273)

* Allow unexpected `doc_auto_cfg` flag

* Keep never-constructed logger interceptor

* Ignore interior mutability of `Resource`

* Fix typo

* Resolve `clippy::doc-lazy-continuation` errors

* Upgrade `time@0.3.28` to `time@0.3.36`

See time-rs/time#693

* Update Cargo.toml (#1277)

Updated description to be aligned with what we use everywhere else

* fix: typos (#1297)

* Replace trees computation tasks with a worker (#1303)

* Replace trees computation tasks with a worker

* Address review comments

* Remove review comments

* zenohd-default config error #1292 (#1298)

* Zenohd panic when tring load file

When zenohd trying load file, if it have a problem it crash cause another treat was "unwrap", and it return to a type config. So, it crash and cause painic.

* zenohd default config error #1292

When tring load config file defined by -c option. With haver any problema "unwrap" has been to Config type.

I treat it return a Default Config whe it happen

* If file fail when try load configs

If file fail when try load configs

* Update main.rs

* Resolve typos at comment

Resolve typos at comment

* revering fix #1298

---------

Signed-off-by: ChenYing Kuo <evshary@gmail.com>
Co-authored-by: ChenYing Kuo (CY) <evshary@gmail.com>
Co-authored-by: Mahmoud Mazouz <mazouz.mahmoud@outlook.com>
Co-authored-by: Luca Cominardi <luca.cominardi@gmail.com>
Co-authored-by: Tavo Annus <tavo.annus@gmail.com>
Co-authored-by: JLer <jlerxky@live.com>
Co-authored-by: Julien Enoch <julien.e@zettascale.tech>
Co-authored-by: OlivierHecart <olivier@zettascale.tech>
Co-authored-by: Yuyuan Yuan <az6980522@gmail.com>
Co-authored-by: Diogo Matsubara <diogo.matsubara@zettascale.tech>
Co-authored-by: OlivierHecart <olivier.hecart@adlinktech.com>
Co-authored-by: kydos <kydos@protonmail.com>
Co-authored-by: brianPA <80439594+brian049@users.noreply.github.com>
Co-authored-by: Tiago Neves <32251249+anhaabaete@users.noreply.github.com>