Cleaning up implementation
hkaiser committed Aug 21, 2023
1 parent d9aaed0 commit 9b7d3be
Showing 1 changed file with 21 additions and 117 deletions.
@@ -141,7 +141,6 @@ namespace hpx::threads::policies {
char const* description)
: num_queues_(num_queues)
, num_high_priority_queues_(num_queues)
- , thread_queue_init_()
, affinity_data_(affinity_data)
, description_(description)
{
@@ -185,10 +184,7 @@ namespace hpx::threads::policies {
failed = 4
};

- steal_request() noexcept
-   : victims_()
- {
- }
+ steal_request() = default;

steal_request(std::size_t const num_thread, task_channel* channel,
mask_cref_type victims, bool idle, bool const stealhalf)
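
The constructor parameters above outline the work-requesting protocol: a request names the requesting worker, a channel on which the victim can reply, the requester's victim mask, and flags for whether the requester is idle and whether it asks for half of the victim's queue. A simplified, illustrative mock of such a request record (none of these names reflect HPX's actual layout):

```cpp
#include <cstddef>

// Illustrative sketch only: a steal request bundles everything a victim
// needs in order to answer the requester.
struct steal_request_sketch
{
    enum class state { working, idle, failed };

    std::size_t num_thread = static_cast<std::size_t>(-1);  // requester id
    bool idle = true;         // requester ran out of local work
    bool steal_half = false;  // ask for half the victim's queue, not one task
    state status = state::working;
};
```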
@@ -218,10 +214,7 @@ namespace hpx::threads::policies {
////////////////////////////////////////////////////////////////////////
struct scheduler_data
{
- scheduler_data() noexcept
-   : victims_()
- {
- }
+ scheduler_data() = default;

scheduler_data(scheduler_data const&) = delete;
scheduler_data(scheduler_data&&) = delete;
@@ -731,7 +724,6 @@ namespace hpx::threads::policies {
data_[num_thread].data_.queue_->create_thread(data, id, ec);
break;

- default:
case thread_priority::unknown:
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
@@ -879,6 +871,7 @@
d.queue_->increment_num_stolen_from_pending();
#endif
thrds.tasks_.push_back(HPX_MOVE(thrd));
+ thrd = thread_id_ref_type{};
}

// we are ready to send at least one task
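
The single added line above is the point of this hunk: after `HPX_MOVE(thrd)` the handle is only in a valid-but-unspecified state, so assigning a default-constructed `thread_id_ref_type` makes "this worker no longer owns the task" explicit and testable. The same idiom, with `std::shared_ptr` standing in for the ref-counted thread id:

```cpp
#include <memory>
#include <vector>

// Generic sketch of the idiom, not HPX code.
void hand_off(std::shared_ptr<int>& thrd, std::vector<std::shared_ptr<int>>& out)
{
    out.push_back(std::move(thrd));   // thrd now valid but unspecified
    thrd = std::shared_ptr<int>{};    // make "empty" explicit
}
```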
@@ -905,7 +898,7 @@
// Return the next thread to be executed, return false if none is
// available
bool get_next_thread(std::size_t num_thread, bool running,
- thread_id_ref_type& thrd, bool enable_stealing)
+ thread_id_ref_type& thrd, bool allow_stealing)
{
HPX_ASSERT(num_thread < num_queues_);

@@ -946,7 +939,7 @@
#endif
}

- if (enable_stealing && result)
+ if (allow_stealing && result)
{
// We found a task to run, however before running it we handle
// steal requests (assuming that there is more work left that
@@ -986,7 +979,7 @@
bool allow_fallback = false,
thread_priority priority = thread_priority::default_) override
{
- std::size_t num_thread = static_cast<std::size_t>(-1);
+ auto num_thread = static_cast<std::size_t>(-1);
if (schedulehint.mode == thread_schedule_hint_mode::thread)
{
num_thread = schedulehint.hint;
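
The `auto` change is purely cosmetic: the cast already fixes the deduced type, and `static_cast<std::size_t>(-1)` stays the conventional "no thread chosen" sentinel that the schedule hint may overwrite. Both facts can be checked statically:

```cpp
#include <cstddef>
#include <type_traits>

auto num_thread = static_cast<std::size_t>(-1);   // deduces std::size_t
static_assert(std::is_same_v<decltype(num_thread), std::size_t>);
static_assert(static_cast<std::size_t>(-1) == ~std::size_t{0});  // all-ones sentinel
```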
@@ -1041,7 +1034,6 @@
HPX_MOVE(thrd));
break;

- default:
case thread_priority::unknown:
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
@@ -1056,7 +1048,7 @@
bool allow_fallback = false,
thread_priority priority = thread_priority::default_) override
{
- std::size_t num_thread = static_cast<std::size_t>(-1);
+ auto num_thread = static_cast<std::size_t>(-1);
if (schedulehint.mode == thread_schedule_hint_mode::thread)
{
num_thread = schedulehint.hint;
@@ -1101,7 +1093,6 @@
low_priority_queue_.schedule_thread(HPX_MOVE(thrd), true);
break;

- default:
case thread_priority::default_:
case thread_priority::normal:
data_[num_thread].data_.queue_->schedule_thread(
@@ -1112,6 +1103,13 @@
data_[num_thread].data_.bound_queue_->schedule_thread(
HPX_MOVE(thrd), true);
break;
+
+ case thread_priority::unknown:
+ {
+     HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
+         "local_workrequesting_scheduler::schedule_thread_last",
+         "unknown thread priority value (thread_priority::unknown)");
+ }
}
}

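This added case, together with the repeated `default:` removals throughout the file, implements one policy: a `switch` over `thread_priority` with no `default:` label lets `-Wswitch`-style diagnostics flag any enumerator the code forgets to handle, while the explicit `unknown` case still fails loudly at run time. A reduced sketch of the pattern (enum and handler are illustrative, not HPX's):

```cpp
#include <stdexcept>

enum class priority { normal, high, bound, unknown };

void schedule(priority p)
{
    switch (p)    // no default: the compiler can check coverage
    {
    case priority::normal: /* enqueue normally */ break;
    case priority::high:   /* enqueue high-priority */ break;
    case priority::bound:  /* enqueue on the bound queue */ break;
    case priority::unknown:
        throw std::invalid_argument("unknown thread priority value");
    }
}
```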
@@ -1215,7 +1213,6 @@
break;
}

- default:
case thread_priority::unknown:
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
@@ -1282,7 +1279,6 @@
break;
}

- default:
case thread_priority::unknown:
{
HPX_THROW_EXCEPTION(hpx::error::bad_parameter,
@@ -1556,7 +1552,7 @@
if (d.num_recent_steals_ >=
scheduler_data::num_steal_adaptive_interval_)
{
- double ratio =
+ double const ratio =
static_cast<double>(d.num_recent_tasks_executed_) /
d.num_steal_adaptive_interval_;

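The context shows the adaptive part of the scheduler: every `num_steal_adaptive_interval_` steal operations it computes how many tasks were executed per steal and uses that to tune the `stealhalf` flag carried by steal requests. A hedged sketch of that bookkeeping follows; only the ratio computation is taken from the hunk, while the interval value, threshold, and reset policy below are assumptions:

```cpp
// Hedged sketch: only the ratio mirrors the code above; the interval
// value, threshold, and reset policy are assumptions.
struct steal_stats
{
    static constexpr int num_steal_adaptive_interval = 25;   // assumed value

    double num_recent_tasks_executed = 0.0;
    int num_recent_steals = 0;
    bool stealhalf = false;   // matches the flag carried by steal_request

    void maybe_adapt()
    {
        if (num_recent_steals >= num_steal_adaptive_interval)
        {
            double const ratio =
                num_recent_tasks_executed / num_steal_adaptive_interval;

            // Assumed rule: if each steal yielded more than one executed
            // task on average, stealing half a queue at a time pays off.
            stealhalf = ratio > 1.0;

            num_recent_tasks_executed = 0.0;
            num_recent_steals = 0;
        }
    }
};
```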
@@ -1662,9 +1658,8 @@
// scheduler. Returns true if the OS thread calling this function has to
// be terminated (i.e. no more work has to be done).
bool wait_or_add_new(std::size_t num_thread, bool running,
- [[maybe_unused]] std::int64_t& idle_loop_count,
- bool enable_stealing, std::size_t& added,
- thread_id_ref_type* next_thrd = nullptr)
+ [[maybe_unused]] std::int64_t& idle_loop_count, bool allow_stealing,
+ std::size_t& added, thread_id_ref_type* next_thrd = nullptr)
{
HPX_ASSERT(num_thread < num_queues_);

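Per the comment above, `wait_or_add_new()` returns `true` when the calling OS thread should terminate, while `get_next_thread()` returns `true` when it produced a task. A hypothetical worker loop consuming those two contracts (the loop shape is an assumption, not HPX's actual scheduling loop):

```cpp
#include <cstddef>
#include <cstdint>

// Hypothetical driver loop for one worker; only the two return-value
// contracts (true == got work / true == terminate) come from this file's
// comments. Thread must be named explicitly at the call site.
template <typename Scheduler, typename Thread, typename Exec>
void worker_loop(Scheduler& sched, std::size_t num_thread, Exec&& run)
{
    bool const running = true;
    bool const allow_stealing = true;
    std::int64_t idle_loop_count = 0;

    while (true)
    {
        Thread thrd;
        if (sched.get_next_thread(num_thread, running, thrd, allow_stealing))
        {
            run(thrd);    // found a task: execute it
            continue;
        }

        std::size_t added = 0;
        if (sched.wait_or_add_new(
                num_thread, running, idle_loop_count, allow_stealing, added))
        {
            break;        // scheduler says: no more work, terminate worker
        }
    }
}
```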
@@ -1676,7 +1671,7 @@
// threads as these threads are never created 'staged'.

bool result =
- d.queue_->wait_or_add_new(running, added, enable_stealing);
+ d.queue_->wait_or_add_new(running, added, allow_stealing);

// check if work was available
if (0 != added)
@@ -1693,7 +1688,7 @@
return true;

// return if no stealing is requested (or not possible)
- if (num_queues_ == 1 || !enable_stealing)
+ if (num_queues_ == 1 || !allow_stealing)
return result;

// attempt to steal more work
@@ -1769,103 +1764,12 @@
low_priority_queue_.on_start_thread(num_thread);
}

- std::size_t const num_threads = num_queues_;
- //auto const& topo = create_topology();

// Initially set all bits, code below resets the bits corresponding
// to cores that can serve as a victim for the current core. A set
// bit in this mask means 'do not steal from this core'.
- resize(d.victims_, num_threads);
+ resize(d.victims_, num_queues_);
reset(d.victims_);
set(d.victims_, num_thread);
- //for (std::size_t i = 0; i != num_threads; ++i)
- //{
- //    set(d.victims_, i);
- //}
- //
- //// get NUMA domain masks of all queues...
- //std::vector<mask_type> numa_masks(num_threads);
- //std::vector<std::ptrdiff_t> numa_domains(num_threads);
- //std::vector<mask_type> core_masks(num_threads);
- //for (std::size_t i = 0; i != num_threads; ++i)
- //{
- //    std::size_t num_pu = affinity_data_.get_pu_num(i);
- //    numa_masks[i] = topo.get_numa_node_affinity_mask(num_pu);
- //    numa_domains[i] = static_cast<std::ptrdiff_t>(
- //        topo.get_numa_node_number(num_pu));
- //    core_masks[i] = topo.get_core_affinity_mask(num_pu);
- //}
- //
- //// iterate over the number of threads again to determine where to
- //// steal from
- //std::ptrdiff_t radius =
- //    std::lround(static_cast<double>(num_threads) / 2.0);
- //
- //mask_cref_type numa_mask = numa_masks[num_thread];
- //mask_cref_type core_mask = core_masks[num_thread];
- //
- //auto iterate = [&](auto&& f) {
- //    // check our neighbors in a radial fashion (left and right
- //    // alternating, increasing distance each iteration)
- //    std::ptrdiff_t i = 1;
- //    for (/**/; i < radius; ++i)
- //    {
- //        std::ptrdiff_t left =
- //            (static_cast<std::ptrdiff_t>(num_thread) - i) %
- //            static_cast<std::ptrdiff_t>(num_threads);
- //        if (left < 0)
- //            left = num_threads + left;
- //
- //        if (f(std::size_t(left)))
- //        {
- //            unset(data_[num_thread].data_.victims_,
- //                static_cast<std::size_t>(left));
- //        }
- //
- //        std::size_t right = (num_thread + i) % num_threads;
- //        if (f(right))
- //        {
- //            unset(data_[num_thread].data_.victims_, right);
- //        }
- //    }
- //    if ((num_threads % 2) == 0)
- //    {
- //        std::size_t right = (num_thread + i) % num_threads;
- //        if (f(right))
- //        {
- //            unset(data_[num_thread].data_.victims_, right);
- //        }
- //    }
- //};
- //
- //// check for threads that share the same core...
- //iterate([&](std::size_t other_num_thread) {
- //    return any(core_mask & core_masks[other_num_thread]);
- //});
- //
- //// check for threads that share the same NUMA domain...
- //iterate([&](std::size_t other_num_thread) {
- //    return !any(core_mask & core_masks[other_num_thread]) &&
- //        any(numa_mask & numa_masks[other_num_thread]);
- //});
- //
- //// check for the rest and if we are NUMA aware
- //if (has_scheduler_mode(
- //        policies::scheduler_mode::enable_stealing_numa))
- //{
- //    iterate([&](std::size_t other_num_thread) {
- //        // allow stealing from neighboring NUMA domain only
- //        std::ptrdiff_t numa_distance = numa_domains[num_thread] -
- //            numa_domains[other_num_thread];
- //        if (numa_distance > 1 || numa_distance < -1)
- //            return false;
- //        // steal from even cores from neighboring NUMA domains
- //        if (numa_distance == 1 || numa_distance == -1)
- //            return other_num_thread % 2 == 0;
- //        // cores from our domain are handled above
- //        return false;
- //    });
- //}
+ set(d.victims_, num_queues_);
}
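
After this cleanup, victim selection reduces to the mask set up above; the large commented-out block that once computed core- and NUMA-aware victim sets radially is gone for good. What remains is the convention stated in the surviving comment, sketched here with `std::bitset` standing in for HPX's `mask_type`:

```cpp
#include <bitset>
#include <cstddef>

// Stand-in for mask_type; per the comment in the hunk above, a set bit
// means "do not steal from this core".
struct victims_sketch
{
    std::bitset<64> bits;   // one bit per worker thread/queue

    void init(std::size_t num_thread)
    {
        bits.reset();          // everyone starts out stealable
        bits.set(num_thread);  // never steal from ourselves
    }

    bool may_steal_from(std::size_t other) const { return !bits.test(other); }
};
```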

void on_stop_thread(std::size_t num_thread) override