Skip to content

Commit

Permalink
Add utility to check whether the execution is in main thread. (#14457)
Browse files Browse the repository at this point in the history
Risk Level: Low (refactor)
Testing: new unit tests
Docs Changes: n/a
Release Notes: n/a

Signed-off-by: chaoqin-li1123 <chaoqin@uchicago.edu>
  • Loading branch information
chaoqin-li1123 authored Jan 7, 2021
1 parent 28e8d77 commit 40c44e5
Show file tree
Hide file tree
Showing 7 changed files with 48 additions and 13 deletions.
1 change: 1 addition & 0 deletions source/common/common/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,7 @@ envoy_cc_library(
external_deps = ["abseil_synchronization"],
deps = envoy_cc_platform_dep("thread_impl_lib") + [
":non_copyable",
"//source/common/singleton:threadsafe_singleton",
],
)

Expand Down
15 changes: 15 additions & 0 deletions source/common/common/thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "envoy/thread/thread.h"

#include "common/common/non_copyable.h"
#include "common/singleton/threadsafe_singleton.h"

#include "absl/synchronization/mutex.h"

Expand Down Expand Up @@ -168,5 +169,19 @@ class AtomicPtr : private AtomicPtrArray<T, 1, alloc_mode> {
T* get(const MakeObject& make_object) { return BaseClass::get(0, make_object); }
};

struct MainThread {
using MainThreadSingleton = InjectableSingleton<MainThread>;
bool inMainThread() const { return main_thread_id_ == std::this_thread::get_id(); }
static void init() { MainThreadSingleton::initialize(new MainThread()); }
static void clear() {
free(MainThreadSingleton::getExisting());
MainThreadSingleton::clear();
}
static bool isMainThread() { return MainThreadSingleton::get().inMainThread(); }

private:
std::thread::id main_thread_id_{std::this_thread::get_id()};
};

} // namespace Thread
} // namespace Envoy
17 changes: 9 additions & 8 deletions source/common/thread_local/thread_local_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@ namespace ThreadLocal {
thread_local InstanceImpl::ThreadLocalData InstanceImpl::thread_local_data_;

InstanceImpl::~InstanceImpl() {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(shutdown_);
thread_local_data_.data_.clear();
Thread::MainThread::clear();
}

SlotPtr InstanceImpl::allocateSlot() {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!shutdown_);

if (free_slot_indexes_.empty()) {
Expand Down Expand Up @@ -91,7 +92,7 @@ void InstanceImpl::SlotImpl::runOnAllThreads(const UpdateCb& cb) {
}

void InstanceImpl::SlotImpl::set(InitializeCb cb) {
ASSERT(std::this_thread::get_id() == parent_.main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!parent_.shutdown_);

for (Event::Dispatcher& dispatcher : parent_.registered_threads_) {
Expand All @@ -105,7 +106,7 @@ void InstanceImpl::SlotImpl::set(InitializeCb cb) {
}

void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_thread) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!shutdown_);

if (main_thread) {
Expand All @@ -119,7 +120,7 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
}

void InstanceImpl::removeSlot(uint32_t slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());

// When shutting down, we do not post slot removals to other threads. This is because the other
// threads have already shut down and the dispatcher is no longer alive. There is also no reason
Expand All @@ -146,7 +147,7 @@ void InstanceImpl::removeSlot(uint32_t slot) {
}

void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!shutdown_);

for (Event::Dispatcher& dispatcher : registered_threads_) {
Expand All @@ -158,7 +159,7 @@ void InstanceImpl::runOnAllThreads(Event::PostCb cb) {
}

void InstanceImpl::runOnAllThreads(Event::PostCb cb, Event::PostCb all_threads_complete_cb) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!shutdown_);
// Handle main thread first so that when the last worker thread wins, we could just call the
// all_threads_complete_cb method. Parallelism of main thread execution is being traded off
Expand All @@ -185,7 +186,7 @@ void InstanceImpl::setThreadLocal(uint32_t index, ThreadLocalObjectSharedPtr obj
}

void InstanceImpl::shutdownGlobalThreading() {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
ASSERT(!shutdown_);
shutdown_ = true;
}
Expand Down
3 changes: 1 addition & 2 deletions source/common/thread_local/thread_local_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ namespace ThreadLocal {
*/
class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, public Instance {
public:
InstanceImpl() : main_thread_id_(std::this_thread::get_id()) {}
InstanceImpl() { Thread::MainThread::init(); }
~InstanceImpl() override;

// ThreadLocal::Instance
Expand Down Expand Up @@ -81,7 +81,6 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
// A list of index of freed slots.
std::list<uint32_t> free_slot_indexes_;
std::list<std::reference_wrapper<Event::Dispatcher>> registered_threads_;
std::thread::id main_thread_id_;
Event::Dispatcher* main_thread_dispatcher_{};
std::atomic<bool> shutdown_{};

Expand Down
4 changes: 2 additions & 2 deletions source/server/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ InstanceImpl::InstanceImpl(
: nullptr),
grpc_context_(store.symbolTable()), http_context_(store.symbolTable()),
router_context_(store.symbolTable()), process_context_(std::move(process_context)),
main_thread_id_(std::this_thread::get_id()), hooks_(hooks), server_contexts_(*this) {
hooks_(hooks), server_contexts_(*this) {
try {
if (!options.logPath().empty()) {
try {
Expand Down Expand Up @@ -819,7 +819,7 @@ InstanceImpl::registerCallback(Stage stage, StageCallbackWithCompletion callback
}

void InstanceImpl::notifyCallbacksForStage(Stage stage, Event::PostCb completion_cb) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(Thread::MainThread::isMainThread());
const auto it = stage_callbacks_.find(stage);
if (it != stage_callbacks_.end()) {
for (const StageCallback& callback : it->second) {
Expand Down
1 change: 0 additions & 1 deletion source/server/server.h
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,6 @@ class InstanceImpl final : Logger::Loggable<Logger::Id::main>,
Router::ContextImpl router_context_;
std::unique_ptr<ProcessContext> process_context_;
std::unique_ptr<Memory::HeapShrinker> heap_shrinker_;
const std::thread::id main_thread_id_;
// initialization_time is a histogram for tracking the initialization time across hot restarts
// whenever we have support for histogram merge across hot restarts.
Stats::TimespanPtr initialization_timer_;
Expand Down
20 changes: 20 additions & 0 deletions test/common/thread_local/thread_local_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,26 @@ using testing::ReturnPointee;
namespace Envoy {
namespace ThreadLocal {

TEST(MainThreadVerificationTest, All) {
// Main thread singleton is initialized in the constructor of tls instance. Call to main thread
// verification will fail before that.
EXPECT_DEATH(Thread::MainThread::isMainThread(),
"InjectableSingleton used prior to initialization");
{
EXPECT_DEATH(Thread::MainThread::isMainThread(),
"InjectableSingleton used prior to initialization");
InstanceImpl tls;
// Call to main thread verification should succeed after tls instance has been initialized.
ASSERT(Thread::MainThread::isMainThread());
tls.shutdownGlobalThreading();
tls.shutdownThread();
}
// Main thread singleton is cleared in the destructor of tls instance. Call to main thread
// verification will fail after that.
EXPECT_DEATH(Thread::MainThread::isMainThread(),
"InjectableSingleton used prior to initialization");
}

class TestThreadLocalObject : public ThreadLocalObject {
public:
~TestThreadLocalObject() override { onDestroy(); }
Expand Down

0 comments on commit 40c44e5

Please sign in to comment.