threadpool: drop incomplete tasks on shutdown #722
Conversation
Thanks! Just found out about this PR, which fixes an issue I reported on Gitter yesterday, where a Timer-based future would make the Runtime retain a UdpSocket. This resulted in the socket not being properly closed. But this PR seems to be breaking a simple blocking hyper HTTP request that uses
It fails with a
@appaquet Can you post your
I tried running it with the following:

[package]
name = "scratch-rs"
version = "0.1.0"
[dependencies]
tokio = { path = "../tokio" }
hyper = "*"
# [patch.crates-io]
# tokio = { path = "../tokio" }
# tokio-async-await = { path = "../tokio/tokio-async-await" }
# tokio-codec = { path = "../tokio/tokio-codec" }
# tokio-current-thread = { path = "../tokio/tokio-current-thread" }
# tokio-executor = { path = "../tokio/tokio-executor" }
# tokio-fs = { path = "../tokio/tokio-fs" }
# tokio-io = { path = "../tokio/tokio-io" }
# tokio-reactor = { path = "../tokio/tokio-reactor" }
# tokio-signal = { path = "../tokio/tokio-signal" }
# tokio-tcp = { path = "../tokio/tokio-tcp" }
# tokio-threadpool = { path = "../tokio/tokio-threadpool" }
# tokio-timer = { path = "../tokio/tokio-timer" }
# tokio-tls = { path = "../tokio/tokio-tls" }
# tokio-udp = { path = "../tokio/tokio-udp" }
# tokio-uds = { path = "../tokio/tokio-uds" }

This prints the same error you got:
But if I uncomment those lines in
The same thing happens on this branch and the master branch.
Oh, I had just pointed Tokio to your branch, but didn't catch that Hyper was pulling its own Tokio version until I checked the dependency tree. Going with the patching approach, my example does work. So, false alarm, but 👍 because it fixed my previous UdpSocket issue!
@stjepang Still working on reviewing this (will be through it soon). Did you compare the benchmarks before/after?
Looks good 👍 I left some thoughts inline and I'd be interested in seeing how this impacts benchmarks.
Also, I noticed that this PR does not use `home_worker` when the task gets scheduled. Have you considered doing this? It might not matter?
tokio-threadpool/src/worker/entry.rs (outdated)
@@ -22,6 +24,9 @@ pub(crate) struct WorkerEntry {
    // comments on that type.
    pub state: CachePadded<AtomicUsize>,

    // The set of tasks registered in this worker entry.
    owned_tasks: Mutex<HashSet<*const Task>>,
It seems that the mutex isn't really required; it is just here to avoid additional unsafe code (which is a good goal). In practice, the mutex should not be contended, but I wonder what the perf impact of it would be on platforms that do not have good mutex implementations.
Did this change impact benchmarks that spawn many tasks?
Another option would be to avoid the mutex in `owned_tasks`, then have an `owned_tasks_shutdown` field with the mutex. Before the worker terminates, it moves the HashSet over. Or, a channel between the worker and the shutdown trigger is used to send the HashSet.
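For illustration, a rough sketch of the channel variant described above (types heavily simplified; plain task IDs stand in for `*const Task`, and none of these names exist in the actual code):

```rust
use std::collections::HashSet;
use std::sync::mpsc;

// Hypothetical sketch: the worker keeps its set without any lock and only
// hands the whole thing over when it terminates.
struct WorkerEntry {
    // Touched exclusively by the worker thread, so no Mutex is needed.
    owned_tasks: HashSet<u64>,
    // Used exactly once, right before the worker shuts down.
    shutdown_tx: mpsc::Sender<HashSet<u64>>,
}

impl WorkerEntry {
    fn terminate(self) {
        // Ship the remaining tasks to the shutdown trigger, which can then
        // abort whatever is still incomplete.
        let _ = self.shutdown_tx.send(self.owned_tasks);
    }
}
```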
The perf impact is minimal (see the comment below). How do you feel about switching to `parking_lot`, though? It has a very efficient implementation on all platforms.
Well, avoiding the mutex entirely is doable. Do you think the strategy I mentioned above isn't worth it or is too complicated?
It is doable, but I think the complexity-to-gains ratio doesn't justify it. :) Maybe once/if it becomes a problem in the future...
Yep. Some slowdown can be observed due to this PR, but it's very small:

name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
threadpool::spawn_many  2,910,813       2,949,012      38,199        1.31%   x 0.99
threadpool::yield_many  11,669,013      11,791,126     122,113       1.05%   x 0.99
@stjepang I'd rather try to avoid changes that impact perf (even a bit). What if you try switching to the FNV hasher? I wonder what the root of the performance hit is.
Switching to This is on the
This PR:
This PR with
This PR with
Here's an idea on how to improve performance. Perhaps we should have two sets:
Every 100th (or so) insertion into
When a worker completes a task it owns, it doesn't have to touch
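For illustration, one way the two-set idea could look as a sketch only (all names are invented, and a `Mutex<Vec<_>>` stands in for whatever concurrent structure the shared set would actually use):

```rust
use std::collections::HashSet;
use std::sync::{Arc, Mutex};

struct WorkerEntry {
    // Touched only by this worker thread, so no lock is needed.
    owned_tasks: HashSet<u64>,
    // Filled in by *other* workers when they finish a task owned here.
    remotely_completed: Arc<Mutex<Vec<u64>>>,
    // Counts insertions so the sweep only runs every Nth registration.
    insertions: usize,
}

impl WorkerEntry {
    fn register(&mut self, task_id: u64) {
        self.owned_tasks.insert(task_id);
        self.insertions += 1;
        // Amortize the cost: sweep the shared set only occasionally.
        if self.insertions % 100 == 0 {
            self.drain_remote();
        }
    }

    fn complete_locally(&mut self, task_id: u64) {
        // The common case never touches the shared set.
        self.owned_tasks.remove(&task_id);
    }

    fn drain_remote(&mut self) {
        // Collect first so the lock is held as briefly as possible.
        let completed: Vec<u64> = self.remotely_completed.lock().unwrap().drain(..).collect();
        for task_id in completed {
            self.owned_tasks.remove(&task_id);
        }
    }
}
```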
@stjepang I ran into another issue using this PR's code, where I ran into a lack of file descriptors (the limit is low by default on MacOS at 256). The issue is not coming from this PR specifically, but since it was supposed to fix it, I'm posting my findings here. If you run this gist on your branch on MacOS, with
The problem can easily be fixed by making the notifier retain a weak reference to the
I could open a separate PR once this one is merged, or we could simply fix it in this PR. Up to you guys.
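For illustration, a minimal sketch of the weak-reference idea (hypothetical types, not the actual tokio-threadpool `Notifier`):

```rust
use std::sync::{Arc, Weak};

// Stand-in for the pool internals and the resources they own.
struct PoolInner {
    name: &'static str,
}

struct Notifier {
    // Holding a Weak instead of an Arc means the notifier alone can no
    // longer keep the pool (and the resources it owns) alive.
    inner: Weak<PoolInner>,
}

impl Notifier {
    fn notify(&self) {
        // Upgrade only for the duration of the call; if the pool is already
        // gone, the notification is simply dropped.
        if let Some(inner) = self.inner.upgrade() {
            println!("notifying pool {}", inner.name);
        }
    }
}

fn main() {
    let pool = Arc::new(PoolInner { name: "demo" });
    let notifier = Notifier { inner: Arc::downgrade(&pool) };
    notifier.notify();
    drop(pool);
    notifier.notify(); // no-op: the pool has been dropped
}
```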
@appaquet Thanks for diagnosing the issue and the write-up, this is very useful! We've intentionally replaced the
I think the right solution here is dropping the
I wonder if we should drop the
@stjepang I saw #702 right after posting! Your changes also fix my issue, so it's good. As for the
All right, we're dropping
@carllerche Can you review?
Here are the newest benchmarks. Each one has two lines. The first line is the original
Since the latest nightly has switched from jemalloc to the system allocator, I had to manually set the allocator to jemalloc.
Previous commit (
Last commit (
The last commit is using the optimization idea from this comment. I've run out of ideas on how to make this faster, but keep in mind that task bookkeeping has some cost, so there won't be a free lunch.
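For reference, a minimal sketch of pinning the allocator back to jemalloc in a benchmark crate (assuming the `jemallocator` crate; this is not part of the PR itself):

```rust
// Cargo.toml of the benchmark crate: jemallocator = "0.1"
use jemallocator::Jemalloc;

// Override the default (system) allocator with jemalloc for the whole
// binary, so results stay comparable with older nightlies that used jemalloc.
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;

fn main() {
    // Allocations below now go through jemalloc.
    let v: Vec<u64> = (0..1024).collect();
    println!("allocated {} elements", v.len());
}
```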
There is a lot going on in this PR, which makes it hard to really review and iterate on. Would it be terribly hard to split it into a PR that does the rename from
`inner` is a fitting name for variables of a type named `Inner`, but in other cases I find it confusing: sometimes `inner` refers to a `Pool`, sometimes to a `Sender`. I renamed a bunch of variables named `inner` to be more descriptive. This PR is the first step in an effort to split #722 (comment) into multiple PRs.
@carllerche There we go: this PR now deals with the set of owned tasks only.
    // This is only used to know when to signal `shutdown_task` once the
    // shutdown process has completed.
    pub num_workers: AtomicUsize,
This is just an unused field from the times before `ShutdownTrigger`, so I'm removing it.
@carllerche Just a ping for review. :)
The logic seems correct... I'm a little wary of this patch as it adds non-trivial amounts of work in the hot path. I wonder if we can step back for a second and think about whether there is another way to handle this, perhaps with larger changes in design that we could tackle at a later point as we try to clean up / simplify the pool. What does Go do to handle this case?
Go simply uses the garbage collector to eventually free all tasks. I've found a design that doesn't use
@stjepang what refactoring are you waiting on?
All right, I did some refactoring and now this PR is ready for review. Originally we used a
In order to add a task to the registry, we push it into the
Each
When shutting down the pool, we go through each worker, drain

Benchmarks (

name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
threadpool::spawn_many  3,032,575       3,082,088      49,513        1.63%   x 0.98
threadpool::yield_many  10,403,187      10,535,910     132,723       1.28%   x 0.99

We're still not as fast as the master branch. This is a 1-2% slowdown in a synthetic benchmark with tasks that don't do any work, which means the real-world impact should be smaller than that. Furthermore, considering the
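Roughly, the bookkeeping described above could look like the following sketch, using a plain `Vec` with `swap_remove` as in this revision of the PR; names like `reg_index` follow the diff, everything else is invented for illustration:

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

struct Task {
    // Index of this task inside its home worker's `tasks` Vec.
    reg_index: AtomicUsize,
}

struct WorkerEntry {
    // Tasks first polled by this worker and not completed yet.
    tasks: Vec<Arc<Task>>,
}

impl WorkerEntry {
    // Called the first time a task is polled on this worker.
    fn register(&mut self, task: Arc<Task>) {
        task.reg_index.store(self.tasks.len(), Ordering::Release);
        self.tasks.push(task);
    }

    // Called when the task completes.
    fn unregister(&mut self, task: &Arc<Task>) {
        let index = task.reg_index.load(Ordering::Acquire);
        assert!(Arc::ptr_eq(&self.tasks[index], task));
        self.tasks.swap_remove(index);

        // swap_remove moved the previously-last task into `index`,
        // so its stored position has to be patched up.
        if let Some(moved) = self.tasks.get(index) {
            moved.reg_index.store(index, Ordering::Release);
        }
    }

    // Called during pool shutdown: drop whatever is still registered,
    // which drops the futures of incomplete tasks.
    fn abort_all(&mut self) {
        self.tasks.clear();
    }
}
```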
Just some initial thoughts.
tokio-threadpool/src/task/mod.rs (outdated)
    /// ID of the worker that polled this task first.
    pub reg_worker: AtomicUsize,

    /// Position of this task in the `Vec` inside the `Registry` it was registerd in.
`registerd` -> `registered`
Fixed.
    let index = task.reg_index.load(Ordering::Acquire);
    assert!(Arc::ptr_eq(&self.tasks[index], task));

    self.tasks.swap_remove(index);
Could it be faster here to use `Slab` instead of `Vec`? That gives you the invariant of the index never changing for a task once it's been registered, and eliminates that code entirely.
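A minimal sketch of that property with the `slab` crate (the `Task` type here is a simplified stand-in, not the real one):

```rust
use std::sync::Arc;

use slab::Slab; // slab = "0.4"

// Hypothetical, simplified stand-in for the task type.
struct Task {
    name: String,
}

fn main() {
    // With a Slab, the key returned by `insert` stays valid until `remove`,
    // so nothing else has to be re-indexed when another task is removed.
    let mut tasks: Slab<Arc<Task>> = Slab::new();

    let a = tasks.insert(Arc::new(Task { name: "a".into() }));
    let b = tasks.insert(Arc::new(Task { name: "b".into() }));

    // Removing `a` does not move `b`; its key is unchanged.
    tasks.remove(a);
    assert_eq!(tasks[b].name, "b");

    // On shutdown, everything still registered can be drained and dropped.
    for task in tasks.drain() {
        drop(task);
    }
}
```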
Also, I think not having to update `reg_index` after reassignment also means you can avoid needing an atomic?
Well, the `AtomicUsize` can actually be turned into a `Cell`, which I've just done.
Also, I've just tried replacing `Vec` with `Slab` and it seems to improve things a little bit. Thanks for the advice!
Benchmarks after replacing `Vec` with `Slab`:

name                    before ns/iter  after ns/iter  diff ns/iter  diff %  speedup
threadpool::spawn_many  3,029,691       3,075,504      45,813        1.51%   x 0.99
threadpool::yield_many  10,250,197      10,315,280     65,083        0.63%   x 0.99

The numbers change quite a bit from run to run, but it seems we've got some improvement. This also simplified the code and reduced the number of changed LOC.
👍 Looks good. I really like the shutdown trigger strategy for coordinating.
I left some questions inline.
    ///
    /// The worker ID is represented by a `u32` rather than `usize` in order to save some space
    /// on 64-bit platforms.
    pub reg_worker: Cell<Option<u32>>,
It seems like this should be `UnsafeCell`? I'm surprised that `Cell` works. It must be because `Task` is `!Send`?
Actually, `Task` is `Send` because it can be stolen and moved between worker threads.
What is important here is that `Task` has `Sync` and `!Sync` parts:

- `state` is `Sync` because it may be concurrently written to from multiple threads, e.g. during notification.
- `future` is `!Sync` because it's only ever used by the thread which is currently executing it.
- `reg_worker` is `!Sync` for the same reason.

It seems to me that `reg_worker` can be a `Cell` here just fine, since it's never accessed concurrently. Is my understanding correct? Otherwise, I can switch to `UnsafeCell`, it's no problem.
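A minimal sketch of the split being described (field names borrowed from the discussion; everything else is simplified and hypothetical, not the real `Task`):

```rust
use std::cell::{Cell, UnsafeCell};
use std::sync::atomic::AtomicUsize;

// Simplified stand-in for the boxed future a task owns.
type BoxFuture = Box<dyn FnMut() + Send>;

struct Task {
    // Sync: written concurrently from several threads, e.g. during notification.
    state: AtomicUsize,
    // !Sync: only the thread currently running the task touches the future.
    future: UnsafeCell<Option<BoxFuture>>,
    // !Sync: likewise only touched by the thread currently running the task.
    reg_worker: Cell<Option<u32>>,
}

// Tasks are moved (stolen) between worker threads and shared behind `Arc`,
// so both markers are asserted manually; the safety argument is exactly the
// invariant above: the !Sync fields are only ever accessed by the one thread
// that is currently executing the task.
unsafe impl Send for Task {}
unsafe impl Sync for Task {}
```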
It can be safely called; it was more of a question of how to write idiomatic unsafe Rust code :)
I'm very interested in the answer to that question, too... this kind of pattern occurs in plenty of places.
    unpark: UnsafeCell<Option<BoxUnpark>>,

    // Tasks that have been first polled by this worker, but not completed yet.
    running_tasks: UnsafeCell<Slab<Arc<Task>>>,
The problem with using a `Slab` here is that the memory will grow to hold the max number of tasks that were concurrently running, but never shrink after. Slabs can't easily shrink due to their design.
Do you have thoughts on this?
It's hard for me to say whether that's really a problem in practice or not. This `Slab` should be pretty compact since `Arc<Task>` is just a single word. If you're worried about it, I can switch back to `Vec` and we can shrink the underlying buffer when it gets less than half full.
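A rough sketch of the shrink-on-half-empty idea with a `Vec` (simplified; indices into the `Vec` would still need fixing up after `swap_remove`, which is omitted here):

```rust
use std::sync::Arc;

struct Task;

struct WorkerEntry {
    tasks: Vec<Arc<Task>>,
}

impl WorkerEntry {
    fn unregister(&mut self, index: usize) {
        // O(1) removal; the last element is moved into `index` (patching up
        // the moved task's stored index is omitted in this sketch).
        self.tasks.swap_remove(index);

        // Unlike a Slab, a Vec can hand memory back: shrink once the buffer
        // is less than half full.
        if self.tasks.len() * 2 < self.tasks.capacity() {
            self.tasks.shrink_to_fit();
        }
    }
}
```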
tokio-threadpool/src/worker/entry.rs (outdated)
    /// Called when the task is completed by another worker and was previously registered in this
    /// worker.
    #[inline]
    pub fn completed_task(&self, task: Arc<Task>) {
NIT: The name of this function confused me.
The name should probably be a verb and indicate that this is only called when not on the home worker. Perhaps `remotely_complete_task`?
I like that one, sure!
tokio-threadpool/src/worker/entry.rs (outdated)
    running_tasks: UnsafeCell<Slab<Arc<Task>>>,

    // Tasks that have been first polled by this worker, but completed by another worker.
    completed_tasks: SegQueue<Arc<Task>>,
NIT: maybe name it `remotely_completed_tasks`.
The latest comments have been addressed. For posterity's sake, here are benchmark results when we use
And here's
Motivation
When the thread pool shuts down, futures that have been polled at least once but not completed yet are simply leaked. We should drop them instead.
Solution
Multiple changes are introduced:
- Tasks are assigned a home worker the first time they are polled.
- Each worker contains a set of tasks (`Arc<Task>`) it is home to. When a task is assigned a home worker, it is registered in that worker's set of tasks. When the task is completed, it is unregistered from the set.
- When the thread pool shuts down and after all worker threads stop, the remaining tasks in workers' sets are aborted, i.e. they are switched to the `Aborted` state and their `Future`s are dropped.
- The thread pool shutdown process is refactored to make it more robust. We don't track the number of active threads manually anymore. Instead, there's an `Arc<ShutdownTrigger>` that aborts remaining tasks and completes the `Shutdown` future once it gets destroyed (when all `Worker`s and the `ThreadPool` get dropped, because they're the only ones to contain strong references to the `ShutdownTrigger`).
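For illustration, a minimal sketch of the drop-based trigger idea (hypothetical names; a `std::sync::mpsc` channel stands in for the real `Shutdown` future):

```rust
use std::sync::mpsc;
use std::sync::Arc;

// When the last Arc<ShutdownTrigger> is dropped (i.e. every Worker and the
// ThreadPool handle are gone), Drop runs: remaining tasks would be aborted
// and the waiting side is signalled.
struct ShutdownTrigger {
    done_tx: mpsc::Sender<()>,
}

impl Drop for ShutdownTrigger {
    fn drop(&mut self) {
        // Abort leftover tasks here (omitted), then signal completion.
        let _ = self.done_tx.send(());
    }
}

fn main() {
    let (done_tx, done_rx) = mpsc::channel();
    let trigger = Arc::new(ShutdownTrigger { done_tx });

    // Pretend these are the strong references held by workers and the pool handle.
    let worker_ref = trigger.clone();
    let pool_ref = trigger;

    drop(worker_ref);
    drop(pool_ref); // last strong reference: Drop fires, shutdown is signalled

    done_rx.recv().expect("shutdown signal");
    println!("pool fully shut down");
}
```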
).Closes #424
Closes #428