Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize block processing - Part 3 #802

Merged
merged 16 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
16 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion libraries/custom_appbase/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
add_library(custom_appbase INTERFACE)
target_include_directories(custom_appbase INTERFACE "${CMAKE_CURRENT_SOURCE_DIR}/include")
target_link_libraries(custom_appbase INTERFACE appbase)
target_link_libraries(custom_appbase INTERFACE appbase Boost::heap)

add_subdirectory(tests)
12 changes: 12 additions & 0 deletions libraries/custom_appbase/include/eosio/chain/application.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,18 @@ class priority_queue_executor {
return main_thread_id_;
}

template <typename Func>
void post( handler_id id, int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
// no reason to post to io_service which then places this in the read_exclusive_handlers queue.
// read_exclusive tasks are run exclusively by read threads by pulling off the read_exclusive handlers queue.
pri_queue_.add(id, priority, q, --order_, std::forward<Func>(func));
} else {
// post to io_service as the main thread may be blocked on io_service.run_one() in application::exec()
boost::asio::post(io_serv_, pri_queue_.wrap(id, priority, q, --order_, std::forward<Func>(func)));
}
}

template <typename Func>
void post( int priority, exec_queue q, Func&& func ) {
if (q == exec_queue::read_exclusive) {
Expand Down
117 changes: 88 additions & 29 deletions libraries/custom_appbase/include/eosio/chain/exec_pri_queue.hpp
Original file line number Diff line number Diff line change
@@ -1,14 +1,23 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/heap/binomial_heap.hpp>

#include <condition_variable>
#include <mutex>
#include <optional>
#include <queue>

namespace appbase {
// adapted from: https://www.boost.org/doc/libs/1_69_0/doc/html/boost_asio/example/cpp11/invocation/prioritised_handlers.cpp

// Indicate non-unique handlers. If an existing handler at the specified priority already exists then there is
// no reason to insert a new handler to be processed.
//
// Add entries for each new non-unique handler type.
enum class handler_id {
unique, // identifies handler is unique, will not de-dup
process_incoming_block // process blocks already added to forkdb
};

enum class exec_queue {
read_only, // the queue storing tasks which are safe to execute
// in parallel with other read-only & read_exclusive tasks in the read-only
Expand All @@ -30,6 +39,12 @@ class exec_pri_queue : public boost::asio::execution_context
{
public:

~exec_pri_queue() {
clear(read_only_handlers_);
clear(read_write_handlers_);
clear(read_exclusive_handlers_);
}

// inform how many read_threads will be calling read_only/read_exclusive queues
// expected to only be called at program startup, not thread safe, not safe to call when lock_enabled_
void init_read_threads(size_t num_read_threads) {
Expand Down Expand Up @@ -61,20 +76,50 @@ class exec_pri_queue : public boost::asio::execution_context
should_exit_ = [](){ assert(false); return true; }; // should not be called when locking is disabled
}

// called from appbase::application_base::exec poll_one() or run_one()
template <typename Function>
void add(int priority, exec_queue q, size_t order, Function function) {
void add(int priority, exec_queue q, size_t order, Function&& function) {
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
prio_queue& que = priority_que(q);
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(priority, order, std::move(function)));
std::unique_ptr<queued_handler_base> handler(new queued_handler<Function>(handler_id::unique, priority, order, std::forward<Function>(function)));
if (lock_enabled_ || q == exec_queue::read_exclusive) { // called directly from any thread for read_exclusive
std::lock_guard g( mtx_ );
que.push( std::move( handler ) );
que.push( handler.release() );
if (num_waiting_)
cond_.notify_one();
} else {
que.push( std::move( handler ) );
que.push( handler.release() );
}
}

// called from appbase::application_base::exec poll_one() or run_one()
template <typename Function>
void add(handler_id id, int priority, exec_queue q, size_t order, Function&& function) {
assert( num_read_threads_ > 0 || q != exec_queue::read_exclusive);
if (id == handler_id::unique) {
return add(priority, q, order, std::forward<Function>(function));
}
prio_queue& que = priority_que(q);
std::unique_lock g( mtx_, std::defer_lock );
if (lock_enabled_ || q == exec_queue::read_exclusive) {
// called directly from any thread for read_exclusive
g.lock();
}
if (!que.empty()) {
// find the associated priority
auto end = que.ordered_end();
auto i = std::lower_bound(que.ordered_begin(), end, priority, [](const auto& h, int priority) {
return h->priority() > priority;
});
// boost::heap ordered iterator is a forward iterator
// if an existing handler with the id exists within the same priority then do not post
for (; i != end && (*i)->priority() == priority; ++i) {
if ((*i)->id() == id)
return;
}
}
que.push( new queued_handler<Function>(id, priority, order, std::forward<Function>(function)) );
if (g.owns_lock() && num_waiting_)
cond_.notify_one();
}

// only call when no lock required
Expand Down Expand Up @@ -158,8 +203,8 @@ class exec_pri_queue : public boost::asio::execution_context
class executor
{
public:
executor(exec_pri_queue& q, int p, size_t o, exec_queue que)
: context_(q), que_(que), priority_(p), order_(o)
executor(exec_pri_queue& q, handler_id id, int p, size_t o, exec_queue que)
: context_(q), que_(que), id_(id), priority_(p), order_(o)
{
}

Expand All @@ -171,19 +216,19 @@ class exec_pri_queue : public boost::asio::execution_context
template <typename Function, typename Allocator>
void dispatch(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

template <typename Function, typename Allocator>
void post(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

template <typename Function, typename Allocator>
void defer(Function f, const Allocator&) const
{
context_.add(priority_, que_, order_, std::move(f));
context_.add(id_, priority_, que_, order_, std::move(f));
}

void on_work_started() const noexcept {}
Expand All @@ -202,23 +247,32 @@ class exec_pri_queue : public boost::asio::execution_context
private:
exec_pri_queue& context_;
exec_queue que_;
handler_id id_;
int priority_;
size_t order_;
};

template <typename Function>
boost::asio::executor_binder<Function, executor>
wrap(handler_id id, int priority, exec_queue q, size_t order, Function&& func)
{
return boost::asio::bind_executor( executor(*this, id, priority, order, q), std::forward<Function>(func) );
}

template <typename Function>
boost::asio::executor_binder<Function, executor>
wrap(int priority, exec_queue q, size_t order, Function&& func)
{
return boost::asio::bind_executor( executor(*this, priority, order, q), std::forward<Function>(func) );
return boost::asio::bind_executor( executor(*this, handler_id::unique, priority, order, q), std::forward<Function>(func) );
}

private:
class queued_handler_base
{
public:
queued_handler_base( int p, size_t order )
: priority_( p )
queued_handler_base( handler_id id, int p, size_t order )
: id_( id )
, priority_( p )
, order_( order )
{
}
Expand All @@ -227,32 +281,31 @@ class exec_pri_queue : public boost::asio::execution_context

virtual void execute() = 0;

handler_id id() const { return id_; }
int priority() const { return priority_; }
// C++20
// friend std::weak_ordering operator<=>(const queued_handler_base&,
// const queued_handler_base&) noexcept = default;
friend bool operator<(const queued_handler_base& a,
const queued_handler_base& b) noexcept
{

friend bool operator<(const queued_handler_base& a, const queued_handler_base& b) noexcept {
// exclude id_
return std::tie( a.priority_, a.order_ ) < std::tie( b.priority_, b.order_ );
}

private:
int priority_;
size_t order_;
handler_id id_; // unique identifier of handler
int priority_; // priority of handler, see application_base priority
size_t order_; // maintain order within priority grouping
};

template <typename Function>
class queued_handler : public queued_handler_base
{
public:
queued_handler(int p, size_t order, Function f)
: queued_handler_base( p, order )
queued_handler(handler_id id, int p, size_t order, Function f)
: queued_handler_base( id, p, order )
, function_( std::move(f) )
{
}

void execute() override
void execute() final
{
function_();
}
Expand All @@ -264,13 +317,13 @@ class exec_pri_queue : public boost::asio::execution_context
struct deref_less
{
template<typename Pointer>
bool operator()(const Pointer& a, const Pointer& b) noexcept(noexcept(*a < *b))
bool operator()(const Pointer& a, const Pointer& b) const noexcept(noexcept(*a < *b))
{
return *a < *b;
}
};

using prio_queue = std::priority_queue<std::unique_ptr<queued_handler_base>, std::deque<std::unique_ptr<queued_handler_base>>, deref_less>;
using prio_queue = boost::heap::binomial_heap<queued_handler_base*, boost::heap::compare<deref_less>>;

prio_queue& priority_que(exec_queue q) {
switch (q) {
Expand Down Expand Up @@ -299,12 +352,18 @@ class exec_pri_queue : public boost::asio::execution_context
}

static std::unique_ptr<exec_pri_queue::queued_handler_base> pop(prio_queue& que) {
// work around std::priority_queue not having a pop() that returns value
auto t = std::move(const_cast<std::unique_ptr<queued_handler_base>&>(que.top()));
// work around priority_queue not having a pop() that returns value
// take back ownership of pointer
auto t = std::unique_ptr<queued_handler_base>(que.top());
que.pop();
return t;
}

void clear(prio_queue& que) {
while (!que.empty())
pop(que);
}

size_t num_read_threads_ = 0;
bool lock_enabled_ = false;
mutable std::mutex mtx_;
Expand Down
2 changes: 1 addition & 1 deletion libraries/custom_appbase/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ endif()

file(GLOB UNIT_TESTS "*.cpp")
add_executable( custom_appbase_test ${UNIT_TESTS} )
target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
target_link_libraries( custom_appbase_test appbase fc Boost::included_unit_test_framework Boost::heap ${CMAKE_DL_LIBS} ${PLATFORM_SPECIFIC_LIBS} )
target_include_directories( custom_appbase_test PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../include" "${CMAKE_CURRENT_SOURCE_DIR}/../../appbase/include" )

add_test( custom_appbase_test custom_appbase_test )
Loading