Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
fix: avoid deadlock waiting for Session allocation (#1170)
Browse files Browse the repository at this point in the history
* fix: avoid deadlock waiting for Session allocation

Track the number of threads waiting for a `Session` to ensure we always
notify properly; the prior implementation had race conditions that
could lead to a thread waiting forever even when a `Session` was
available (see #1000 for details).

I also removed the workaround for this bug in the benchmark. With
the workaround removed, I ran the benchmark using the flags below,
both with and without the `SessionPool` fix. Without the fix it
hangs reliably, with the fix it never hangs.

```
.build/google/cloud/spanner/benchmarks/single_row_throughput_benchmark \
  --project=${GOOGLE_CLOUD_PROJECT} \
  --instance=${GOOGLE_CLOUD_CPP_SPANNER_INSTANCE} \
  --table-size=1000 \
  --minimum-clients=1 --maximum-clients=1 \
  --minimum-threads=1000 --maximum-threads=1000 \
  --iteration-duration=2 --samples=100 \
  --experiment=read
```

fixes #1000

* Factor "Wait" logic out to a private member function.
  • Loading branch information
mr-salty authored Jan 8, 2020
1 parent 48494ad commit e33b4f8
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,10 +213,8 @@ void FillTable(Config const& config, cloud_spanner::Database const& database,
}

int ClientCount(Config const& config,
google::cloud::internal::DefaultPRNG& generator,
int thread_count) {
// TODO(#1000) - avoid deadlocks with more than 100 threads per client
auto min_clients = (std::max)(thread_count / 100 + 1, config.minimum_clients);
google::cloud::internal::DefaultPRNG& generator) {
auto min_clients = config.minimum_clients;
auto const max_clients = config.maximum_clients;
if (min_clients <= max_clients) {
return min_clients;
Expand Down Expand Up @@ -249,7 +247,7 @@ class InsertOrUpdateExperiment : public Experiment {

for (int i = 0; i != config.samples; ++i) {
auto const thread_count = thread_count_gen(generator);
auto const client_count = ClientCount(config, generator, thread_count);
auto const client_count = ClientCount(config, generator);
std::vector<cloud_spanner::Client> iteration_clients(
clients.begin(), clients.begin() + client_count);
RunIteration(config, iteration_clients, thread_count, sink, generator);
Expand Down Expand Up @@ -354,7 +352,7 @@ class ReadExperiment : public Experiment {

for (int i = 0; i != config.samples; ++i) {
auto const thread_count = thread_count_gen(generator_);
auto const client_count = ClientCount(config, generator_, thread_count);
auto const client_count = ClientCount(config, generator_);
std::vector<cloud_spanner::Client> iteration_clients(
clients.begin(), clients.begin() + client_count);
RunIteration(config, iteration_clients, thread_count, sink);
Expand Down Expand Up @@ -466,7 +464,7 @@ class UpdateDmlExperiment : public Experiment {

for (int i = 0; i != config.samples; ++i) {
auto const thread_count = thread_count_gen(generator);
auto const client_count = ClientCount(config, generator, thread_count);
auto const client_count = ClientCount(config, generator);
std::vector<cloud_spanner::Client> iteration_clients(
clients.begin(), clients.begin() + client_count);
RunIteration(config, iteration_clients, thread_count, sink, generator);
Expand Down Expand Up @@ -581,7 +579,7 @@ class SelectExperiment : public Experiment {

for (int i = 0; i != config.samples; ++i) {
auto const thread_count = thread_count_gen(generator_);
auto const client_count = ClientCount(config, generator_, thread_count);
auto const client_count = ClientCount(config, generator_);
std::vector<cloud_spanner::Client> iteration_clients(
clients.begin(), clients.begin() + client_count);
RunIteration(config, iteration_clients, thread_count, sink);
Expand Down
9 changes: 3 additions & 6 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
if (options_.action_on_exhaustion() == ActionOnExhaustion::FAIL) {
return Status(StatusCode::kResourceExhausted, "session pool exhausted");
}
cond_.wait(lk, [this] {
Wait(lk, [this] {
return !sessions_.empty() || total_sessions_ < max_pool_size_;
});
continue;
Expand All @@ -123,8 +123,7 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
// simulaneous calls if additional sessions are needed. We can also use the
// number of waiters in the `sessions_to_create` calculation below.
if (create_in_progress_) {
cond_.wait(lk,
[this] { return !sessions_.empty() || !create_in_progress_; });
Wait(lk, [this] { return !sessions_.empty() || !create_in_progress_; });
continue;
}

Expand Down Expand Up @@ -166,10 +165,8 @@ void SessionPool::Release(std::unique_ptr<Session> session) {
--total_sessions_;
return;
}
bool notify = sessions_.empty();
sessions_.push_back(std::move(session));
// If sessions_ was empty, wake up someone who was waiting for a session.
if (notify) {
if (num_waiting_for_session_ > 0) {
lk.unlock();
cond_.notify_one();
}
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,15 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
// Release session back to the pool.
void Release(std::unique_ptr<Session> session);

// Called when a thread needs to wait for a `Session` to become available.
// @p specifies the condition to wait for.
template <typename Predicate>
void Wait(std::unique_lock<std::mutex>& lk, Predicate&& p) {
++num_waiting_for_session_;
cond_.wait(lk, std::forward<Predicate>(p));
--num_waiting_for_session_;
}

Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
std::map<std::string, std::string> const& labels,
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)
Expand All @@ -120,6 +129,7 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
std::vector<std::unique_ptr<Session>> sessions_; // GUARDED_BY(mu_)
int total_sessions_ = 0; // GUARDED_BY(mu_)
bool create_in_progress_ = false; // GUARDED_BY(mu_)
int num_waiting_for_session_ = 0; // GUARDED_BY(mu_)

// `channels_` is guaranteed to be non-empty and will not be resized after
// the constructor runs (so the iterators are guaranteed to always be valid).
Expand Down

0 comments on commit e33b4f8

Please sign in to comment.