diff --git a/eden/common/utils/ProcessInfoCache.cpp b/eden/common/utils/ProcessInfoCache.cpp
index 10d8ee1..2b25227 100644
--- a/eden/common/utils/ProcessInfoCache.cpp
+++ b/eden/common/utils/ProcessInfoCache.cpp
@@ -9,6 +9,7 @@
 
 #include
 #include
+#include <folly/futures/SharedPromise.h>
 #include
 #include
 
@@ -22,10 +23,11 @@ namespace detail {
 class ProcessInfoNode {
  public:
   ProcessInfoNode(
-      folly::SemiFuture<ProcessInfo> info,
+      std::shared_ptr<folly::SharedPromise<ProcessInfo>> info,
       std::chrono::steady_clock::time_point d,
       ProcessInfoCache::Clock& clock)
       : info_{std::move(info)},
+        quickAccessToInfo_{info_->getSemiFuture()},
         lastAccess_{d.time_since_epoch()},
         clock_{clock} {}
 
@@ -36,7 +38,21 @@ class ProcessInfoNode {
     lastAccess_.store(now.time_since_epoch(), std::memory_order_release);
   }
 
-  folly::SemiFuture<ProcessInfo> info_;
+  /**
+   * If the caller would like to wait for the process info to be available,
+   * we need to get a new future out of info_ and wait on that future. A future
+   * can only be waited on once because waiting is like adding a callback
+   * and each future can only have one callback.
+   */
+  std::shared_ptr<folly::SharedPromise<ProcessInfo>> info_;
+
+  /**
+   * If the caller does not care to wait for the data to be ready, we can
+   * attempt to retrieve the value from this future. No callbacks can be added
+   * to this future. We can only attempt to check for an available value and
+   * read a ready value out.
+   */
+  folly::SemiFuture<ProcessInfo> quickAccessToInfo_;
   mutable std::atomic<std::chrono::steady_clock::duration> lastAccess_;
   ProcessInfoCache::Clock& clock_;
 };
@@ -107,19 +123,22 @@ const ProcessInfo* ProcessInfoHandle::get_optional() const {
   XCHECK(node_) << "attempting to use moved-from ProcessInfoHandle";
   auto now = node_->clock_.now();
   node_->recordAccess(now);
-  return node_->info_.isReady() ? &node_->info_.value() : nullptr;
+  return node_->quickAccessToInfo_.isReady()
+      ? &node_->quickAccessToInfo_.value()
+      : nullptr;
 }
 
 ProcessInfo ProcessInfoHandle::get() const {
   XCHECK(node_) << "attempting to use moved-from ProcessInfoHandle";
   auto now = node_->clock_.now();
   node_->recordAccess(now);
-  node_->info_.wait();
-  return node_->info_.value();
+  auto future = node_->info_->getSemiFuture();
+  future.wait();
+  return future.value();
 }
 
 const folly::SemiFuture<ProcessInfo>& ProcessInfoHandle::future() const {
-  return node_->info_;
+  return node_->quickAccessToInfo_;
 }
 
 ProcessInfoCache::ProcessInfoCache(
@@ -158,10 +177,10 @@ ProcessInfoHandle ProcessInfoCache::lookup(pid_t pid) {
     return ProcessInfoHandle{*nodep};
   }
 
-  auto [p, f] = folly::makePromiseContract<ProcessInfo>();
-  state->lookupQueue.emplace_back(pid, std::move(p));
+  auto p = std::make_shared<folly::SharedPromise<ProcessInfo>>();
+  state->lookupQueue.emplace_back(pid, p);
   auto node =
-      std::make_shared<detail::ProcessInfoNode>(std::move(f), now, clock_);
+      std::make_shared<detail::ProcessInfoNode>(std::move(p), now, clock_);
   state->infos.emplace(pid, node);
   threadLocalCache_.put(pid, node);
   state.unlock();
@@ -223,10 +242,10 @@ void ProcessInfoCache::add(pid_t pid) {
         return std::nullopt;
       },
       [&](auto& wlock) -> folly::Unit {
-        auto [p, f] = folly::makePromiseContract<ProcessInfo>();
-        wlock->lookupQueue.emplace_back(pid, std::move(p));
+        auto p = std::make_shared<folly::SharedPromise<ProcessInfo>>();
+        wlock->lookupQueue.emplace_back(pid, p);
         auto node = std::make_shared<detail::ProcessInfoNode>(
-            std::move(f), now, clock_);
+            std::move(p), now, clock_);
         wlock->infos.emplace(pid, node);
         threadLocalCache_.put(pid, std::move(node));
 
@@ -279,7 +298,9 @@ void ProcessInfoCache::clearExpired(
 
 void ProcessInfoCache::workerThread() {
   // Double-buffered work queues.
-  std::vector<std::pair<pid_t, folly::Promise<ProcessInfo>>> lookupQueue;
+  std::vector<
+      std::pair<pid_t, std::shared_ptr<folly::SharedPromise<ProcessInfo>>>>
+      lookupQueue;
   std::vector<folly::Promise<std::map<pid_t, ProcessInfo>>> getAllQueue;
 
   // Allows periodic flushing of the expired infos without quadratic-time
@@ -333,7 +354,7 @@ void ProcessInfoCache::workerThread() {
     // As described in ProcessInfoCache::add() above, it is critical this work
     // be done outside of the state lock.
     for (auto& [pid, p] : lookupQueue) {
-      p.setWith([this, pid_2 = pid] { return readInfo_(pid_2); });
+      p->setWith([this, pid_2 = pid] { return readInfo_(pid_2); });
    }
 
     auto now = clock_.now();
@@ -358,7 +379,7 @@ void ProcessInfoCache::workerThread() {
      auto state = state_.wlock();
      clearExpired(now, *state);
      for (const auto& [pid, info] : state->infos) {
-        auto& fut = info->info_;
+        auto& fut = info->quickAccessToInfo_;
        if (fut.isReady() && fut.hasValue()) {
          allProcessInfos[pid] = fut.value();
        }
@@ -375,8 +396,8 @@
 std::optional<ProcessInfo> ProcessInfoCache::getProcessInfo(pid_t pid) {
   auto state = state_.rlock();
   if (auto* nodep = folly::get_ptr(state->infos, pid)) {
-    if ((*nodep)->info_.isReady()) {
-      return (*nodep)->info_.value();
+    if ((*nodep)->quickAccessToInfo_.isReady()) {
+      return (*nodep)->quickAccessToInfo_.value();
     }
   }
   return std::nullopt;
diff --git a/eden/common/utils/ProcessInfoCache.h b/eden/common/utils/ProcessInfoCache.h
index 7b55ba6..142ae86 100644
--- a/eden/common/utils/ProcessInfoCache.h
+++ b/eden/common/utils/ProcessInfoCache.h
@@ -22,6 +22,11 @@
 
 #include "eden/common/utils/ProcessInfo.h"
 
+namespace folly {
+template <typename T>
+class SharedPromise;
+}
+
 namespace facebook::eden {
 
 class FaultInjector;
@@ -166,7 +171,9 @@ class ProcessInfoCache {
    // The following queues are intentionally unbounded. add() cannot block.
    // TODO: We could set a high limit on the length of the queue and drop
    // requests if necessary.
-    std::vector<std::pair<pid_t, folly::Promise<ProcessInfo>>> lookupQueue;
+    std::vector<
+        std::pair<pid_t, std::shared_ptr<folly::SharedPromise<ProcessInfo>>>>
+        lookupQueue;
    std::vector<folly::Promise<std::map<pid_t, ProcessInfo>>> getAllQueue;
  };
 
diff --git a/eden/common/utils/test/ProcessInfoCacheTest.cpp b/eden/common/utils/test/ProcessInfoCacheTest.cpp
index 46d8084..1e8f9ad 100644
--- a/eden/common/utils/test/ProcessInfoCacheTest.cpp
+++ b/eden/common/utils/test/ProcessInfoCacheTest.cpp
@@ -184,32 +184,28 @@ TEST(ProcessInfoCache, multipleLookups) {
   faultInjector.injectBlock("ProcessInfoCache::workerThread", ".*");
 
   auto info1 = processInfoCache.lookup(getpid());
+  auto info2 = processInfoCache.lookup(getpid());
   ASSERT_TRUE(
       faultInjector.waitUntilBlocked("ProcessInfoCache::workerThread", 1s));
-  // auto info2 = processInfoCache.lookup(getpid());
-
   // Assumption: these should share the same node since they are cached and
   // worker could not have made any progress yet.
-  // ASSERT_EQ(info1.node_.get(), info2.node_.get());
+  ASSERT_EQ(info1.node_.get(), info2.node_.get());
 
-  // currently if we run both threads it cause a crash sometimes. because both
-  // threads are calling wait() at the same time which can cause multiple
-  // callbacks to be added to the same future core.
   auto thread1 = std::thread{[info = std::move(info1)] {
     folly::setThreadName("info1");
     EXPECT_NE("", info.get().name);
   }};
 
-  // auto thread2 = std::thread{[info = std::move(info2)] {
-  //   folly::setThreadName("info2");
-  //   EXPECT_NE("", info.get().name);
-  // }};
+  auto thread2 = std::thread{[info = std::move(info2)] {
+    folly::setThreadName("info2");
+    EXPECT_NE("", info.get().name);
+  }};
 
   faultInjector.removeFault("ProcessInfoCache::workerThread", ".*");
   faultInjector.unblock("ProcessInfoCache::workerThread", ".*");
 
   thread1.join();
-  // thread2.join();
+  thread2.join();
 }
 
 } // namespace facebook::eden
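
The core of the change is that folly::SharedPromise can hand out any number of independent SemiFutures via getSemiFuture(), while a single SemiFuture supports only one waiter, because waiting attaches the future's one callback. Below is a minimal standalone sketch of that behavior, not part of the patch; the value 42 and the two waiter threads are arbitrary illustration.

// Sketch only: each getSemiFuture() call returns a distinct future backed by
// the same SharedPromise, so multiple threads can block concurrently, each on
// its own future.
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>

#include <cassert>
#include <thread>

int main() {
  folly::SharedPromise<int> promise;

  // Two independent futures backed by the same shared promise.
  auto f1 = promise.getSemiFuture();
  auto f2 = promise.getSemiFuture();

  // Two threads wait concurrently, each on its own future.
  std::thread t1{[&] { f1.wait(); assert(f1.value() == 42); }};
  std::thread t2{[&] { f2.wait(); assert(f2.value() == 42); }};

  // Fulfilling the shared promise makes every outstanding future ready.
  promise.setValue(42);

  t1.join();
  t2.join();
  return 0;
}

Because every waiter blocks on its own future, the scenario exercised by the re-enabled multipleLookups test, two threads calling get() on handles that share one node, no longer races to attach a second callback to the same future core.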
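The node now has two access paths: a blocking read that pulls a fresh SemiFuture out of the SharedPromise for every caller (get()), and a non-blocking peek that only polls the single cached quickAccessToInfo_ future (get_optional(), future(), getProcessInfo()). The following is a simplified, hypothetical model of that pattern; SharedValueSlot, publish(), and getOptional() are illustrative names, not EdenFS code.

// Simplified model of the blocking-read / non-blocking-peek split used by the
// patched ProcessInfoNode. Assumes folly is available.
#include <folly/futures/Future.h>
#include <folly/futures/SharedPromise.h>

#include <memory>
#include <optional>
#include <utility>

template <typename T>
class SharedValueSlot {
 public:
  SharedValueSlot()
      : promise_{std::make_shared<folly::SharedPromise<T>>()},
        quickAccess_{promise_->getSemiFuture()} {}

  // Producer side: publish the value once; all waiters become ready.
  void publish(T value) {
    promise_->setValue(std::move(value));
  }

  // Blocking read: safe to call from many threads because each call waits on
  // its own SemiFuture obtained from the shared promise.
  T get() const {
    auto future = promise_->getSemiFuture();
    future.wait();
    return future.value();
  }

  // Non-blocking peek: never attaches a callback, only checks readiness.
  std::optional<T> getOptional() const {
    if (quickAccess_.isReady()) {
      return quickAccess_.value();
    }
    return std::nullopt;
  }

 private:
  std::shared_ptr<folly::SharedPromise<T>> promise_;
  folly::SemiFuture<T> quickAccess_;
};

The cached future is created exactly once in the constructor and is only ever polled with isReady()/value(); waiting on it would consume its single callback slot, which is why the blocking path always asks the promise for a new future instead.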