
threadpool: drop incomplete tasks on shutdown #722

Merged
6 commits merged into tokio-rs:master from drop-tasks on Jan 17, 2019

Conversation


@ghost ghost commented Oct 25, 2018

Motivation

When the thread pool shuts down, futures that have been polled at least once but not completed yet are simply leaked. We should drop them instead.

Solution

Multiple changes are introduced:

  • Tasks are assigned a home worker the first time they are polled.

  • Each worker contains the set of tasks (Arc<Task>) for which it is the home worker. When a task is assigned a home worker, it is registered in that worker's set of tasks. When the task is completed, it is unregistered from the set.

  • When the thread pool shuts down and after all worker threads stop, the remaining tasks in workers' sets are aborted, i.e. they are switched to the Aborted state and their Futures are dropped.

  • The thread pool shutdown process is refactored to make it more robust. We no longer track the number of active threads manually. Instead, there's an Arc<ShutdownTrigger> that aborts the remaining tasks and completes the Shutdown future once it gets destroyed, i.e. when all Workers and the ThreadPool get dropped, since they are the only ones holding strong references to the ShutdownTrigger.
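
To illustrate the ShutdownTrigger idea, here is a minimal sketch of the "cleanup runs when the last strong reference goes away" pattern. The type, field, and task names are placeholders for illustration, not the actual tokio-threadpool definitions:

use std::sync::{Arc, Mutex};

struct ShutdownTrigger {
    // Placeholder for whatever is needed to abort leftover tasks and
    // complete the Shutdown future (e.g. Vec<Arc<Task>> in the real pool).
    remaining_tasks: Mutex<Vec<String>>,
}

impl Drop for ShutdownTrigger {
    fn drop(&mut self) {
        // Runs exactly once, when the last Arc<ShutdownTrigger> goes away.
        for task in self.remaining_tasks.lock().unwrap().drain(..) {
            println!("aborting leftover task: {}", task);
        }
        // ...then the Shutdown future would be notified that the pool
        // has fully shut down.
    }
}

fn main() {
    let trigger = Arc::new(ShutdownTrigger {
        remaining_tasks: Mutex::new(vec!["task-a".into(), "task-b".into()]),
    });

    let held_by_worker = trigger.clone();

    drop(trigger);        // the ThreadPool handle goes away
    drop(held_by_worker); // the last Worker goes away -> Drop runs, shutdown completes
}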

Closes #424
Closes #428

@ghost ghost requested a review from carllerche October 25, 2018 16:45
@appaquet

Thanks! I just found this PR, which fixes an issue I reported on Gitter yesterday, where a Timer-based future would make the Runtime retain a UdpSocket. This resulted in the socket not being properly closed.

But this PR seems to break a simple blocking hyper HTTP request that uses block_on. I didn't boil the example down to just Tokio, but I guess it's simple enough for you to spot the issue. Here's a minimal sample: https://gist.github.com/appaquet/f141cac636e8d0df2d0bd7e37b312c0c

It fails with an Error { kind: Execute, cause: None }. Enabling logs gives a bit more info:

W/hyper::common::exec: executor error: SpawnError { is_shutdown: true }
W/hyper::client: error spawning critical client task: executor failed to spawn task

@ghost
Author

ghost commented Oct 26, 2018

@appaquet Can you post your Cargo.toml used for the http_blocking.rs example?

I tried running it with the following Cargo.toml:

[package]
name = "scratch-rs"
version = "0.1.0"

[dependencies]
tokio = { path = "../tokio" }
hyper = "*"

# [patch.crates-io]
# tokio = { path = "../tokio" }
# tokio-async-await  = { path = "../tokio/tokio-async-await" }
# tokio-codec = { path = "../tokio/tokio-codec" }
# tokio-current-thread = { path = "../tokio/tokio-current-thread" }
# tokio-executor = { path = "../tokio/tokio-executor" }
# tokio-fs = { path = "../tokio/tokio-fs" }
# tokio-io = { path = "../tokio/tokio-io" }
# tokio-reactor = { path = "../tokio/tokio-reactor" }
# tokio-signal = { path = "../tokio/tokio-signal" }
# tokio-tcp = { path = "../tokio/tokio-tcp" }
# tokio-threadpool = { path = "../tokio/tokio-threadpool" }
# tokio-timer = { path = "../tokio/tokio-timer" }
# tokio-tls = { path = "../tokio/tokio-tls" }
# tokio-udp = { path = "../tokio/tokio-udp" }
# tokio-uds = { path = "../tokio/tokio-uds" }

This prints the same error you got:

$ cargo run --bin http_blocking
    Finished dev [unoptimized + debuginfo] target(s) in 0.09s
     Running `target/debug/http_blocking`
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Error { kind: Execute, cause: None }', libcore/result.rs:1009:5
note: Run with `RUST_BACKTRACE=1` for a backtrace.

But if I uncomment those lines in Cargo.toml, then it's all fine:

$ cargo run --bin http_blocking
    Finished dev [unoptimized + debuginfo] target(s) in 0.07s
     Running `target/debug/http_blocking`
Got response: {
  "origin": "178.19.209.206"
}

The same thing happens on this branch and the master branch.

@appaquet

appaquet commented Oct 26, 2018

Oh, I had just pointed Tokio to your branch, but didn't catch that Hyper was pulling in its own Tokio version until I checked the dependency tree. With the patching approach, my example does work. So, false alarm, but 👍 because it fixed my previous UdpSocket issue!

@carllerche
Member

@stjepang Still working on reviewing this (will be through it soon). Did you compare the benchmarks before/after?

Member

@carllerche carllerche left a comment

Looks good 👍 I left some thoughts inline and I'd be interested in seeing how this impacts benchmarks.

Also, I noticed that this PR does not use home_worker when the task gets scheduled. Have you considered doing this? It might not matter?

@@ -22,6 +24,9 @@ pub(crate) struct WorkerEntry {
// comments on that type.
pub state: CachePadded<AtomicUsize>,

// The set of tasks registered in this worker entry.
owned_tasks: Mutex<HashSet<*const Task>>,
Member

It seems that the mutex isn't really required; it is just here to avoid additional unsafe code (which is a good goal). In practice, the mutex should not be contended, but I wonder what the perf impact would be on platforms that do not have good mutex implementations.

Did this change impact benchmarks that spawn many tasks?

Another option would be to avoid the mutex in owned_tasks, then have an owned_tasks_shutdown field with the mutex. Before the worker terminates, it moves the HashSet. Or, a channel between the worker and the shutdown trigger is used to send the hashset.

Author

The perf impact is minimal (see the comment below). How do you feel about switching to parking_lot, though? It has a very efficient implementation on all platforms.

Member

Well, avoiding the mutex entirely is doable. Do you think the strategy I mentioned above isn't worth it or too complicated?

Author

It is doable, but I think the complexity-to-gains ratio doesn't justify it. :) Maybe once/if it becomes a problem in the future...

tokio-threadpool/src/task/mod.rs (outdated, resolved)
@ghost
Author

ghost commented Nov 5, 2018

@carllerche

Still working on reviewing this (will be through it soon). Did you compare the benchmarks before/after?

Yep. Some slowdown can be observed due to this PR, but it's very small:

 name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  2,910,813       2,949,012            38,199   1.31%   x 0.99
 threadpool::yield_many  11,669,013      11,791,126          122,113   1.05%   x 0.99

@carllerche
Member

@stjepang I'd rather try to avoid changes that impact perf (even a bit). What if you try switching to the FNV hasher? I wonder what the root of the performance hit is.

@ghost
Author

ghost commented Nov 5, 2018

Switching to fnv or parking_lot didn't seem to help. Then I changed the threadpool::spawn_many benchmark to use pool_size = 1 to see what happens...

This is on the master branch:

test threadpool::spawn_many ... bench:   3,361,182 ns/iter (+/- 1,049,312)

This PR:

test threadpool::spawn_many ... bench:   4,137,889 ns/iter (+/- 238,753)

This PR with fnv:

test threadpool::spawn_many ... bench:   3,981,100 ns/iter (+/- 507,596)

This PR with fnv and parking_lot:

test threadpool::spawn_many ... bench:   3,849,528 ns/iter (+/- 752,109)

Here's an idea on how to improve performance. Perhaps we should have two sets:

  • One is called owned_tasks and may only be modified by the owning worker. When a task is polled the first time, it is inserted into the worker's owned_tasks set. It doesn't need a lock.

  • The other is called completed_tasks and is just a Mutex<Vec<*const Task>>.

Every 100th (or so) insertion into owned_tasks, we lock the mutex, take all tasks out of completed_tasks, and then remove them from owned_tasks. It's like lazy garbage collection.

When a worker completes a task it owns, it doesn't have to touch completed_tasks - it can just directly remove it from owned_tasks. That means mutexes would almost never be used.
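
A rough sketch of this two-set scheme (illustrative only, not code from this PR; the raw-pointer types follow the description above, and a real version would additionally have to guarantee the pointed-to tasks stay alive and that the shared parts are Send/Sync):

use std::collections::HashSet;
use std::sync::{Arc, Mutex};

struct Task;

struct WorkerEntry {
    // Only the owning worker touches this, so it needs no lock.
    owned_tasks: HashSet<*const Task>,
    // Shared: other workers push tasks here after completing them.
    completed_tasks: Mutex<Vec<*const Task>>,
    insertions: usize,
}

impl WorkerEntry {
    // Called by the owning worker when a task is polled for the first time.
    fn register(&mut self, task: &Arc<Task>) {
        self.owned_tasks.insert(Arc::as_ptr(task));
        self.insertions += 1;

        // Lazy "garbage collection": every 100th insertion, drain the tasks
        // that were completed remotely and drop them from the owned set.
        if self.insertions % 100 == 0 {
            for done in self.completed_tasks.lock().unwrap().drain(..) {
                self.owned_tasks.remove(&done);
            }
        }
    }

    // Called by the owning worker when it completes a task itself;
    // the mutex is never touched on this path.
    fn complete_locally(&mut self, task: &Arc<Task>) {
        self.owned_tasks.remove(&Arc::as_ptr(task));
    }
}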

@appaquet

appaquet commented Nov 6, 2018

@stjepang I ran into another issue using this PR's code, where I ran out of file descriptors (the limit is low by default on macOS, at 256). lsof put me on the track of a kqueue/pipe being leaked / retained, and that led me to a retain cycle that occurs when an un-fired Delay exists while the Runtime is dropped.

The issue isn't coming from this PR specifically, but since the PR was supposed to fix it, I'm posting my findings here. If you run this gist on your branch on macOS, with ulimit -n set to 256, you'll run into a Too many open files error after 10 iterations. By logging various drops in the Runtime, Pool, and Timer, you can clearly see that the Timer and Pool never get dropped. What happens is that when you have a scheduled timer that hasn't fired yet and you drop the Runtime, the Arc<Pool> gets retained because of a cycle between itself and the un-fired timer. I tracked the retain cycle down to this: the Notifier retains the Arc<Pool>, which retains the Timer (via the custom_park), which retains the timer::Entry, which retains the Notifier (via the AtomicTask).

The problem can easily be fixed by making the notifier retain the Pool weakly instead. See this commit. I don't know if it could cause any side effects, but all my tests and tokio's tests run fine.
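
To make the cycle shape concrete, here's a toy illustration with simplified, hypothetical types (not the actual tokio structs); with the Notifier holding an Arc<Pool>, every node's strong count stays above zero, while a Weak edge lets the whole chain drop:

use std::sync::{Arc, Mutex, Weak};

struct Pool { timer: Mutex<Option<Arc<Timer>>> }
struct Timer { entry: Mutex<Option<Arc<Entry>>> }
struct Entry { notifier: Arc<Notifier> }
struct Notifier { pool: Weak<Pool> } // Weak instead of Arc breaks the cycle

fn main() {
    let pool = Arc::new(Pool { timer: Mutex::new(None) });

    let notifier = Arc::new(Notifier { pool: Arc::downgrade(&pool) });
    let entry = Arc::new(Entry { notifier });
    let timer = Arc::new(Timer { entry: Mutex::new(Some(entry)) });
    *pool.timer.lock().unwrap() = Some(timer);

    drop(pool);
    // With `pool: Weak<Pool>` everything is freed here; with `pool: Arc<Pool>`
    // the Pool -> Timer -> Entry -> Notifier -> Pool loop would leak.
}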

I could open a separate PR once this one is merged, or we could simply fix it in this PR. Up to you guys.

@ghost
Author

ghost commented Nov 6, 2018

@appaquet Thanks for diagnosing the issue and the write-up, this is very useful!

We intentionally replaced the Weak with an Arc in Notifier in order to improve performance: #702

I think the right solution here is to drop the park field inside Entry in the shutdown trigger. This way the Timer will get dropped too.

I wonder if we should drop the unpark field, too? It should be pretty easy to do.

@appaquet

appaquet commented Nov 6, 2018

@stjepang I saw #702 right after posting! Your changes also fix my issue, so that's good. As for the unpark, in the case of the Timer it doesn't seem to retain the pool, because the Timer's unpark() implementation returns the reactor's unpark. But in my opinion it would probably be safer to do the same explicit dropping, since nothing guarantees that a future unpark implementation won't retain the pool. It happens that the Timer only needed to override park and park_timeout, but it could have overridden unpark too if that had been needed, potentially leading to the same cycle.

@ghost
Author

ghost commented Nov 13, 2018

All right, we're dropping Unpark now, too. Travis is passing.

@carllerche Can you review?

@ghost
Author

ghost commented Nov 13, 2018

Here are the newest benchmarks. Each run has two lines: the first line is the original threadpool::spawn_many benchmark, and the second line is the same benchmark with .pool_size(1).

Since the latest nightly has switched from jemalloc to the system allocator, I had to manually set the allocator to jemalloc.

master branch:

test threadpool::spawn_many ... bench:   2,957,472 ns/iter (+/- 803,372)
test threadpool::spawn_many ... bench:   3,448,530 ns/iter (+/- 989,017)

Previous commit (ad14c6d):

test threadpool::spawn_many ... bench:   2,993,056 ns/iter (+/- 676,859)
test threadpool::spawn_many ... bench:   4,161,152 ns/iter (+/- 826,778)

Last commit (7887765):

test threadpool::spawn_many ... bench:   2,995,636 ns/iter (+/- 260,750)
test threadpool::spawn_many ... bench:   3,700,544 ns/iter (+/- 713,514)

The last commit uses the optimization idea from this comment. I've run out of ideas on how to make this faster, but keep in mind that task bookkeeping has some cost, so there's no free lunch.

@carllerche
Member

There is a lot going on in this PR, which makes it hard to really review and iterate on.

Would it be terribly hard to split it into a PR that does the rename from inner -> pool and switches to the shutdown trigger? Then, we can deal with the set of owned tasks separately.

ghost pushed a commit that referenced this pull request Nov 20, 2018
`inner` is a fitting name for variables of a type named `Inner`, but in other cases I find it confusing - sometimes `inner` refers to a `Pool`, sometimes to a `Sender`. I renamed a bunch of variables named `inner` to be more descriptive.

This PR is the first step in an effort to split #722 (comment) into multiple PRs.
@ghost
Author

ghost commented Nov 21, 2018

@carllerche There we go - this PR now deals with the set of owned tasks only.

// This is only used to know when to single `shutdown_task` once the
// shutdown process has completed.
pub num_workers: AtomicUsize,

Author

This is just an unused field from before ShutdownTrigger existed, so I'm removing it.

@ghost
Author

ghost commented Nov 29, 2018

@carllerche Just a ping for review. :)

@carllerche
Member

The logic seems correct....

I'm a little wary of this patch as it adds a non-trivial amount of work to the hot path... I wonder if we can step back for a second and think about whether there is another way to handle this, perhaps with larger design changes that we could tackle at a later point as we try to clean up / simplify the pool.

What does Go do to handle this case?

@ghost
Author

ghost commented Dec 8, 2018

What does Go do to handle this case?

Go simply uses the garbage collector to eventually free all tasks.

I've found a design that doesn't use parking_lot, doesn't use hash tables, is reasonably simple, and has very little overhead. However, I'll let this PR wait until we do some refactoring so that things don't get too messy.

@carllerche
Member

@stjepang what refactoring are you waiting on?

@ghost
Author

ghost commented Jan 8, 2019

All right, I did some refactoring and now this PR is ready for review.

Originally we used a HashSet to hold the set of tasks, but now we have a Registry instead, which should be a faster and simpler data structure. It contains just a simple Vec<Arc<Task>>.

In order to add a task to the registry, we push it into the Vec and store its position (index) in the Task struct. To remove a task from the registry, we first get its index, then call swap_remove(index) to remove it, and finally update the index of the task that got swapped into its place.

Each worker::Entry has fields registry: Registry and completed_tasks: SegQueue<Arc<Task>>:

  • registry holds tasks that have been polled by this worker first, but not completed. Type Registry needs &mut self to add and remove tasks.

  • completed_tasks holds tasks that have been polled by this worker first, but stolen and completed by another worker. This queue is shared among all threads.

When shutting down the pool, we go through each worker, drain completed_tasks and remove those tasks from the registry, and finally abort the remaining tasks. Note that we also drain the completed_tasks queue on every call to Entry::unregister_task() so that it doesn't accidentally grow too big.
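
For reference, here's a condensed sketch of that register/unregister path, using the names from the description above (not the literal PR code, which also coordinates with reg_worker and the completed_tasks queue):

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Task {
    reg_index: AtomicUsize,
}

struct Registry {
    tasks: Vec<Arc<Task>>,
}

impl Registry {
    fn register(&mut self, task: Arc<Task>) {
        // Remember where the task lives so it can be removed in O(1) later.
        task.reg_index.store(self.tasks.len(), Ordering::Release);
        self.tasks.push(task);
    }

    fn unregister(&mut self, task: &Arc<Task>) {
        let index = task.reg_index.load(Ordering::Acquire);
        debug_assert!(Arc::ptr_eq(&self.tasks[index], task));

        self.tasks.swap_remove(index);

        // If another task was swapped into `index`, patch its stored position
        // so a later unregister finds it.
        if let Some(swapped) = self.tasks.get(index) {
            swapped.reg_index.store(index, Ordering::Release);
        }
    }
}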

Benchmarks (master vs drop-tasks):

 name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,032,575       3,082,088            49,513   1.63%   x 0.98
 threadpool::yield_many  10,403,187      10,535,910          132,723   1.28%   x 0.99

We're still not as fast as the master branch. This is a 1-2% slowdown in a synthetic benchmark with tasks that don't do any work, which means the real-world impact should be smaller than that. Furthermore, considering the Registry struct is pretty much the optimal data structure for this use case, I honestly don't think we can do any better. shrug

Member

@tobz tobz left a comment

Just some initial thoughts.

/// ID of the worker that polled this task first.
pub reg_worker: AtomicUsize,

/// Position of this task in the `Vec` inside the `Registry` it was registerd in.
Member

registerd -> registered

Author

Fixed.

let index = task.reg_index.load(Ordering::Acquire);
assert!(Arc::ptr_eq(&self.tasks[index], task));

self.tasks.swap_remove(index);
Member

Could it be faster here to use Slab instead of Vec? That gives you the invariant that the index never changes for a task once it's been registered, and eliminates that code entirely.

Member

Also, I think not having to update reg_index after reassignment means you can avoid needing an atomic?

Author

Well, the AtomicUsize can actually be turned into a Cell, which I've just done.

Also, I've just tried replacing Vec with Slab and it seems to improve things a little bit. Thanks for the advice!
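
For comparison, here's a hypothetical sketch of the Slab-based variant (assuming the slab crate; again not the literal PR code), showing why the swap/patch step goes away and why reg_index can become a plain Cell:

use std::cell::Cell;
use std::sync::Arc;

use slab::Slab; // assumes the `slab` crate

struct Task {
    reg_index: Cell<usize>,
}

struct Registry {
    tasks: Slab<Arc<Task>>,
}

impl Registry {
    fn register(&mut self, task: Arc<Task>) {
        // The key returned by `insert` stays valid until `remove`, so nothing
        // has to be patched when other tasks are added or removed.
        let key = self.tasks.insert(task.clone());
        task.reg_index.set(key);
    }

    fn unregister(&mut self, task: &Arc<Task>) {
        let removed = self.tasks.remove(task.reg_index.get());
        debug_assert!(Arc::ptr_eq(&removed, task));
    }
}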

@ghost
Author

ghost commented Jan 8, 2019

Benchmarks after replacing Vec with Slab:

 name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,029,691       3,075,504            45,813   1.51%   x 0.99
 threadpool::yield_many  10,250,197      10,315,280           65,083   0.63%   x 0.99

The numbers change quite a bit from run to run, but it seems we've got some improvement. This also simplified the code and reduced the number of changed LOC.

Member

@carllerche carllerche left a comment

👍 Looks good. I really like the shutdown trigger strategy for coordinating.

I left some questions inline.

///
/// The worker ID is represented by a `u32` rather than `usize` in order to save some space
/// on 64-bit platforms.
pub reg_worker: Cell<Option<u32>>,
Member

It seems like this should be UnsafeCell? I'm surprised that Cell works. It must be because Task is !Send?

Author

Actually, Task is Send because it can be stolen and moved between worker threads.

What is important here is that Task has Sync and !Sync parts:

  • state is Sync because it may be concurrently written to from multiple threads, e.g. during notification.
  • future is !Sync because it's only ever used by the thread which is currently executing it. reg_worker is !Sync for the same reason.

It seems to me that reg_worker can be a Cell here just fine, since it's never accessed concurrently. Is my understanding correct? Otherwise, I can switch to UnsafeCell, it's no problem.

Member

It can be safely called; it was more of a question of how to write idiomatic unsafe Rust code :)

Author

I'm very interested in the answer to that question, too... this kind of pattern occurs in plenty of places.

unpark: UnsafeCell<Option<BoxUnpark>>,

// Tasks that have been first polled by this worker, but not completed yet.
running_tasks: UnsafeCell<Slab<Arc<Task>>>,
Member

The problem with using a Slab here is that the memory will grow to hold the maximum number of tasks that were concurrently running, but it never shrinks afterwards. Slabs can't easily shrink due to their design.

Do you have thoughts on this?

Author

It's hard for me to say whether that's really a problem in practice or not. The Slab should be pretty compact since Arc<Task> is just a single word. If you're worried about it, I can switch back to Vec and we can shrink the underlying buffer when it gets less than half full.

tokio-threadpool/Cargo.toml (resolved)
/// Called when the task is completed by another worker and was previously registered in this
/// worker.
#[inline]
pub fn completed_task(&self, task: Arc<Task>) {
Member

NIT: The name of this function confused me.

The name should probably be a verb and indicate that this is only called when not on the home worker. Perhaps remotely_complete_task?

Author

I like that one, sure!

running_tasks: UnsafeCell<Slab<Arc<Task>>>,

// Tasks that have been first polled by this worker, but completed by another worker.
completed_tasks: SegQueue<Arc<Task>>,
Member

NIT: maybe name remotely_completed_tasks

@ghost
Author

ghost commented Jan 17, 2019

The latest comments have been addressed.

For posterity's sake, here are benchmark results using a Slab vs a doubly linked list for keeping track of registered tasks:

 name                    slab ns/iter  linkedlist ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,078,609     3,703,809                625,200  20.31%   x 0.83
 threadpool::yield_many  10,252,081    10,359,495               107,414   1.05%   x 0.99

And here's Slab vs Vec (to remove a task from Vec, call swap_remove() + update reg_index for the swapped task):

 name                    slab ns/iter  vec ns/iter  diff ns/iter  diff %  speedup
 threadpool::spawn_many  3,078,609     3,055,488         -23,121  -0.75%   x 1.01
 threadpool::yield_many  10,252,081    10,387,390        135,309   1.32%   x 0.99

@ghost ghost merged commit 4c8f274 into tokio-rs:master Jan 17, 2019
@ghost ghost deleted the drop-tasks branch January 17, 2019 21:12