Use Celerity buffer for storing sampled frames in wave_sim
Working with side effects from within host tasks (such as adding frames
to the `std::vector` in the previous implementation) requires careful
consideration, as we currently lack a way of describing data
dependencies between such tasks. This can (and did) lead to confusing
bugs: depending on the `--sample-rate` setting, some `store` host tasks
have no clear ordering in the task graph, meaning they could be
executed in any order (producing wrong results) or even concurrently
(potentially causing a crash).
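
To illustrate the failure mode, here is a minimal hypothetical sketch
(not part of the commit), written against the `celerity::allow_by_ref`
API used by the previous implementation. Neither submission declares a
buffer access that conflicts with the other, so the runtime sees no
dependency between the two host tasks and may run them in either order,
or even at the same time:

#include <vector>
#include <celerity.h>

int main() {
    celerity::distr_queue queue;
    std::vector<int> frames; // plain host state, invisible to the task graph

    for(int i = 0; i < 2; ++i) {
        queue.submit(celerity::allow_by_ref, [=, &frames](celerity::handler& cgh) {
            cgh.host_task(celerity::on_master_node, [=, &frames] {
                frames.push_back(i); // untracked side effect: may reorder or race
            });
        });
    }

    queue.slow_full_sync(); // waits for completion, but cannot order the two tasks
}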

While such problems can currently be worked around with manual syncing,
this change circumvents them entirely by using a `celerity::buffer`
instead of an `std::vector` for storing the sampled frames.
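
For contrast, a condensed sketch of the pattern this commit adopts
(again hypothetical; identifiers follow the diff below). The per-frame
write now goes through a buffer accessor, so the runtime can see it and
derive the required dependencies itself, with every frame landing at a
fixed offset `frame_idx * frame_size`:

#include <cstring>
#include <celerity.h>

int main() {
    celerity::distr_queue queue;
    const size_t n = 64, num_samples = 4, frame_size = n * n;
    celerity::buffer<float, 2> u{celerity::range<2>(n, n)};
    celerity::buffer<float, 1> sampled_frames{celerity::range<1>{num_samples * frame_size}};

    for(size_t frame_idx = 0; frame_idx < num_samples; ++frame_idx) {
        queue.submit([=](celerity::handler& cgh) {
            celerity::accessor up_r{u, cgh, celerity::access::all{}, celerity::read_only_host_task};
            celerity::accessor sf_w{sampled_frames, cgh, celerity::access::all{}, celerity::write_only_host_task};
            cgh.host_task(celerity::on_master_node, [=] {
                // This write is declared via `sf_w`, so the runtime tracks it
                memcpy(sf_w.get_pointer() + frame_idx * frame_size, up_r.get_pointer(), frame_size * sizeof(float));
            });
        });
    }
}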
psalz committed Jan 3, 2022
1 parent cfc508a commit d226b95
Showing 1 changed file with 24 additions and 30 deletions.
examples/wave_sim/wave_sim.cc (54 changes: 24 additions & 30 deletions)
@@ -64,28 +64,22 @@ void update(celerity::distr_queue& queue, celerity::buffer<float, 2> up, celerit
 }

 template <typename T>
-void store(celerity::distr_queue& queue, celerity::buffer<T, 2> up, std::vector<std::vector<float>>& result_frames) {
+void store(celerity::distr_queue& queue, celerity::buffer<T, 2> up, celerity::buffer<T, 1> sampled_frames, size_t frame_idx) {
     const auto range = up.get_range();
-    queue.submit(celerity::allow_by_ref, [=, &result_frames](celerity::handler& cgh) {
+    queue.submit([=](celerity::handler& cgh) {
         celerity::accessor up_r{up, cgh, celerity::access::all{}, celerity::read_only_host_task};
-        cgh.host_task(celerity::on_master_node, [=, &result_frames] {
-            result_frames.emplace_back();
-            auto& frame = *result_frames.rbegin();
-            frame.resize(range.size());
-            memcpy(frame.data(), up_r.get_pointer(), range[0] * range[1] * sizeof(float));
-        });
+        // Use `all` range mapper to avoid unnecessary lazy resizing of the backing buffer as new frames come in
+        celerity::accessor sf_w{sampled_frames, cgh, celerity::access::all{}, celerity::write_only_host_task};
+        cgh.host_task(celerity::on_master_node, [=] { memcpy(sf_w.get_pointer() + frame_idx * range.size(), up_r.get_pointer(), range.size() * sizeof(T)); });
     });
 }

-void write_bin(size_t N, std::vector<std::vector<float>>& result_frames) {
+template <typename T>
+void write_bin(size_t N, size_t num_samples, const T* sampled_frames) {
     std::ofstream os("wave_sim_result.bin", std::ios_base::out | std::ios_base::binary);

-    const struct { uint64_t n, t; } header{N, result_frames.size()};
+    const struct { uint64_t n, t; } header{N, num_samples};
     os.write(reinterpret_cast<const char*>(&header), sizeof(header));

-    for(const auto& frame : result_frames) {
-        os.write(reinterpret_cast<const char*>(frame.data()), sizeof(float) * N * N);
-    }
+    os.write(reinterpret_cast<const char*>(sampled_frames), num_samples * N * N * sizeof(T));
 }

 struct wave_sim_config {
@@ -116,7 +110,7 @@ int main(int argc, char* argv[]) {
     // Parse command line arguments
     const wave_sim_config cfg = ([&]() {
         wave_sim_config result;
-        const arg_vector args(argv + 1, argv + argc);
+        const arg_vector args{argv + 1, argv + argc};
         for(auto it = args.cbegin(); it != args.cend(); ++it) {
             if(get_cli_arg(args, it, "-N", result.N, atoi) || get_cli_arg(args, it, "-T", result.T, atoi) || get_cli_arg(args, it, "--dt", result.dt, atof)
                 || get_cli_arg(args, it, "--sample-rate", result.output_sample_rate, atoi)) {
@@ -128,42 +122,42 @@
         return result;
     })(); // IIFE

-    const int num_steps = cfg.T / cfg.dt;
+    const size_t num_steps = cfg.T / cfg.dt;
+    // Sample (if enabled) every n-th frame, +1 for initial state
+    const size_t num_samples = cfg.output_sample_rate != 0 ? num_steps / cfg.output_sample_rate + 1 : 0;
+    if(cfg.output_sample_rate != 0 && num_steps % cfg.output_sample_rate != 0) {
+        std::cerr << "Warning: Number of time steps (" << num_steps << ") is not a multiple of the output sample rate (wasted frames)" << std::endl;
+    }

-    // TODO: We could allocate the required size at the beginning
-    std::vector<std::vector<float>> result_frames;
     celerity::distr_queue queue;

-    celerity::buffer<float, 2> up(nullptr, celerity::range<2>(cfg.N, cfg.N)); // next
-    celerity::buffer<float, 2> u(nullptr, celerity::range<2>(cfg.N, cfg.N)); // current
+    celerity::buffer<float, 2> up{celerity::range<2>(cfg.N, cfg.N)}; // next
+    celerity::buffer<float, 2> u{celerity::range<2>(cfg.N, cfg.N)}; // current
+
+    // Create buffer for storing sampled frames.
+    // As we only need some form of contiguous storage for dumping the result in the end, we can simply use a 1D buffer here.
+    celerity::buffer<float, 1> sampled_frames{celerity::range<1>{num_samples * up.get_range().size()}};

     setup_wave(queue, u, {cfg.N / 4.f, cfg.N / 4.f}, 1, {cfg.N / 8.f, cfg.N / 8.f});
     zero(queue, up);
     initialize(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});

     // Store initial state
-    if(cfg.output_sample_rate > 0) { store(queue, u, result_frames); }
+    if(cfg.output_sample_rate > 0) { store(queue, u, sampled_frames, 0); }

     auto t = 0.0;
     size_t i = 0;
     while(t < cfg.T) {
         update(queue, up, u, cfg.dt, {cfg.dx, cfg.dy});
-        if(cfg.output_sample_rate != 0 && ++i % cfg.output_sample_rate == 0) { store(queue, u, result_frames); }
+        if(cfg.output_sample_rate != 0 && ++i % cfg.output_sample_rate == 0) { store(queue, u, sampled_frames, i / cfg.output_sample_rate); }
         std::swap(u, up);
         t += cfg.dt;
     }

     queue.slow_full_sync();

     if(cfg.output_sample_rate > 0) {
-        queue.submit(celerity::allow_by_ref, [&cfg, &result_frames](celerity::handler& cgh) {
-            cgh.host_task(celerity::on_master_node, [&cfg, &result_frames]() {
-                // TODO: Consider writing results to disk as they're coming in, instead of just at the end
-                write_bin(cfg.N, result_frames);
-            });
+        queue.submit([=](celerity::handler& cgh) {
+            celerity::accessor sf{sampled_frames, cgh, celerity::access::all{}, celerity::read_only_host_task};
+            cgh.host_task(celerity::on_master_node, [=]() { write_bin(cfg.N, num_samples, sf.get_pointer()); });
         });
     }
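
As a worked example of the sampling arithmetic above (hypothetical
flags, not from the commit): running with `-T 100 --dt 0.25
--sample-rate 40` yields num_steps = 400 and num_samples = 400 / 40 + 1
= 11, i.e. frames 0 through 10, with frame 0 holding the initial state.
`sampled_frames` is then allocated as 11 * N * N floats, and `store`
writes frame i / 40 whenever i is a multiple of 40.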
