Skip to content

Commit

Permalink
make ProcessInfoCache thread safe
Browse files Browse the repository at this point in the history
Summary:
SemiFuture::wait is setting the callback on a future. https://www.internalfb.com/code/fbsource/[461b4373492f97727d71c8cc9ef563816a491e77]/fbcode/folly/futures/Future-inl.h?lines=2323%2C2329-2332

There may only be one callback per future, and that callback should only be set
once.

This future is saved inside an object that is stored in a cache that
is shared across threads: https://www.internalfb.com/code/fbsource/[805d0ff899c1c8524c10b3aac52a0be9ca262123]/fbcode/eden/common/utils/ProcessInfoCache.cpp?lines=22%2C39%2C157%2C165

That means multiple threads could call wait on the same future concurrently.

This breaks some SemiFuture invariants and causes crashes such as
P1009632719 and P1517801272. Both of these crashes came from Muir's machine,
since he reproduces this crash frequently; as far as we recall, other users
have hit these crashes as well.

Since we want multiple threads to take action when a promise is set,
SharedPromise fits our needs better: a SharedPromise can hand out multiple
futures, so multiple threads can wait for the promise to be fulfilled
concurrently (each on a separate future).

Reviewed By: MichaelCuevas

Differential Revision: D61479252

fbshipit-source-id: e23192e4090275648d363ce1638ad4f33fd0b3d0
  • Loading branch information
Katie Mancini authored and facebook-github-bot committed Aug 22, 2024
1 parent cffa5b3 commit b165bb6
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 29 deletions.
55 changes: 38 additions & 17 deletions eden/common/utils/ProcessInfoCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include <folly/MapUtil.h>
#include <folly/container/EvictingCacheMap.h>
#include <folly/futures/SharedPromise.h>
#include <folly/logging/xlog.h>
#include <folly/system/ThreadName.h>

