Skip to content

Commit

Permalink
Add support for simulating multiple nodes with CELERITY_DRY_RUN_NODES
Browse files Browse the repository at this point in the history
  • Loading branch information
facuMH authored and psalz committed Sep 7, 2022
1 parent 92c814b commit 299ebbf
Show file tree
Hide file tree
Showing 7 changed files with 111 additions and 1 deletion.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ Celerity's runtime behavior:
should be queried (currently not supported when using hipSYCL).
- `CELERITY_GRAPH_PRINT_MAX_VERTS` sets the maximum number of vertices the
task/command graphs can have above which their GraphViz output will be omitted.
- `CELERITY_DRY_RUN_NODES` takes a number and simulates a run with that many nodes
without actually executing the commands.

## Disclaimer

Expand Down
3 changes: 3 additions & 0 deletions include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ namespace detail {
*/
const std::optional<device_config>& get_device_config() const { return m_device_cfg; };
std::optional<bool> get_enable_device_profiling() const { return m_enable_device_profiling; };
bool is_dry_run() const { return m_dry_run_nodes > 0; };
int get_dry_run_nodes() const { return m_dry_run_nodes; }

size_t get_graph_print_max_verts() const { return m_graph_print_max_verts; };

Expand All @@ -49,6 +51,7 @@ namespace detail {
std::optional<device_config> m_device_cfg;
std::optional<bool> m_enable_device_profiling;
size_t m_graph_print_max_verts = 200;
int m_dry_run_nodes = 0;
};

} // namespace detail
Expand Down
2 changes: 2 additions & 0 deletions include/runtime.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ namespace detail {

host_object_manager& get_host_object_manager() const;

bool is_dry_run() const { return m_cfg->is_dry_run(); }

private:
inline static bool m_mpi_initialized = false;
inline static bool m_mpi_finalized = false;
Expand Down
10 changes: 10 additions & 0 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,16 @@ namespace detail {
const auto result = get_env("CELERITY_FORCE_WG");
if(result.first) { CELERITY_WARN("Support for CELERITY_FORCE_WG has been removed with Celerity 0.3.0."); }
}

// -------------------------------- CELERITY_DRY_RUN_NODES ---------------------------------
{
const auto [is_set, value] = get_env("CELERITY_DRY_RUN_NODES");
if(is_set) {
const auto [is_valid, num_nodes] = parse_uint(value.c_str());
if(!is_valid) { CELERITY_WARN("CELERITY_DRY_RUN_NODES contains invalid value - will be ignored"); }
m_dry_run_nodes = num_nodes;
}
}
}
} // namespace detail
} // namespace celerity
12 changes: 11 additions & 1 deletion src/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,19 @@ namespace detail {
MPI_Comm_size(MPI_COMM_WORLD, &world_size);
m_num_nodes = world_size;

m_cfg = std::make_unique<config>(argc, argv);
if(m_cfg->is_dry_run()) {
if(m_num_nodes != 1) throw std::runtime_error("In order to run with CELERITY_DRY_RUN_NODES a single MPI process/rank must be used.\n");
m_num_nodes = m_cfg->get_dry_run_nodes();
CELERITY_WARN("Performing a dry run with {} simulated nodes", m_num_nodes);
}

int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
m_local_nid = world_rank;

spdlog::set_pattern(fmt::format("[%Y-%m-%d %H:%M:%S.%e] [{:0{}}] [%^%l%$] %v", world_rank, int(ceil(log10(world_size)))));

m_cfg = std::make_unique<config>(argc, argv);
#ifndef __APPLE__
if(const uint32_t cores = affinity_cores_available(); cores < min_cores_needed) {
CELERITY_WARN("Celerity has detected that only {} logical cores are available to this process. It is recommended to assign at least {} "
Expand Down Expand Up @@ -257,6 +263,10 @@ namespace detail {
}

void runtime::flush_command(node_id target, unique_frame_ptr<command_frame> frame) {
if(is_dry_run()) {
// We only want to send epochs to the master node for slow full sync and shutdown.
if(target != 0 || frame->pkg.get_command_type() != command_type::epoch) return;
}
// Even though command packages are small enough to use a blocking send we want to be able to send to the master node as well,
// which is why we have to use Isend after all. We also have to make sure that the buffer stays around until the send is complete.
MPI_Request req;
Expand Down
28 changes: 28 additions & 0 deletions test/runtime_tests.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

#include <catch2/catch_template_test_macros.hpp>
#include <catch2/catch_test_macros.hpp>
#include <catch2/generators/catch_generators.hpp>
#include <catch2/matchers/catch_matchers_all.hpp>

#include <celerity.h>
Expand Down Expand Up @@ -47,6 +48,8 @@ namespace detail {
static scheduler& get_schdlr(runtime& rt) { return *rt.m_schdlr; }

static executor& get_exec(runtime& rt) { return *rt.m_exec; }

static size_t get_command_count(runtime& rt) { return rt.m_cdag->command_count(); }
};

struct scheduler_testspy {
Expand Down Expand Up @@ -993,5 +996,30 @@ namespace detail {

#endif

void dry_run_with_nodes(const size_t nodes) {
const std::string dryrun_envvar_name = "CELERITY_DRY_RUN_NODES";
const auto ste = test_utils::set_test_env(dryrun_envvar_name, std::to_string(nodes));

distr_queue q;

auto& rt = runtime::get_instance();
auto& tm = rt.get_task_manager();
tm.set_horizon_step(2);

REQUIRE(rt.is_dry_run());

q.submit([=](handler& cgh) { cgh.host_task(range<1>{nodes * 2}, [](partition<1>) {}); });
q.slow_full_sync();

// (intial epoch + task + sync epoch) per node.
CHECK(runtime_testspy::get_command_count(rt) == 3 * nodes);
test_utils::maybe_print_graph(tm);
}

TEST_CASE_METHOD(test_utils::runtime_fixture, "Dry run generates commands for an arbitrary number of simulated worker nodes", "[dryrun]") {
const size_t nodes = GENERATE(values({4, 8, 16}));
dry_run_with_nodes(nodes);
}

} // namespace detail
} // namespace celerity
55 changes: 55 additions & 0 deletions test/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,12 @@
#include <string>
#include <unordered_set>

#ifdef _WIN32
#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <windows.h>
#endif

#include <catch2/catch_test_macros.hpp>
#include <celerity.h>

Expand Down Expand Up @@ -433,6 +439,55 @@ namespace test_utils {
}
}

class set_test_env {
public:
#ifdef _WIN32
set_test_env(const std::string& env, const std::string& val) : m_env_var_name(env) {
// We use the ANSI version of Get/Set, because it does not require type conversion of char to wchar_t, and we can safely do this
// because we are not mutating the text and therefore can treat them as raw bytes without having to worry about the text encoding.
const auto name_size = GetEnvironmentVariableA(env.c_str(), nullptr, 0);
if(name_size > 0) {
m_original_value.resize(name_size);
const auto res = GetEnvironmentVariableA(env.c_str(), m_original_value.data(), name_size);
assert(res != 0 && "Failed to get celerity environment variable");
}
const auto res = SetEnvironmentVariableA(env.c_str(), val.c_str());
assert(res != 0 && "Failed to set celerity environment variable");
}

~set_test_env() {
if(m_original_value.empty()) {
const auto res = SetEnvironmentVariableA(m_env_var_name.c_str(), NULL);
assert(res != 0 && "Failed to delete celerity environment variable");
} else {
const auto res = SetEnvironmentVariableA(m_env_var_name.c_str(), m_original_value.c_str());
assert(res != 0 && "Failed to reset celerity environment variable");
}
}

#else
set_test_env(const std::string& env, const std::string& val) {
const char* has_value = std::getenv(env.c_str());
if(has_value != nullptr) { m_original_value = has_value; }
const auto res = setenv(env.c_str(), val.c_str(), 1);
assert(res == 0 && "Failed to set celerity environment variable");
m_env_var_name = env;
}
~set_test_env() {
if(m_original_value.empty()) {
const auto res = unsetenv(m_env_var_name.c_str());
assert(res == 0 && "Failed to unset celerity environment variable");
} else {
const auto res = setenv(m_env_var_name.c_str(), m_original_value.c_str(), 1);
assert(res == 0 && "Failed to reset celerity environment variable");
}
}
#endif
private:
std::string m_env_var_name;
std::string m_original_value;
};

} // namespace test_utils
} // namespace celerity

Expand Down

0 comments on commit 299ebbf

Please sign in to comment.