From 60191f9843dbdecd5c1a9114757a6a8d7e891b8a Mon Sep 17 00:00:00 2001 From: battlmonstr Date: Thu, 5 Sep 2024 10:28:58 +0200 Subject: [PATCH] fix SnapshotSync::wait_for_setup for multiple callers --- silkworm/db/snapshot_sync.cpp | 11 ++++++++--- silkworm/db/snapshot_sync.hpp | 8 ++++++-- .../concurrency/awaitable_condition_variable.hpp | 6 +++--- silkworm/infra/concurrency/awaitable_future.hpp | 10 +++++++++- 4 files changed, 26 insertions(+), 9 deletions(-) diff --git a/silkworm/db/snapshot_sync.cpp b/silkworm/db/snapshot_sync.cpp index 18fa3760be..4e5d77cda4 100644 --- a/silkworm/db/snapshot_sync.cpp +++ b/silkworm/db/snapshot_sync.cpp @@ -117,12 +117,17 @@ Task SnapshotSync::setup() { // Set snapshot repository into snapshot-aware database access db::DataModel::set_snapshot_repository(&repository_); - setup_done_promise_.set_value(true); + std::scoped_lock lock{setup_done_mutex_}; + setup_done_ = true; + setup_done_cond_var_.notify_all(); } Task SnapshotSync::wait_for_setup() { - auto future = setup_done_promise_.get_future(); - co_await future.get_async(); + std::unique_lock lock{setup_done_mutex_}; + if (setup_done_) co_return; + auto waiter = setup_done_cond_var_.waiter(); + lock.unlock(); + co_await waiter(); } Task SnapshotSync::download_and_index_snapshots() { diff --git a/silkworm/db/snapshot_sync.hpp b/silkworm/db/snapshot_sync.hpp index ce05ce540c..cadfbe79bb 100644 --- a/silkworm/db/snapshot_sync.hpp +++ b/silkworm/db/snapshot_sync.hpp @@ -16,9 +16,11 @@ #pragma once +#include #include #include #include +#include #include #include #include @@ -37,7 +39,7 @@ #include #include #include -#include +#include #include namespace silkworm::db { @@ -81,7 +83,9 @@ class SnapshotSync { db::SnapshotMerger snapshot_merger_; std::latch is_stopping_latch_; - concurrency::AwaitablePromise setup_done_promise_; + std::atomic_bool setup_done_; + concurrency::AwaitableConditionVariable setup_done_cond_var_; + std::mutex setup_done_mutex_; }; } // namespace silkworm::db diff --git a/silkworm/infra/concurrency/awaitable_condition_variable.hpp b/silkworm/infra/concurrency/awaitable_condition_variable.hpp index 31d7160c13..32f4015b2a 100644 --- a/silkworm/infra/concurrency/awaitable_condition_variable.hpp +++ b/silkworm/infra/concurrency/awaitable_condition_variable.hpp @@ -32,18 +32,18 @@ A simplified condition variable similar to Rust Tokio Notify: It supports multiple waiters unlike EventNotifier. Synchronize waiter()/notify_all() calls with your producer state to avoid a deadlock. -It happens if the producer readiness is able to be updated right before calling waiter(). +It happens if the producer readiness updates right before calling waiter(). Example: // consumers - std::scoped_lock lock(mutex_); + std::unique_lock lock{mutex_}; if (ready_) co_return; auto waiter = cond_var.waiter(); lock.unlock(); co_await waiter(); // producer - std::scoped_lock lock(mutex_); + std::scoped_lock lock{mutex_}; ready_ = true; cond_var.notify_all(); diff --git a/silkworm/infra/concurrency/awaitable_future.hpp b/silkworm/infra/concurrency/awaitable_future.hpp index 07895153a9..626002d914 100644 --- a/silkworm/infra/concurrency/awaitable_future.hpp +++ b/silkworm/infra/concurrency/awaitable_future.hpp @@ -16,6 +16,7 @@ #pragma once +#include #include #include #include @@ -99,7 +100,13 @@ class AwaitablePromise { set(ptr, std::nullopt); } - AwaitableFuture get_future() { return AwaitableFuture(channel_); } + AwaitableFuture get_future() { + bool expected{false}; + bool was_unsubscribed = subscribed_.compare_exchange_strong(expected, true); + if (!was_unsubscribed) + throw std::runtime_error("AwaitablePromise::get_future can't be called multiple times"); + return AwaitableFuture(channel_); + } class AlreadySatisfiedError : public std::runtime_error { public: @@ -117,6 +124,7 @@ class AwaitablePromise { } std::shared_ptr channel_; + std::atomic_bool subscribed_; }; } // namespace silkworm::concurrency