Expand All @@ -22,10 +23,11 @@ namespace detail {
class ProcessInfoNode {
public:
ProcessInfoNode(
folly::SemiFuture<ProcessInfo> info,
std::shared_ptr<folly::SharedPromise<ProcessInfo>> info,
std::chrono::steady_clock::time_point d,
ProcessInfoCache::Clock& clock)
: info_{std::move(info)},
quickAccessToInfo_{info_->getSemiFuture()},
lastAccess_{d.time_since_epoch()},
clock_{clock} {}

Expand All @@ -36,7 +38,21 @@ class ProcessInfoNode {
lastAccess_.store(now.time_since_epoch(), std::memory_order_release);
}

folly::SemiFuture<ProcessInfo> info_;
/**
* If the caller would like to wait for the process info to be available,
* we need to get a new future out of info_ and wait on that future. A future
* can only be waited on once because waiting is like adding a callback
* and each future can only have one callback.
*/
std::shared_ptr<folly::SharedPromise<ProcessInfo>> info_;

/**
* If the caller does not care to wait for the data to be ready, we can
* attempt to retrieve the value from this future. No callbacks can be added
* to this future. We can only attempt to check for an available value and
* read a ready value out.
*/
folly::SemiFuture<ProcessInfo> quickAccessToInfo_;
mutable std::atomic<std::chrono::steady_clock::duration> lastAccess_;
ProcessInfoCache::Clock& clock_;
};
Expand Down Expand Up @@ -107,19 +123,22 @@ const ProcessInfo* ProcessInfoHandle::get_optional() const {
XCHECK(node_) << "attempting to use moved-from ProcessInfoHandle";
auto now = node_->clock_.now();
node_->recordAccess(now);
return node_->info_.isReady() ? &node_->info_.value() : nullptr;
return node_->quickAccessToInfo_.isReady()
? &node_->quickAccessToInfo_.value()
: nullptr;
}

ProcessInfo ProcessInfoHandle::get() const {
  // Blocks the calling thread until the worker thread has fulfilled the
  // process-info promise, then returns the resulting ProcessInfo.
  XCHECK(node_) << "attempting to use moved-from ProcessInfoHandle";
  auto now = node_->clock_.now();
  node_->recordAccess(now);
  // Pull a fresh future out of the SharedPromise for this wait. A future may
  // only be waited on once (waiting installs its single callback), so every
  // concurrent get() must wait on its own future rather than a shared one.
  auto future = node_->info_->getSemiFuture();
  future.wait();
  return future.value();
}

const folly::SemiFuture<ProcessInfo>& ProcessInfoHandle::future() const {
  // Returns the non-waitable view of the pending info. Callers may poll
  // isReady()/value() on it, but must not wait() (no callbacks may be added).
  XCHECK(node_) << "attempting to use moved-from ProcessInfoHandle";
  return node_->quickAccessToInfo_;
}

ProcessInfoCache::ProcessInfoCache(
Expand Down Expand Up @@ -158,10 +177,10 @@ ProcessInfoHandle ProcessInfoCache::lookup(pid_t pid) {
return ProcessInfoHandle{*nodep};
}

auto [p, f] = folly::makePromiseContract<ProcessInfo>();
state->lookupQueue.emplace_back(pid, std::move(p));
auto p = std::make_shared<folly::SharedPromise<ProcessInfo>>();
state->lookupQueue.emplace_back(pid, p);
auto node =
std::make_shared<detail::ProcessInfoNode>(std::move(f), now, clock_);
std::make_shared<detail::ProcessInfoNode>(std::move(p), now, clock_);
state->infos.emplace(pid, node);
threadLocalCache_.put(pid, node);
state.unlock();
Expand Down Expand Up @@ -223,10 +242,10 @@ void ProcessInfoCache::add(pid_t pid) {
return std::nullopt;
},
[&](auto& wlock) -> folly::Unit {
auto [p, f] = folly::makePromiseContract<ProcessInfo>();
wlock->lookupQueue.emplace_back(pid, std::move(p));
auto p = std::make_shared<folly::SharedPromise<ProcessInfo>>();
wlock->lookupQueue.emplace_back(pid, p);
auto node = std::make_shared<detail::ProcessInfoNode>(
std::move(f), now, clock_);
std::move(p), now, clock_);
wlock->infos.emplace(pid, node);
threadLocalCache_.put(pid, std::move(node));

Expand Down Expand Up @@ -279,7 +298,9 @@ void ProcessInfoCache::clearExpired(

void ProcessInfoCache::workerThread() {
// Double-buffered work queues.
std::vector<std::pair<pid_t, folly::Promise<ProcessInfo>>> lookupQueue;
std::vector<
std::pair<pid_t, std::shared_ptr<folly::SharedPromise<ProcessInfo>>>>
lookupQueue;
std::vector<folly::Promise<std::map<pid_t, ProcessInfo>>> getAllQueue;

// Allows periodic flushing of the expired infos without quadratic-time
Expand Down Expand Up @@ -333,7 +354,7 @@ void ProcessInfoCache::workerThread() {
// As described in ProcessInfoCache::add() above, it is critical this work
// be done outside of the state lock.
for (auto& [pid, p] : lookupQueue) {
p.setWith([this, pid_2 = pid] { return readInfo_(pid_2); });
p->setWith([this, pid_2 = pid] { return readInfo_(pid_2); });
}

auto now = clock_.now();
Expand All @@ -358,7 +379,7 @@ void ProcessInfoCache::workerThread() {
auto state = state_.wlock();
clearExpired(now, *state);
for (const auto& [pid, info] : state->infos) {
auto& fut = info->info_;
auto& fut = info->quickAccessToInfo_;
if (fut.isReady() && fut.hasValue()) {
allProcessInfos[pid] = fut.value();
}
Expand All @@ -375,8 +396,8 @@ void ProcessInfoCache::workerThread() {
std::optional<ProcessInfo> ProcessInfoCache::getProcessInfo(pid_t pid) {
auto state = state_.rlock();
if (auto* nodep = folly::get_ptr(state->infos, pid)) {
if ((*nodep)->info_.isReady()) {
return (*nodep)->info_.value();
if ((*nodep)->quickAccessToInfo_.isReady()) {
return (*nodep)->quickAccessToInfo_.value();
}
}
return std::nullopt;
Expand Down
9 changes: 8 additions & 1 deletion eden/common/utils/ProcessInfoCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@

#include "eden/common/utils/ProcessInfo.h"

namespace folly {
template <class T>
class SharedPromise;
}

namespace facebook::eden {

class FaultInjector;
Expand Down Expand Up @@ -166,7 +171,9 @@ class ProcessInfoCache {
// The following queues are intentionally unbounded. add() cannot block.
// TODO: We could set a high limit on the length of the queue and drop
// requests if necessary.
std::vector<std::pair<pid_t, folly::Promise<ProcessInfo>>> lookupQueue;
std::vector<
std::pair<pid_t, std::shared_ptr<folly::SharedPromise<ProcessInfo>>>>
lookupQueue;
std::vector<folly::Promise<std::map<pid_t, ProcessInfo>>> getAllQueue;
};

Expand Down
18 changes: 7 additions & 11 deletions eden/common/utils/test/ProcessInfoCacheTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,32 +184,28 @@ TEST(ProcessInfoCache, multipleLookups) {
faultInjector.injectBlock("ProcessInfoCache::workerThread", ".*");

auto info1 = processInfoCache.lookup(getpid());
auto info2 = processInfoCache.lookup(getpid());
ASSERT_TRUE(
faultInjector.waitUntilBlocked("ProcessInfoCache::workerThread", 1s));

// auto info2 = processInfoCache.lookup(getpid());

// Assumption: these should share the same node since they are cached and
// worker could not have made any progress yet.
// ASSERT_EQ(info1.node_.get(), info2.node_.get());
ASSERT_EQ(info1.node_.get(), info2.node_.get());

// currently if we run both threads it cause a crash sometimes. because both
// threads are calling wait() at the same time which can cause multiple
// callbacks to be added to the same future core.
auto thread1 = std::thread{[info = std::move(info1)] {
folly::setThreadName("info1");
EXPECT_NE("", info.get().name);
}};

// auto thread2 = std::thread{[info = std::move(info2)] {
// folly::setThreadName("info2");
// EXPECT_NE("", info.get().name);
// }};
auto thread2 = std::thread{[info = std::move(info2)] {
folly::setThreadName("info2");
EXPECT_NE("", info.get().name);
}};

faultInjector.removeFault("ProcessInfoCache::workerThread", ".*");
faultInjector.unblock("ProcessInfoCache::workerThread", ".*");

thread1.join();
// thread2.join();
thread2.join();
}
} // namespace facebook::eden

0 comments on commit b165bb6

Please sign in to comment.