
Declarative side effects #68

Merged
merged 11 commits on Feb 17, 2022
51 changes: 28 additions & 23 deletions examples/wave_sim/wave_sim.cc
@@ -63,23 +63,32 @@ void update(celerity::distr_queue& queue, celerity::buffer<float, 2> up, celerit
step<float, update_config, class update>(queue, up, u, dt, delta);
}

void stream_open(celerity::distr_queue& queue, size_t N, size_t num_samples, celerity::experimental::host_object<std::ofstream> os) {
queue.submit([=](celerity::handler& cgh) {
celerity::experimental::side_effect os_eff{os, cgh};
cgh.host_task(celerity::on_master_node, [=] {
os_eff->open("wave_sim_result.bin", std::ios_base::out | std::ios_base::binary);
const struct { uint64_t n, t; } header{N, num_samples};
os_eff->write(reinterpret_cast<const char*>(&header), sizeof(header));
});
});
}

template <typename T>
void store(celerity::distr_queue& queue, celerity::buffer<T, 2> up, celerity::buffer<T, 1> sampled_frames, size_t frame_idx) {
void stream_append(celerity::distr_queue& queue, celerity::buffer<T, 2> up, celerity::experimental::host_object<std::ofstream> os) {
const auto range = up.get_range();
queue.submit([=](celerity::handler& cgh) {
celerity::accessor up_r{up, cgh, celerity::access::all{}, celerity::read_only_host_task};
// Use `all` range mapper to avoid unnecessary lazy resizing of the backing buffer as new frames come in
celerity::accessor sf_w{sampled_frames, cgh, celerity::access::all{}, celerity::write_only_host_task};
cgh.host_task(celerity::on_master_node, [=] { memcpy(sf_w.get_pointer() + frame_idx * range.size(), up_r.get_pointer(), range.size() * sizeof(T)); });
celerity::experimental::side_effect os_eff{os, cgh};
cgh.host_task(celerity::on_master_node, [=] { os_eff->write(reinterpret_cast<const char*>(up_r.get_pointer()), range.size() * sizeof(T)); });
});
}

template <typename T>
void write_bin(size_t N, size_t num_samples, const T* sampled_frames) {
std::ofstream os("wave_sim_result.bin", std::ios_base::out | std::ios_base::binary);
const struct { uint64_t n, t; } header{N, num_samples};
os.write(reinterpret_cast<const char*>(&header), sizeof(header));
os.write(reinterpret_cast<const char*>(sampled_frames), num_samples * N * N * sizeof(T));
void stream_close(celerity::distr_queue& queue, celerity::experimental::host_object<std::ofstream> os) {
queue.submit([=](celerity::handler& cgh) {
celerity::experimental::side_effect os_eff{os, cgh};
cgh.host_task(celerity::on_master_node, [=] { os_eff->close(); });
});
}

struct wave_sim_config {
@@ -134,32 +143,28 @@ int main(int argc, char* argv[]) {
celerity::buffer<float, 2> up{celerity::range<2>(cfg.N, cfg.N)}; // next
celerity::buffer<float, 2> u{celerity::range<2>(cfg.N, cfg.N)}; // current

// Create buffer for storing sampled frames.
// As we only need some form of contiguous storage for dumping the result in the end, we can simply use a 1D buffer here.
celerity::buffer<float, 1> sampled_frames{celerity::range<1>{num_samples * up.get_range().size()}};

setup_wave(queue, u, {cfg.N / 4.f, cfg.N / 4.f}, 1, {cfg.N / 8.f, cfg.N / 8.f});
zero(queue, up);
initialize(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});

// Store initial state
if(cfg.output_sample_rate > 0) { store(queue, u, sampled_frames, 0); }
const celerity::experimental::host_object<std::ofstream> os;
if(cfg.output_sample_rate > 0) {
stream_open(queue, cfg.N, num_samples, os);
stream_append(queue, u, os); // Store initial state
}

auto t = 0.0;
size_t i = 0;
while(t < cfg.T) {
update(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});
if(cfg.output_sample_rate != 0 && ++i % cfg.output_sample_rate == 0) { store(queue, u, sampled_frames, i / cfg.output_sample_rate); }
if(cfg.output_sample_rate > 0) {
if(++i % cfg.output_sample_rate == 0) { stream_append(queue, u, os); }
}
std::swap(u, up);
t += cfg.dt;
}

if(cfg.output_sample_rate > 0) {
queue.submit([=](celerity::handler& cgh) {
celerity::accessor sf{sampled_frames, cgh, celerity::access::all{}, celerity::read_only_host_task};
cgh.host_task(celerity::on_master_node, [=]() { write_bin(cfg.N, num_samples, sf.get_pointer()); });
});
}
if(cfg.output_sample_rate > 0) { stream_close(queue, os); }

return EXIT_SUCCESS;
}
1 change: 1 addition & 0 deletions include/celerity.h
@@ -6,6 +6,7 @@
#include "accessor.h"
#include "buffer.h"
#include "distr_queue.h"
#include "side_effect.h"
#include "user_bench.h"
#include "version.h"

5 changes: 5 additions & 0 deletions include/graph_generator.h
@@ -58,6 +58,7 @@ namespace detail {
using buffer_state_map = std::unordered_map<buffer_id, buffer_state>;
using buffer_read_map = std::unordered_map<buffer_id, GridRegion<3>>;
using buffer_writer_map = std::unordered_map<buffer_id, region_map<std::optional<command_id>>>;
using side_effect_map = std::unordered_map<host_object_id, command_id>;

struct per_node_data {
// An "init command" is used as the last writer for host-initialized buffers.
@@ -69,6 +70,8 @@
// Collective host tasks have an implicit dependency on the previous task in the same collective group, which is required in order to guarantee
// they are executed in the same order on every node.
std::unordered_map<collective_group_id, command_id> last_collective_commands;
// Side effects on the same host object create true dependencies between task commands, so we track the last effect per host object on each node.
side_effect_map host_object_last_effects;
};

public:
@@ -119,6 +122,8 @@

void process_task_data_requirements(task_id tid);

void process_task_side_effect_requirements(task_id tid);

void generate_horizon(task_id tid);
};
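
The per-node side_effect_map added above is the whole ordering mechanism: when a command that declares an effect on a host object is generated, the command currently recorded as the last effect on that object becomes a true dependency, and the new command then takes its place in the map. The actual logic lives in process_task_side_effect_requirements (its definition is not part of this excerpt); the following is only a self-contained toy model of that bookkeeping, using plain standard-library types:

#include <cstdint>
#include <iostream>
#include <unordered_map>
#include <utility>
#include <vector>

using host_object_id = std::uint64_t;
using command_id = std::uint64_t;

int main() {
	std::unordered_map<host_object_id, command_id> host_object_last_effects; // as in per_node_data
	std::vector<std::pair<command_id, command_id>> true_dependencies;        // (dependent command, its dependency)

	const auto record_side_effect = [&](const command_id cid, const host_object_id hoid) {
		if(const auto last = host_object_last_effects.find(hoid); last != host_object_last_effects.end()) {
			true_dependencies.emplace_back(cid, last->second); // serialize with the previous effect on this object
		}
		host_object_last_effects[hoid] = cid; // this command is now the last effect on the object
	};

	// Three commands touching the same host object, e.g. stream_open, stream_append, stream_close
	record_side_effect(1, 0);
	record_side_effect(2, 0);
	record_side_effect(3, 0);

	for(const auto& [cmd, dep] : true_dependencies) {
		std::cout << "C" << cmd << " depends on C" << dep << "\n"; // C2 depends on C1, C3 depends on C2: a chain
	}
	return 0;
}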

33 changes: 19 additions & 14 deletions include/handler.h
@@ -287,42 +287,46 @@ namespace detail {
access_map.add_access(bid, std::move(rm));
}

void add_requirement(const host_object_id hoid, const experimental::side_effect_order order) {
assert(task == nullptr);
side_effect_map.add_side_effect(hoid, order);
}

template <int Dims>
void add_reduction(reduction_id rid) {
reductions.push_back(rid);
}

void create_host_compute_task(int dimensions, range<3> global_range, id<3> global_offset, range<3> granularity) {
void create_host_compute_task(task_geometry geometry) {
assert(task == nullptr);
if(global_range.size() == 0) {
if(geometry.global_size.size() == 0) {
// TODO this can be easily supported by not creating a task in case the execution range is empty
throw std::runtime_error{"The execution range of distributed host tasks must have at least one item"};
}
task = detail::task::make_host_compute(
tid, dimensions, global_range, global_offset, granularity, std::move(cgf), std::move(access_map), std::move(reductions));
task = detail::task::make_host_compute(tid, geometry, std::move(cgf), std::move(access_map), std::move(side_effect_map), std::move(reductions));
}

void create_device_compute_task(int dimensions, range<3> global_range, id<3> global_offset, range<3> granularity, std::string debug_name) {
void create_device_compute_task(task_geometry geometry, std::string debug_name) {
assert(task == nullptr);
if(global_range.size() == 0) {
if(geometry.global_size.size() == 0) {
// TODO unless reductions are involved, this can be easily supported by not creating a task in case the execution range is empty.
// Edge case: If the task includes reductions that specify property::reduction::initialize_to_identity, we need to create a task that sets
// the buffer state to an empty pending_reduction_state in the graph_generator. This will cause a trivial reduction_command to be generated on
// each node that reads from the reduction output buffer, initializing it to the identity value locally.
throw std::runtime_error{"The execution range of device tasks must have at least one item"};
}
task = detail::task::make_device_compute(
tid, dimensions, global_range, global_offset, granularity, std::move(cgf), std::move(access_map), std::move(reductions), std::move(debug_name));
if(!side_effect_map.empty()) { throw std::runtime_error{"Side effects cannot be used in device kernels"}; }
task = detail::task::make_device_compute(tid, geometry, std::move(cgf), std::move(access_map), std::move(reductions), std::move(debug_name));
}

void create_collective_task(collective_group_id cgid) {
assert(task == nullptr);
task = detail::task::make_collective(tid, cgid, num_collective_nodes, std::move(cgf), std::move(access_map));
task = detail::task::make_collective(tid, cgid, num_collective_nodes, std::move(cgf), std::move(access_map), std::move(side_effect_map));
}

void create_master_node_task() {
assert(task == nullptr);
task = detail::task::make_master_node(tid, std::move(cgf), std::move(access_map));
task = detail::task::make_master_node(tid, std::move(cgf), std::move(access_map), std::move(side_effect_map));
}

std::unique_ptr<class task> into_task() && { return std::move(task); }
@@ -339,6 +343,7 @@
task_id tid;
std::unique_ptr<command_group_storage_base> cgf;
buffer_access_map access_map;
side_effect_map side_effect_map;
std::vector<reduction_id> reductions;
std::unique_ptr<class task> task = nullptr;
size_t num_collective_nodes;
@@ -595,8 +600,8 @@ void handler::parallel_for_kernel_and_reductions(range<Dims> global_range, id<Di
granularity[d] = local_range[d];
}
}
return dynamic_cast<detail::prepass_handler&>(*this).create_device_compute_task(
Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), granularity, detail::kernel_debug_name<KernelName>());
const detail::task_geometry geometry{Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), granularity};
return dynamic_cast<detail::prepass_handler&>(*this).create_device_compute_task(geometry, detail::kernel_debug_name<KernelName>());
}

auto& device_handler = dynamic_cast<detail::live_pass_device_handler&>(*this);
@@ -642,8 +647,8 @@ void handler::host_task(experimental::collective_tag tag, Functor kernel) {
template <int Dims, typename Functor>
void handler::host_task(range<Dims> global_range, id<Dims> global_offset, Functor kernel) {
if(is_prepass()) {
dynamic_cast<detail::prepass_handler&>(*this).create_host_compute_task(
Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), {1, 1, 1});
const detail::task_geometry geometry{Dims, detail::range_cast<3>(global_range), detail::id_cast<3>(global_offset), {1, 1, 1}};
dynamic_cast<detail::prepass_handler&>(*this).create_host_compute_task(geometry);
} else {
dynamic_cast<detail::live_pass_host_handler&>(*this).schedule<Dims>(kernel);
}
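
Because create_device_compute_task above refuses a non-empty side_effect_map, declaring a side effect is only valid in command groups that end in a host task. A small user-level sketch of that rule; the kernel name and log file name are made up for illustration:

#include <fstream>

#include <celerity.h>

int main() {
	celerity::distr_queue queue;
	celerity::experimental::host_object<std::ofstream> log_file;

	// Fine: host tasks may declare side effects.
	queue.submit([=](celerity::handler& cgh) {
		celerity::experimental::side_effect log_eff{log_file, cgh};
		cgh.host_task(celerity::on_master_node, [=] { log_eff->open("run.log"); });
	});

	// Not fine: the prepass handler sees a non-empty side_effect_map while creating the device compute
	// task and throws std::runtime_error("Side effects cannot be used in device kernels").
	queue.submit([=](celerity::handler& cgh) {
		celerity::experimental::side_effect log_eff{log_file, cgh};
		cgh.parallel_for<class illegal_side_effect>(celerity::range<1>{1}, [=](celerity::item<1>) {});
	});

	return 0;
}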
175 changes: 175 additions & 0 deletions include/host_object.h
@@ -0,0 +1,175 @@
#pragma once

#include <memory>
#include <mutex>
#include <type_traits>
#include <unordered_set>
#include <utility>

#include "runtime.h"


namespace celerity::experimental {

template <typename T>
class host_object;

} // namespace celerity::experimental

namespace celerity::detail {

class host_object_manager {
public:
host_object_id create_host_object() {
const std::lock_guard lock{mutex};
const auto id = next_id++;
objects.emplace(id);
return id;
}

void destroy_host_object(const host_object_id id) {
const std::lock_guard lock{mutex};
objects.erase(id);
}

// true-result only reliable if no calls to create_host_object() are pending
bool has_active_objects() const {
const std::lock_guard lock{mutex};
return !objects.empty();
}

private:
mutable std::mutex mutex;
host_object_id next_id = 0;
std::unordered_set<host_object_id> objects;
};

// Base for `state` structs in all host_object specializations: registers and unregisters host_objects with the host_object_manager.
struct host_object_tracker {
detail::host_object_id id{};

host_object_tracker() {
if(!detail::runtime::is_initialized()) { detail::runtime::init(nullptr, nullptr); }
id = detail::runtime::get_instance().get_host_object_manager().create_host_object();
}

host_object_tracker(host_object_tracker&&) = delete;
host_object_tracker& operator=(host_object_tracker&&) = delete;

~host_object_tracker() { detail::runtime::get_instance().get_host_object_manager().destroy_host_object(id); }
};

// see host_object deduction guides
template <typename T>
struct assert_host_object_ctor_param_is_rvalue {
static_assert(std::is_rvalue_reference_v<T&&>,
"Either pass the constructor parameter as T&& or std::reference_wrapper<T>, or add explicit template arguments to host_object");
using type = T;
};

template <typename T>
using assert_host_object_ctor_param_is_rvalue_t = typename assert_host_object_ctor_param_is_rvalue<T>::type;

} // namespace celerity::detail

namespace celerity::experimental {

/**
* A `host_object` wraps state that exists separately on each worker node and can be referenced in host tasks through `side_effect`s. Celerity ensures that
* access to the object state is properly synchronized and ordered. An example usage of a host object might be a file stream that is written to from multiple
* host tasks sequentially.
*
* - The generic `host_object<T>` keeps ownership of the state at any time and is the safest way to achieve side effects on the host.
* - The `host_object<T&>` specialization attaches Celerity's tracking and synchronization mechanism to user-managed state. The user guarantees that the
* referenced object is not accessed in any way other than through a `side_effect` while the `host_object` is live.
* - `host_object<void>` does not carry internal state and can be used to track access to global variables or functions like `printf()`.
*/
template <typename T>
class host_object {
static_assert(std::is_object_v<T>); // disallow host_object<T&&> and host_object<function-type>

public:
using object_type = T;

host_object() : shared_state{std::make_shared<state>(std::in_place)} {}

explicit host_object(const T& obj) : shared_state{std::make_shared<state>(std::in_place, obj)} {}

explicit host_object(T&& obj) : shared_state{std::make_shared<state>(std::in_place, std::move(obj))} {}

/// Constructs the object in-place with the given constructor arguments.
template <typename... CtorParams>
explicit host_object(const std::in_place_t, CtorParams&&... ctor_args) // requiring std::in_place avoids overriding copy and move constructors
: shared_state{std::make_shared<state>(std::in_place, std::forward<CtorParams>(ctor_args)...)} {}

private:
template <typename, side_effect_order>
friend class side_effect;

struct state : detail::host_object_tracker {
T object;

template <typename... CtorParams>
explicit state(const std::in_place_t, CtorParams&&... ctor_args) : object{std::forward<CtorParams>(ctor_args)...} {}
};

detail::host_object_id get_id() const { return shared_state->id; }
T* get_object() const { return &shared_state->object; }

std::shared_ptr<state> shared_state;
};

template <typename T>
class host_object<T&> {
public:
using object_type = T;

explicit host_object(T& obj) : shared_state{std::make_shared<state>(obj)} {}

explicit host_object(const std::reference_wrapper<T> ref) : shared_state{std::make_shared<state>(ref.get())} {}

private:
template <typename, side_effect_order>
friend class side_effect;

struct state : detail::host_object_tracker {
T& object;

explicit state(T& object) : object{object} {}
};

detail::host_object_id get_id() const { return shared_state->id; }
T* get_object() const { return &shared_state->object; }

std::shared_ptr<state> shared_state;
};

template <>
class host_object<void> {
public:
using object_type = void;

explicit host_object() : shared_state{std::make_shared<state>()} {}

private:
template <typename, side_effect_order>
friend class side_effect;

struct state : detail::host_object_tracker {};

detail::host_object_id get_id() const { return shared_state->id; }

std::shared_ptr<state> shared_state;
};

// The universal reference parameter T&& matches U& as well as U&& for object types U, but we don't want to implicitly invoke a copy constructor: the user
// might have intended to either create a host_object<T&> (which requires a std::reference_wrapper parameter) or move-construct the interior.
template <typename T>
explicit host_object(T &&) -> host_object<detail::assert_host_object_ctor_param_is_rvalue_t<T>>;

template <typename T>
explicit host_object(std::reference_wrapper<T>) -> host_object<T&>;

explicit host_object()->host_object<void>;

} // namespace celerity::experimental
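
The deduction guides above determine how a host_object is constructed in practice. A short, self-contained sketch of the three variants described in the class comment; the stream types are illustrative, and the final submit assumes the side_effect over a host_object<void> is declared the same way as the std::ofstream one in examples/wave_sim/wave_sim.cc:

#include <cstdio>
#include <fstream>
#include <functional>

#include <celerity.h>

int main() {
	celerity::distr_queue queue;

	std::ofstream user_stream; // state that stays owned by the application

	// host_object<std::ofstream>: the rvalue deduction guide moves the stream in, the host_object owns it.
	celerity::experimental::host_object owned{std::ofstream{}};

	// host_object<std::ofstream&>: attaches tracking and synchronization to user-managed state.
	celerity::experimental::host_object by_ref{std::ref(user_stream)};

	// host_object<void>: carries no state and only orders the host tasks that declare effects on it.
	celerity::experimental::host_object no_state{};

	// celerity::experimental::host_object copied{user_stream};
	// ^ rejected by the static_assert above: an lvalue parameter would silently copy, so the guides
	//   require an rvalue, a std::reference_wrapper, or explicit template arguments.

	queue.submit([=](celerity::handler& cgh) {
		celerity::experimental::side_effect order_only{no_state, cgh}; // serializes against earlier effects on no_state
		cgh.host_task(celerity::on_master_node, [=] { std::printf("runs after all earlier effects on no_state\n"); });
	});

	return 0;
}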