Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Concurrent deadlock #621

Merged
merged 2 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 55 additions & 14 deletions eventuals/concurrent.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,14 +70,7 @@ struct _Concurrent final {
// that have completed but haven't yet been pruned (see
// 'CreateOrReuseFiber()').
struct TypeErasedFiber {
void Reuse() {
done = false;
// Need to reinitialize the interrupt so that the
// previous eventual that registered with this
// interrupt won't get invoked as a handler!
interrupt.~Interrupt();
new (&interrupt) class Interrupt();
}
virtual bool Reuse() = 0;

virtual ~TypeErasedFiber() = default;

Expand Down Expand Up @@ -178,18 +171,18 @@ struct _Concurrent final {
if (!fibers_) {
fibers_.reset(CreateFiber());
fiber = fibers_.get();
} else if (fibers_->done) {
// Need to release next before we reset so it
// doesn't get deallocated as part of reset.
fibers_.reset(fibers_->next.release());
} else {
fiber = fibers_.get();
CHECK_NOTNULL(fiber);
for (;;) {
if (fiber->done) {
fiber->Reuse();
if (fiber->Reuse()) {
break;
} else if (!fiber->next) {
// TODO(benh): we will create an "infinite"
// number of fibers if none are ever done, we
// should consider adding some max number of
// concurrency and then never create more than
// that.
fiber->next.reset(CreateFiber());
fiber = fiber->next.get();
break;
Expand Down Expand Up @@ -290,6 +283,12 @@ struct _Concurrent final {
Eventual<void>()
.context(std::move(stopped_or_error))
.start([this, fiber](auto& /* stopped_or_error */, auto& k) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();
onelxj marked this conversation as resolved.
Show resolved Hide resolved

fiber->done = true;

fibers_done_ = FibersDone();
Expand All @@ -305,6 +304,12 @@ struct _Concurrent final {
auto& stopped_or_error,
auto& k,
auto&& error) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();

fiber->done = true;

if (!stopped_or_error->has_value()) {
Expand All @@ -322,6 +327,12 @@ struct _Concurrent final {
k.Start(); // Exits the synchronized block!
})
.stop([this, fiber](auto& stopped_or_error, auto& k) {
CHECK_EQ(
&fiber->context.value(),
Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();

fiber->done = true;

if (!stopped_or_error->has_value()) {
Expand Down Expand Up @@ -474,6 +485,34 @@ struct _Concurrent final {
// start for each upstream value.
template <typename E_>
struct Fiber : TypeErasedFiber {
// Attempts to recycle this fiber so that it can run a new eventual.
//
// Returns false, leaving the fiber untouched, when the fiber has not
// finished ('done' is false) or when its scheduler context is still in
// use or blocked. Otherwise resets 'done', 'interrupt', 'k', and
// 'context' and returns true.
bool Reuse() override {
  if (!done) {
    return false;
  }

  CHECK(context.has_value());

  // A fiber is marked 'done' while its context is still in use, so
  // being done is not sufficient on its own: do not tear anything down
  // until the context is neither in use nor blocked.
  if (context->in_use() || context->blocked()) {
    return false;
  }

  done = false;

  // Need to reinitialize the interrupt so that the
  // previous eventual that registered with this
  // interrupt won't get invoked as a handler!
  interrupt.~Interrupt();
  new (&interrupt) class Interrupt();

  // Reset 'k' before 'context': 'k' may hold a borrowed reference to
  // 'context', and resetting 'context' first may lead to a deadlock.
  k.reset();

  context.reset();

  return true;
}

using K = decltype(Build(std::declval<E_>()));
std::optional<K> k;
};
Expand Down Expand Up @@ -523,6 +562,8 @@ struct _Concurrent final {
fiber->context->scheduler()->Submit(
[fiber]() {
CHECK_EQ(&fiber->context.value(), Scheduler::Context::Get().get());
CHECK(fiber->context->in_use())
<< "Context: " << fiber->context->name();
static_cast<Fiber<E>*>(fiber)->k->Register(fiber->interrupt);
static_cast<Fiber<E>*>(fiber)->k->Start();
},
Expand Down
3 changes: 3 additions & 0 deletions eventuals/event-loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,8 @@ void EventLoop::Check() {

context->unblock();

context->use();

stout::borrowed_ref<Context> previous =
Context::Switch(std::move(waiter->context).reference());

Expand All @@ -350,6 +352,7 @@ void EventLoop::Check() {
////////////////////////////////////////////////////

CHECK_EQ(context, Context::Switch(std::move(previous)).get());
context->unuse();
}
} while (waiter != nullptr);
}
Expand Down
6 changes: 6 additions & 0 deletions eventuals/event-loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -1216,9 +1216,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Start(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -1246,9 +1248,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Fail(std::forward<Error>(error));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
// TODO(benh): avoid allocating on heap by storing args in
// pre-allocated buffer based on composing with Errors.
Expand Down Expand Up @@ -1280,9 +1284,11 @@ struct _EventLoopSchedule final {
if (loop()->InEventLoop()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Stop();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
loop()->Submit(
this->Borrow([this]() {
Expand Down
5 changes: 5 additions & 0 deletions eventuals/grpc/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,11 @@ class ServerReader {
};
}

EVENTUALS_GRPC_LOG(1)
<< "Reading requests for call (" << context_ << ")"
<< " for host = " << context_->host()
<< " and path = " << context_->method();

context_->stream()->Read(&data.buffer, &callback);
});
}
Expand Down
10 changes: 10 additions & 0 deletions eventuals/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,19 @@ class DefaultScheduler final : public Scheduler {
EVENTUALS_LOG(1)
<< "'" << context.name() << "' preempted '" << previous->name() << "'";

context.use();

callback();

CHECK_EQ(&context, Context::Switch(std::move(previous)).get());

context.unuse();
EVENTUALS_LOG(1)
<< "'"
<< Context::Get()->name()
<< "' switched back with '"
<< context.name()
<< "'";
}

void Clone(Context& context) override {
Expand Down
19 changes: 19 additions & 0 deletions eventuals/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ class Scheduler {
return blocked_;
}

// Returns true while the 'in_use_' counter is non-zero, i.e. there are
// calls to 'use()' that have not yet been matched by 'unuse()'.
bool in_use() const {
  const auto outstanding = in_use_.load();
  return outstanding != 0;
}

const std::string& name() const {
return name_;
}
Expand All @@ -114,8 +118,10 @@ class Scheduler {
void Continue(F&& f) {
if (scheduler()->Continuable(*this)) {
auto previous = Switch(Borrow());
use();
f();
Switch(std::move(previous));
unuse();
} else {
scheduler()->Submit(std::forward<F>(f), *this);
}
Expand All @@ -125,13 +131,24 @@ class Scheduler {
void Continue(F&& f, G&& g) {
if (scheduler()->Continuable(*this)) {
auto previous = Switch(Borrow());
use();
f();
Switch(std::move(previous));
unuse();
} else {
scheduler()->Submit(g(), *this);
}
}

// Records one more user of this context by atomically incrementing
// 'in_use_'. Callers in this file pair each call with 'unuse()'.
void use() {
  in_use_ += 1;
}

// Releases one use of this context by atomically decrementing
// 'in_use_'.
//
// CHECK-fails if the counter was not positive, i.e. there was no
// outstanding 'use()' to match this call.
void unuse() {
  // Validate the value returned by the decrement itself rather than a
  // separate 'load()': with two independent atomic operations another
  // thread's 'unuse()' could slip in between the check and the
  // decrement and drive the counter negative without being detected.
  const int previous = in_use_.fetch_sub(1);
  CHECK(previous > 0);
}

// For schedulers that need to store arbitrary data.
void* data = nullptr;

Expand All @@ -144,6 +161,8 @@ class Scheduler {

Scheduler* scheduler_ = nullptr;

std::atomic<int> in_use_ = 0;

// This is the most common set of variables used to create contexts.
bool blocked_ = false;

Expand Down
4 changes: 4 additions & 0 deletions eventuals/static-thread-pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ StaticThreadPool::StaticThreadPool()

context->unblock();

context->use();

stout::borrowed_ref<Context> previous =
Context::Switch(std::move(waiter->context).reference());

Expand All @@ -92,6 +94,8 @@ StaticThreadPool::StaticThreadPool()
////////////////////////////////////////////////////

CHECK_EQ(context, Context::Switch(std::move(previous)).get());

context->unuse();
}
} while (!shutdown_.load());
});
Expand Down
13 changes: 13 additions & 0 deletions eventuals/static-thread-pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,9 +242,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Start(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -290,9 +292,12 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Fail(std::forward<Error>(error));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
;
} else {
// TODO(benh): avoid allocating on heap by storing args in
// pre-allocated buffer based on composing with Errors.
Expand Down Expand Up @@ -341,9 +346,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Stop();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down Expand Up @@ -378,9 +385,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Begin(*CHECK_NOTNULL(stream_));
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down Expand Up @@ -417,9 +426,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Body(std::forward<Args>(args)...);
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
if constexpr (!std::is_void_v<Arg_>) {
arg_.emplace(std::forward<Args>(args)...);
Expand Down Expand Up @@ -464,9 +475,11 @@ struct _StaticThreadPoolSchedule final {
if (StaticThreadPool::member && StaticThreadPool::cpu == pinned.cpu()) {
Adapt();
auto previous = Scheduler::Context::Switch(context_->Borrow());
context_->use();
adapted_->Ended();
previous = Scheduler::Context::Switch(std::move(previous));
CHECK_EQ(previous.get(), context_.get());
context_->unuse();
} else {
EVENTUALS_LOG(1)
<< "Schedule submitting '" << context_->name() << "'";
Expand Down
14 changes: 13 additions & 1 deletion test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,18 @@ cc_library(
],
)

# Library target that exposes the "event-loop-test.h" header. It is
# publicly visible and depends on "//eventuals" and googletest.
cc_library(
name = "event-loop-test",
hdrs = [
"event-loop-test.h",
],
visibility = ["//visibility:public"],
deps = [
"//eventuals",
"@com_github_google_googletest//:gtest",
],
)

cc_library(
name = "http-mock-server",
testonly = True,
Expand All @@ -52,7 +64,6 @@ cc_test(
"control-loop.cc",
"dns-resolver.cc",
"do-all.cc",
"event-loop-test.h",
"eventual.cc",
"executor.cc",
"expected.cc",
Expand Down Expand Up @@ -97,6 +108,7 @@ cc_test(
":generate-test-task-name",
":http-mock-server",
":promisify-for-test",
":event-loop-test",
"//eventuals",
"//test/concurrent",
"@com_github_google_googletest//:gtest_main",
Expand Down
Loading
Loading