Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Overhaul logging mechanism #80

Merged
merged 4 commits into from
Feb 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,10 @@ install(
NAMESPACE spdlog::
FILE spdlogConfigTargets.cmake
)
configure_file(
vendor/spdlog/cmake/spdlogConfig.cmake.in
"${spdlog_PROJECT_CONFIG_OUT}"
@ONLY
configure_package_config_file(
vendor/spdlog/cmake/spdlogConfig.cmake.in
"${spdlog_PROJECT_CONFIG_OUT}"
INSTALL_DESTINATION lib/celerity/vendor/cmake
)
write_basic_package_version_file(
"${spdlog_VERSION_CONFIG_FILE}"
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ Celerity's runtime behavior:
automatically assign a unique device to each worker on a host.
- `CELERITY_PROFILE_KERNEL` controls whether SYCL queue profiling information
should be queried (currently not supported when using hipSYCL).
- `CELERITY_GRAPH_PRINT_MAX_VERTS` sets the maximum number of vertices the
task/command graphs can have above which their GraphViz output will be omitted.

## Disclaimer

Expand Down
5 changes: 0 additions & 5 deletions include/buffer_transfer_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

#include "buffer_storage.h"
#include "command.h"
#include "logger.h"
#include "mpi_support.h"
#include "types.h"

Expand All @@ -24,8 +23,6 @@ namespace detail {
bool complete = false;
};

explicit buffer_transfer_manager(std::shared_ptr<logger> transfer_logger) : transfer_logger(transfer_logger) {}

std::shared_ptr<const transfer_handle> push(const command_pkg& pkg);
std::shared_ptr<const transfer_handle> await_push(const command_pkg& pkg);

Expand Down Expand Up @@ -70,8 +67,6 @@ namespace detail {
// - Still outstanding pushes that have been requested through ::await_push
std::unordered_map<command_id, std::shared_ptr<incoming_transfer_handle>> push_blackboard;

std::shared_ptr<logger> transfer_logger;

void poll_incoming_transfers();
void update_incoming_transfers();
void update_outgoing_transfers();
Expand Down
6 changes: 3 additions & 3 deletions include/command_graph.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <memory>
#include <optional>
#include <string>
#include <type_traits>
#include <unordered_map>
#include <unordered_set>
Expand All @@ -12,8 +14,6 @@
namespace celerity {
namespace detail {

class logger;

// TODO: Could be extended (using SFINAE) to support additional iterator types (e.g. random access)
template <typename Iterator, typename PredicateFn>
class filter_iterator {
Expand Down Expand Up @@ -126,7 +126,7 @@ namespace detail {

auto& task_commands(task_id tid) { return by_task.at(tid); }

void print_graph(logger& graph_logger) const;
std::optional<std::string> print_graph(size_t max_nodes) const;

// TODO unify dependency terminology to this
void add_dependency(abstract_command* depender, abstract_command* dependee, dependency_kind kind = dependency_kind::TRUE_DEP) {
Expand Down
11 changes: 5 additions & 6 deletions include/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

#include <optional>

#include "logger.h"
#include "log.h"

namespace celerity {
namespace detail {
Expand All @@ -22,12 +22,8 @@ namespace detail {
public:
/**
* Initializes the @p config by parsing environment variables and passed arguments.
*
* @param logger The logger is used to print warnings about invalid configuration options.
* Additionally, the logger's level is set to the same level as is
* returned by ::get_log_level().
*/
config(int* argc, char** argv[], logger& logger);
config(int* argc, char** argv[]);

log_level get_log_level() const { return log_lvl; }

Expand All @@ -44,11 +40,14 @@ namespace detail {
const std::optional<device_config>& get_device_config() const { return device_cfg; };
std::optional<bool> get_enable_device_profiling() const { return enable_device_profiling; };

size_t get_graph_print_max_verts() const { return graph_print_max_verts; };

private:
log_level log_lvl;
host_config host_cfg;
std::optional<device_config> device_cfg;
std::optional<bool> enable_device_profiling;
size_t graph_print_max_verts = 200;
};

} // namespace detail
Expand Down
4 changes: 0 additions & 4 deletions include/device_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <CL/sycl.hpp>

#include "config.h"
#include "logger.h"
#include "workaround.h"

namespace celerity {
Expand All @@ -18,8 +17,6 @@ namespace detail {
*/
class device_queue {
public:
device_queue(logger& queue_logger) : queue_logger(queue_logger){};

/**
* @brief Initializes the @p device_queue, selecting an appropriate device in the process.
*
Expand Down Expand Up @@ -59,7 +56,6 @@ namespace detail {
}

private:
logger& queue_logger;
std::unique_ptr<cl::sycl::queue> sycl_queue;
bool device_profiling_enabled = false;

Expand Down
11 changes: 3 additions & 8 deletions include/executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

#include "buffer_manager.h"
#include "buffer_transfer_manager.h"
#include "logger.h"
#include "worker_job.h"

namespace celerity {
Expand Down Expand Up @@ -42,8 +41,8 @@ namespace detail {
class executor {
public:
// TODO: Try to decouple this more.
executor(node_id local_nid, host_queue& h_queue, device_queue& d_queue, task_manager& tm, buffer_manager& buffer_mngr,
reduction_manager& reduction_mngr, std::shared_ptr<logger> execution_logger);
executor(
node_id local_nid, host_queue& h_queue, device_queue& d_queue, task_manager& tm, buffer_manager& buffer_mngr, reduction_manager& reduction_mngr);

void startup();

Expand All @@ -66,7 +65,6 @@ namespace detail {
buffer_manager& buffer_mngr;
reduction_manager& reduction_mngr;
std::unique_ptr<buffer_transfer_manager> btm;
std::shared_ptr<logger> execution_logger;
std::thread exec_thrd;
size_t running_device_compute_jobs = 0;
std::atomic<uint64_t> highest_executed_sync_id = {0};
Expand All @@ -87,10 +85,7 @@ namespace detail {

template <typename Job, typename... Args>
void create_job(const command_pkg& pkg, const std::vector<command_id>& dependencies, Args&&... args) {
auto logger = execution_logger->create_context({{"job", std::to_string(pkg.cid)}});
if(pkg.cmd == command_type::HORIZON) { logger = logger->create_context({{"task", std::to_string(std::get<horizon_data>(pkg.data).tid)}}); }
if(pkg.cmd == command_type::EXECUTION) { logger = logger->create_context({{"task", std::to_string(std::get<execution_data>(pkg.data).tid)}}); }
jobs[pkg.cid] = {std::make_unique<Job>(pkg, logger, std::forward<Args>(args)...), pkg.cmd, {}, 0};
jobs[pkg.cid] = {std::make_unique<Job>(pkg, std::forward<Args>(args)...), pkg.cmd, {}, 0};

// If job doesn't exist we assume it has already completed.
// This is true as long as we're respecting task-graph (anti-)dependencies when processing tasks.
Expand Down
1 change: 1 addition & 0 deletions include/grid.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <CL/sycl.hpp>
#include <allscale/api/user/data/grid.h>
#undef assert_fail // Incompatible with fmt

#include "ranges.h"

Expand Down
24 changes: 11 additions & 13 deletions include/host_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ namespace detail {
time_point end_time{};
};

explicit host_queue(logger& queue_logger) : queue_logger(queue_logger) {
host_queue() {
// TODO what is a good thread count for the non-collective thread pool?
threads.emplace(std::piecewise_construct, std::tuple{0}, std::tuple{MPI_COMM_NULL, 4});
}
Expand All @@ -133,17 +133,16 @@ namespace detail {
std::future<execution_info> submit(collective_group_id cgid, Fn&& fn) {
auto it = threads.find(cgid);
assert(it != threads.end());
return it->second.thread.push(
[&ql = queue_logger, fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = it->second.comm](int) {
auto start_time = std::chrono::steady_clock::now();
try {
fn(comm);
} catch(std::exception& e) { ql.error("exception in thread pool: {}", e.what()); } catch(...) {
ql.error("unknown exception in thread pool");
}
auto end_time = std::chrono::steady_clock::now();
return execution_info{submit_time, start_time, end_time};
});
return it->second.thread.push([fn = std::forward<Fn>(fn), submit_time = std::chrono::steady_clock::now(), comm = it->second.comm](int) {
auto start_time = std::chrono::steady_clock::now();
try {
fn(comm);
} catch(std::exception& e) { CELERITY_ERROR("exception in thread pool: {}", e.what()); } catch(...) {
CELERITY_ERROR("unknown exception in thread pool");
}
auto end_time = std::chrono::steady_clock::now();
return execution_info{submit_time, start_time, end_time};
});
}

/**
Expand All @@ -164,7 +163,6 @@ namespace detail {
};

std::unordered_map<collective_group_id, comm_thread> threads;
logger& queue_logger;
};

} // namespace detail
Expand Down
90 changes: 90 additions & 0 deletions include/log.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#pragma once

#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <variant>

// TODO: Make this configurable through CMake?
#define SPDLOG_ACTIVE_LEVEL SPDLOG_LEVEL_TRACE
#include <spdlog/spdlog.h>

// Enable formatting of types that support operator<<(std::ostream&, T)
#include <spdlog/fmt/ostr.h>

#include "print_utils.h"

#define CELERITY_LOG_SET_SCOPED_CTX(ctx) CELERITY_DETAIL_LOG_SET_SCOPED_CTX(ctx)

#define CELERITY_TRACE(...) SPDLOG_TRACE("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))
#define CELERITY_DEBUG(...) SPDLOG_DEBUG("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))
#define CELERITY_INFO(...) SPDLOG_INFO("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))
#define CELERITY_WARN(...) SPDLOG_WARN("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))
#define CELERITY_ERROR(...) SPDLOG_ERROR("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))
#define CELERITY_CRITICAL(...) SPDLOG_CRITICAL("{}{}", *celerity::detail::active_log_ctx, fmt::format(__VA_ARGS__))

namespace celerity {
namespace detail {

using log_level = spdlog::level::level_enum;

template <typename... Es>
struct log_map {
const std::tuple<Es...>& entries;
log_map(const std::tuple<Es...>& entries) : entries(entries) {}
};

struct log_context {
std::string value;
log_context() = default;
template <typename... Es>
explicit log_context(const std::tuple<Es...>& entries) {
static_assert(sizeof...(Es) % 2 == 0, "log_context requires key/value pairs");
value = fmt::format("[{}] ", log_map{entries});
}
};

inline const std::string null_log_ctx;
inline thread_local const std::string* active_log_ctx = &null_log_ctx;
fknorr marked this conversation as resolved.
Show resolved Hide resolved

struct log_ctx_setter {
log_ctx_setter(log_context& ctx) { celerity::detail::active_log_ctx = &ctx.value; }
~log_ctx_setter() { celerity::detail::active_log_ctx = &celerity::detail::null_log_ctx; }
};

#define CELERITY_DETAIL_LOG_SET_SCOPED_CTX(ctx) \
log_ctx_setter _set_log_ctx_##__COUNTER__ { ctx }

template <typename A, typename B, typename... Rest, size_t... Is, typename Callback>
constexpr void tuple_for_each_pair_impl(const std::tuple<A, B, Rest...>& tuple, Callback&& cb, std::index_sequence<Is...>) {
cb(std::get<0>(tuple), std::get<1>(tuple));
if constexpr(sizeof...(Rest) > 0) {
tuple_for_each_pair_impl(std::tuple{std::get<2 + Is>(tuple)...}, std::forward<Callback>(cb), std::make_index_sequence<sizeof...(Rest)>{});
}
}

template <typename... Es, typename Callback>
constexpr void tuple_for_each_pair(const std::tuple<Es...>& tuple, Callback&& cb) {
static_assert(sizeof...(Es) % 2 == 0, "an even number of entries is required");
tuple_for_each_pair_impl(tuple, std::forward<Callback>(cb), std::make_index_sequence<sizeof...(Es) - 2>{});
}

} // namespace detail
} // namespace celerity

template <typename... Es>
struct fmt::formatter<celerity::detail::log_map<Es...>> {
constexpr auto parse(format_parse_context& ctx) { return ctx.begin(); }

template <typename FormatContext>
auto format(const celerity::detail::log_map<Es...>& map, FormatContext& ctx) {
auto&& out = ctx.out();
int i = 0;
tuple_for_each_pair(map.entries, [&i, &out](auto& a, auto& b) {
if(i++ > 0) { fmt::format_to(out, ", "); }
fmt::format_to(out, "{}={}", a, b);
});
return out;
}
};
Loading