refactor out the schedule-cleanup-callback complexity by firing a post callback on destruction of the ref-count shared_ptr

Signed-off-by: Xin Zhuang <stevenzzz@google.com>
stevenzzzz committed Sep 10, 2019
1 parent 0b5292c commit 76fbf31
Showing 2 changed files with 50 additions and 55 deletions.
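
The heart of the change is visible in the Bookkeeper constructor below: instead of a separate SlotHolder plus an explicit cleanup-rescheduling loop, the slot's lifetime is tied to a shared_ptr whose only purpose is its custom deleter, which posts the cleanup to the main thread once the last in-flight callback lets go. A minimal standalone sketch of that pattern, with FakeDispatcher as an assumed stand-in whose post() just runs the callback inline rather than Envoy's real dispatcher:

```cpp
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>

// Stand-in for Envoy's main-thread dispatcher; here post() just runs the callback inline.
struct FakeDispatcher {
  void post(std::function<void()> cb) { cb(); }
};

int main() {
  FakeDispatcher main_dispatcher;

  // The pointee is unused; only the custom deleter matters. Once the last copy
  // of ref_count goes away, the deleter fires and posts the slot cleanup back
  // to the main thread.
  std::shared_ptr<uint32_t> ref_count(
      nullptr, [&main_dispatcher](uint32_t*) {
        main_dispatcher.post([] { std::cout << "cleanup runs on main thread\n"; });
      });

  // A cross-thread callback captures a copy, pinning the deleter until it is done.
  std::function<void()> in_flight = [ref_count] { std::cout << "worker callback\n"; };

  ref_count.reset();    // The owner (the Bookkeeper) is destroyed first...
  in_flight();          // ...the in-flight callback still runs safely...
  in_flight = nullptr;  // ...and dropping the last copy triggers the cleanup post.
}
```

Because every cross-thread callback captures a copy of ref_count, the deleter, and with it the cleanup post, cannot fire while any of those callbacks is still alive.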
81 changes: 42 additions & 39 deletions source/common/thread_local/thread_local_impl.cc
@@ -28,7 +28,7 @@ SlotPtr InstanceImpl::allocateSlot() {
if (free_slot_indexes_.empty()) {
std::unique_ptr<SlotImpl> slot(new SlotImpl(*this, slots_.size()));
auto wrapper = std::make_unique<Bookkeeper>(*this, std::move(slot));
slots_.push_back(&wrapper->slot());
slots_.push_back(wrapper->slot_.get());
return wrapper;
}
const uint32_t idx = free_slot_indexes_.front();
@@ -57,42 +57,47 @@ ThreadLocalObjectSharedPtr InstanceImpl::SlotImpl::get() {
}

InstanceImpl::Bookkeeper::Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot)
: parent_(parent), holder_(std::make_unique<SlotHolder>(std::move(slot))) {}
: parent_(parent), slot_(std::move(slot)),
ref_count_(/*not used.*/ nullptr,
[slot = slot_.get(), &parent = this->parent_](uint32_t* /* not used */) {
// On destruction, post a cleanup callback on main thread, this could happen on
// any thread.
parent.scheduleCleanup(slot);
}) {}

ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot().get(); }
ThreadLocalObjectSharedPtr InstanceImpl::Bookkeeper::get() { return slot_->get(); }

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb, Event::PostCb complete_cb) {
slot().runOnAllThreads(
[cb, ref_count = holder_->ref_count_](ThreadLocalObjectSharedPtr previous) {
slot_->runOnAllThreads(
[cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
},
complete_cb);
}

void InstanceImpl::Bookkeeper::runOnAllThreads(const UpdateCb& cb) {
slot().runOnAllThreads(
[cb, ref_count = holder_->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
});
slot_->runOnAllThreads([cb, ref_count = this->ref_count_](ThreadLocalObjectSharedPtr previous) {
return cb(std::move(previous));
});
}

bool InstanceImpl::Bookkeeper::currentThreadRegistered() {
return slot().currentThreadRegistered();
return slot_->currentThreadRegistered();
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb) {
// Use holder_.ref_count_ to bookkeep how many on-the-fly callback are out there.
slot().runOnAllThreads([cb, ref_count = holder_->ref_count_]() { cb(); });
// Use ref_count_ to bookkeep how many on-the-fly callback are out there.
slot_->runOnAllThreads([cb, ref_count = this->ref_count_]() { cb(); });
}

void InstanceImpl::Bookkeeper::runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback) {
// Use holder_.ref_count_ to bookkeep how many on-the-fly callback are out there.
slot().runOnAllThreads([cb, main_callback, ref_count = holder_->ref_count_]() { cb(); },
// Use ref_count_ to bookkeep how many on-the-fly callback are out there.
slot_->runOnAllThreads([cb, main_callback, ref_count = this->ref_count_]() { cb(); },
main_callback);
}

void InstanceImpl::Bookkeeper::set(InitializeCb cb) {
slot().set([cb, ref_count = holder_->ref_count_](Event::Dispatcher& dispatcher)
slot_->set([cb, ref_count = this->ref_count_](Event::Dispatcher& dispatcher)
-> ThreadLocalObjectSharedPtr { return cb(dispatcher); });
}

@@ -110,37 +115,35 @@ void InstanceImpl::registerThread(Event::Dispatcher& dispatcher, bool main_threa
}
}

// Deletes a Slot if it's recycleable(no on-the-fly callbacks points to it), otherwise puts it into
// a deferred delete queue, and schedules a cleanup task to collect the Slot once it's recycleable.
void InstanceImpl::recycle(std::unique_ptr<SlotHolder>&& holder) {
if (holder->isRecycleable()) {
holder.reset();
return;
}
deferred_deletes_.emplace_back(std::move(holder));
scheduleCleanup();
// Puts the slot into a deferred delete container, the slot will be destructed when its out-going
// callback reference count goes to 0.
void InstanceImpl::recycle(std::unique_ptr<SlotImpl>&& slot) {
ASSERT(std::this_thread::get_id() == main_thread_id_);
ASSERT(slot != nullptr);
auto* slot_addr = slot.get();
deferred_deletes_.insert({slot_addr, std::move(slot)});
}

// Cleans up the deferred deletes queue.
// Removes all the items with ref count 1, and reschedule another cleanup task if there are left
// un-recycleable items.
// This way ensures that, unless server shuts down, an enqueued item will be
// deleted eventually.
void InstanceImpl::scheduleCleanup() {
ASSERT(main_thread_dispatcher_ != nullptr);
// Called by the Bookkeeper ref_count destructor, the SlotImpl in the deferred deletes map can be
// destructed now.
void InstanceImpl::scheduleCleanup(SlotImpl* slot) {
if (shutdown_) {
// If server is shutting down, do nothing here.
// The destruction of Bookkeeper has already transferred the SlotImpl to the deferred_deletes_
// queue. No matter if this method is called from a Worker thread, the SlotImpl will be
// destructed on main thread when InstanceImpl destructs.
return;
}
if (deferred_deletes_.empty()) {
if (std::this_thread::get_id() == main_thread_id_) {
// If called from main thread, save a callback.
ASSERT(deferred_deletes_.contains(slot));
deferred_deletes_.erase(slot);
return;
}
main_thread_dispatcher_->post([this]() {
deferred_deletes_.remove_if(
[](std::unique_ptr<SlotHolder>& holder) -> bool { return holder->isRecycleable(); });
if (!deferred_deletes_.empty()) {
// Reschedule another cleanup task if there are still non-recyclable slots.
scheduleCleanup();
}
main_thread_dispatcher_->post([slot, this]() {
ASSERT(deferred_deletes_.contains(slot));
// The slot is guaranteed to be put into the deferred_deletes_ map by Bookkeeper destructor.
deferred_deletes_.erase(slot);
});
}

24 changes: 8 additions & 16 deletions source/common/thread_local/thread_local_impl.h
@@ -9,6 +9,7 @@

#include "common/common/logger.h"
#include "common/common/non_copyable.h"
#include "absl/container/flat_hash_map.h"

namespace Envoy {
namespace ThreadLocal {
@@ -48,22 +49,11 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
const uint64_t index_;
};

// A helper class for holding a SlotImpl and its bookkeeping shared_ptr which counts the number of
// update callbacks on-the-fly.
struct SlotHolder {
SlotHolder(std::unique_ptr<SlotImpl>&& slot) : slot_(std::move(slot)) {}
bool isRecycleable() { return ref_count_.use_count() == 1; }

const std::unique_ptr<SlotImpl> slot_;
const std::shared_ptr<int> ref_count_{new int(0)};
};

// A Wrapper of SlotImpl which on destruction returns the SlotImpl to the deferred delete queue
// (detaches it).
struct Bookkeeper : public Slot {
Bookkeeper(InstanceImpl& parent, std::unique_ptr<SlotImpl>&& slot);
~Bookkeeper() override { parent_.recycle(std::move(holder_)); }
SlotImpl& slot() { return *(holder_->slot_); }
~Bookkeeper() override { parent_.recycle(std::move(slot_)); }

// ThreadLocal::Slot
ThreadLocalObjectSharedPtr get() override;
@@ -75,17 +65,19 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub
void set(InitializeCb cb) override;

InstanceImpl& parent_;
std::unique_ptr<SlotHolder> holder_;
std::unique_ptr<SlotImpl> slot_;
std::shared_ptr<uint32_t> ref_count_;
};

struct ThreadLocalData {
Event::Dispatcher* dispatcher_{};
std::vector<ThreadLocalObjectSharedPtr> data_;
};

void recycle(std::unique_ptr<SlotHolder>&& holder);
void recycle(std::unique_ptr<SlotImpl>&& slot);
// Cleanup the deferred deletes queue.
void scheduleCleanup();
void scheduleCleanup(SlotImpl* slot);

void removeSlot(SlotImpl& slot);
void runOnAllThreads(Event::PostCb cb);
void runOnAllThreads(Event::PostCb cb, Event::PostCb main_callback);
@@ -95,7 +87,7 @@ class InstanceImpl : Logger::Loggable<Logger::Id::main>, public NonCopyable, pub

// A queue for Slots that has to be deferred to delete due to out-going callbacks
// pointing to the Slot.
std::list<std::unique_ptr<SlotHolder>> deferred_deletes_;
absl::flat_hash_map<SlotImpl*, std::unique_ptr<SlotImpl>> deferred_deletes_;

std::vector<SlotImpl*> slots_;
// A list of index of freed slots.
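
The deleter needs somewhere to send the parked slot, which is what the new recycle()/scheduleCleanup() pair and the deferred_deletes_ map above provide: ~Bookkeeper parks the SlotImpl keyed by its raw pointer, and the ref-count deleter later erases that entry on the main thread. A standalone sketch of that flow, again under assumed stand-ins (Slot, FakeDispatcher with an inline post(), and std::unordered_map in place of absl::flat_hash_map):

```cpp
#include <functional>
#include <iostream>
#include <memory>
#include <unordered_map>

// Illustrative stand-ins, not Envoy types.
struct Slot {
  ~Slot() { std::cout << "slot destroyed\n"; }
};

struct FakeDispatcher {
  void post(std::function<void()> cb) { cb(); }
};

class DeferredDeleter {
public:
  explicit DeferredDeleter(FakeDispatcher& main) : main_(main) {}

  // Bookkeeper destructor: park the slot, keyed by its address.
  void recycle(std::unique_ptr<Slot>&& slot) {
    Slot* key = slot.get();
    deferred_deletes_.emplace(key, std::move(slot));
  }

  // ref_count deleter: erase the parked slot via a post to the main thread.
  void scheduleCleanup(Slot* slot) {
    main_.post([this, slot] { deferred_deletes_.erase(slot); });
  }

private:
  FakeDispatcher& main_;
  std::unordered_map<Slot*, std::unique_ptr<Slot>> deferred_deletes_;
};

int main() {
  FakeDispatcher main_dispatcher;
  DeferredDeleter deleter(main_dispatcher);

  auto slot = std::make_unique<Slot>();
  Slot* raw = slot.get();

  // ~Bookkeeper: the slot is parked instead of destroyed...
  deleter.recycle(std::move(slot));
  // ...and once the last callback reference goes away, cleanup is posted.
  deleter.scheduleCleanup(raw);  // prints "slot destroyed"
}
```

Keying the map by the raw pointer is what lets the deleter, which only sees a SlotImpl*, locate and release the parked unique_ptr later; the shutdown_ check in the real code skips the post once the whole InstanceImpl is being torn down, since the parked slots are then destroyed with it on the main thread.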
