Skip to content

Commit

Permalink
fix SnapshotSync::wait_for_setup for multiple callers
Browse files Browse the repository at this point in the history
  • Loading branch information
battlmonstr committed Sep 5, 2024
1 parent 832d4e0 commit 60191f9
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 9 deletions.
11 changes: 8 additions & 3 deletions silkworm/db/snapshot_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,17 @@ Task<void> SnapshotSync::setup() {
// Set snapshot repository into snapshot-aware database access
db::DataModel::set_snapshot_repository(&repository_);

setup_done_promise_.set_value(true);
std::scoped_lock lock{setup_done_mutex_};
setup_done_ = true;
setup_done_cond_var_.notify_all();
}

Task<void> SnapshotSync::wait_for_setup() {
auto future = setup_done_promise_.get_future();
co_await future.get_async();
std::unique_lock lock{setup_done_mutex_};
if (setup_done_) co_return;
auto waiter = setup_done_cond_var_.waiter();
lock.unlock();
co_await waiter();
}

Task<void> SnapshotSync::download_and_index_snapshots() {
Expand Down
8 changes: 6 additions & 2 deletions silkworm/db/snapshot_sync.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

#pragma once

#include <atomic>
#include <filesystem>
#include <functional>
#include <latch>
#include <mutex>
#include <string>
#include <thread>
#include <vector>
Expand All @@ -37,7 +39,7 @@
#include <silkworm/db/snapshots/snapshot_repository.hpp>
#include <silkworm/db/snapshots/snapshot_settings.hpp>
#include <silkworm/db/stage_scheduler.hpp>
#include <silkworm/infra/concurrency/awaitable_future.hpp>
#include <silkworm/infra/concurrency/awaitable_condition_variable.hpp>
#include <silkworm/infra/concurrency/stoppable.hpp>

namespace silkworm::db {
Expand Down Expand Up @@ -81,7 +83,9 @@ class SnapshotSync {
db::SnapshotMerger snapshot_merger_;

std::latch is_stopping_latch_;
concurrency::AwaitablePromise<bool> setup_done_promise_;
std::atomic_bool setup_done_;
concurrency::AwaitableConditionVariable setup_done_cond_var_;
std::mutex setup_done_mutex_;
};

} // namespace silkworm::db
6 changes: 3 additions & 3 deletions silkworm/infra/concurrency/awaitable_condition_variable.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,18 +32,18 @@ A simplified condition variable similar to Rust Tokio Notify:
It supports multiple waiters unlike EventNotifier.
Synchronize waiter()/notify_all() calls with your producer state to avoid a deadlock.
It happens if the producer readiness is able to be updated right before calling waiter().
It happens if the producer readiness updates right before calling waiter().
Example:
// consumers
std::scoped_lock lock(mutex_);
std::unique_lock lock{mutex_};
if (ready_) co_return;
auto waiter = cond_var.waiter();
lock.unlock();
co_await waiter();
// producer
std::scoped_lock lock(mutex_);
std::scoped_lock lock{mutex_};
ready_ = true;
cond_var.notify_all();
Expand Down
10 changes: 9 additions & 1 deletion silkworm/infra/concurrency/awaitable_future.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include <atomic>
#include <optional>
#include <stdexcept>
#include <utility>
Expand Down Expand Up @@ -99,7 +100,13 @@ class AwaitablePromise {
set(ptr, std::nullopt);
}

AwaitableFuture<T> get_future() { return AwaitableFuture<T>(channel_); }
AwaitableFuture<T> get_future() {
bool expected{false};
bool was_unsubscribed = subscribed_.compare_exchange_strong(expected, true);
if (!was_unsubscribed)
throw std::runtime_error("AwaitablePromise::get_future can't be called multiple times");
return AwaitableFuture<T>(channel_);
}

class AlreadySatisfiedError : public std::runtime_error {
public:
Expand All @@ -117,6 +124,7 @@ class AwaitablePromise {
}

std::shared_ptr<AsyncChannel> channel_;
std::atomic_bool subscribed_;
};

} // namespace silkworm::concurrency

0 comments on commit 60191f9

Please sign in to comment.