Skip to content

Commit

Permalink
[async] Parallel compilation infrastructure (#816)
Browse files Browse the repository at this point in the history
  • Loading branch information
yuanming-hu authored Apr 19, 2020
1 parent 95319c9 commit 1c9f669
Show file tree
Hide file tree
Showing 5 changed files with 122 additions and 19 deletions.
71 changes: 71 additions & 0 deletions misc/benchmark_parallel_compilation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# This file has a kernel with 16 equal offloaded tasks.

import taichi as ti
# Initialise Taichi for the x64 backend before any tensors are declared.
ti.init(arch=ti.x64)

# Resolution knob: use a larger value for higher-res simulations.
quality = 1
n_particles = 9000 * quality**2
n_grid = 128 * quality

# Grid spacing, its reciprocal, and the step size consumed by substep().
dx = 1 / n_grid
inv_dx = float(n_grid)
dt = 1e-4 / quality

# Per-particle volume and density, and the mass derived from them.
p_vol = (dx * 0.5)**2
p_rho = 1
p_mass = p_vol * p_rho

# Young's modulus and Poisson's ratio.
E = 0.1e4
nu = 0.2

# Lame parameters computed from E and nu.
mu_0 = E / (2 * (1 + nu))
lambda_0 = E * nu / ((1 + nu) * (1 - 2 * nu))

# Per-particle tensors.
x = ti.Vector(2, dt=ti.f32, shape=n_particles)  # position
v = ti.Vector(2, dt=ti.f32, shape=n_particles)  # velocity
C = ti.Matrix(2, 2, dt=ti.f32, shape=n_particles)  # affine velocity field
F = ti.Matrix(2, 2, dt=ti.f32, shape=n_particles)  # deformation gradient
material = ti.var(dt=ti.i32, shape=n_particles)  # material id
Jp = ti.var(dt=ti.f32, shape=n_particles)  # plastic deformation

# Per-grid-node tensors on an n_grid x n_grid lattice.
grid_shape = (n_grid, n_grid)
grid_v = ti.Vector(2, dt=ti.f32, shape=grid_shape)  # grid node momentum/velocity
grid_m = ti.var(dt=ti.f32, shape=grid_shape)  # grid node mass


# Benchmark kernel: particle state update followed by a scatter of mass and
# momentum onto the surrounding 3x3 grid nodes.
#
# The outer ti.static loop repeats the whole particle loop 16 times (K is
# otherwise unused); per the note at the top of the file this gives a kernel
# with 16 equal offloaded tasks to compile.
@ti.kernel
def substep():
    for K in ti.static(range(16)):
        for p in x:
            # Lower-left node of the 3x3 stencil and the particle's
            # fractional offset from it, in grid units.
            base = (x[p] * inv_dx - 0.5).cast(int)
            fx = x[p] * inv_dx - base.cast(float)
            # Three per-axis weights, each quadratic in fx.
            w = [
                0.5 * ti.sqr(1.5 - fx), 0.75 - ti.sqr(fx - 1),
                0.5 * ti.sqr(fx - 0.5)
            ]
            # Advance the deformation gradient: F <- (I + dt * C) F.
            F[p] = (ti.Matrix.identity(ti.f32, 2) + dt * C[p]) @ F[p]
            # Scaling factor for the Lame parameters, driven by Jp;
            # material 1 uses a fixed value instead.
            h = ti.exp(10 * (1.0 - Jp[p]))
            if material[p] == 1:
                h = 0.3
            mu, la = mu_0 * h, lambda_0 * h
            if material[p] == 0:  # liquid
                mu = 0.0
            U, sig, V = ti.svd(F[p])
            # J accumulates the product of the (possibly clamped) singular
            # values.
            J = 1.0
            for d in ti.static(range(2)):
                new_sig = sig[d, d]
                if material[p] == 2:  # Snow
                    # Clamp the singular value to [1 - 2.5e-2, 1 + 4.5e-3].
                    new_sig = min(max(sig[d, d], 1 - 2.5e-2), 1 + 4.5e-3)
                # Fold the ratio removed by the clamp into Jp (ratio is 1
                # when no clamping happened).
                Jp[p] *= sig[d, d] / new_sig
                sig[d, d] = new_sig
                J *= new_sig
            if material[p] == 0:
                # Liquid: replace F by an isotropic matrix sqrt(J) * I.
                F[p] = ti.Matrix.identity(ti.f32, 2) * ti.sqrt(J)
            elif material[p] == 2:
                # Snow: rebuild F from the clamped singular values.
                F[p] = U @ sig @ V.T()
            stress = 2 * mu * (F[p] - U @ V.T()) @ F[p].T(
            ) + ti.Matrix.identity(ti.f32, 2) * la * J * (J - 1)
            stress = (-dt * p_vol * 4 * inv_dx * inv_dx) * stress
            affine = stress + p_mass * C[p]
            # Scatter to the 3x3 neighbourhood; the static ndrange is
            # expanded at compile time.
            for i, j in ti.static(ti.ndrange(3, 3)):
                offset = ti.Vector([i, j])
                dpos = (offset.cast(float) - fx) * dx
                weight = w[i][0] * w[j][1]
                grid_v[base +
                       offset] += weight * (p_mass * v[p] + affine @ dpos)
                grid_m[base + offset] += weight * p_mass


# Launch the kernel once; this single call is the workload being measured.
substep()

# Print profiling output after the run.
# NOTE(review): presumably the first call reports the Python-side profiler and
# the second the core (C++) profiler records - confirm against the Taichi API.
ti.profiler_print()
ti.core.print_profile_info()
4 changes: 3 additions & 1 deletion taichi/llvm/llvm_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
// in charge of creating & JITing arch-specific LLVM modules,
// and invoking compiled functions (kernels).

#include <mutex>
#include <functional>

#include "taichi/lang_util.h"
#include "llvm_fwd.h"
#include "taichi/llvm/llvm_fwd.h"
#include "taichi/ir/snode.h"
#include "taichi/jit/jit_session.h"

Expand All @@ -20,6 +21,7 @@ class TaichiLLVMContext {
std::unique_ptr<JITSession> jit;
std::unique_ptr<llvm::Module> runtime_module, struct_module;
JITModule *runtime_jit_module;
std::mutex mut;
Arch arch;

SNodeAttributes snode_attr;
Expand Down
29 changes: 25 additions & 4 deletions taichi/program/async_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,34 @@ void ExecutionQueue::enqueue(KernelLaunchRecord ker) {
}

void ExecutionQueue::synchronize() {
TI_INFO("Flushing execution queue with {} tasks", task_queue.size());
std::mutex mut;

std::unordered_set<uint64> to_be_compiled;

for (int i = 0; i < (int)task_queue.size(); i++) {
auto ker = task_queue[i];
auto h = hash(ker.stmt);
if (compiled_func.find(h) == compiled_func.end() &&
to_be_compiled.find(h) == to_be_compiled.end()) {
to_be_compiled.insert(h);
compilation_workers.enqueue([&, ker, h, this]() {
{
auto func = CodeGenCPU(ker.kernel, ker.stmt).codegen();
std::lock_guard<std::mutex> _(mut);
compiled_func[h] = func;
}
});
}
}

auto t = Time::get_time();
compilation_workers.flush();
TI_WARN("Flushing time {:.3f} ms", (Time::get_time() - t) * 1000);

while (!task_queue.empty()) {
auto ker = task_queue.front();
std::string serialized;
auto h = hash(ker.stmt);
if (compiled_func.find(h) == compiled_func.end()) {
compiled_func[h] = CodeGenCPU(ker.kernel, ker.stmt).codegen();
}
compiled_func[h](ker.context);
task_queue.pop_front();
}
Expand Down
31 changes: 18 additions & 13 deletions taichi/program/async_engine.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <deque>
#include <thread>
#include <mutex>
#include <atomic>

#define TI_RUNTIME_HOST
#include "taichi/ir/ir.h"
Expand All @@ -16,14 +17,10 @@ class ParallelExecutor {
public:
using TaskType = std::function<void()>;

enum class ExecutorStatus {
uninitialized,
initialized,
finalized,
};

explicit ParallelExecutor(int num_threads)
: num_threads(num_threads), status(ExecutorStatus::uninitialized) {
: num_threads(num_threads),
status(ExecutorStatus::uninitialized),
running_threads(0) {
auto _ = std::lock_guard<std::mutex>(mut);

for (int i = 0; i < num_threads; i++) {
Expand All @@ -41,7 +38,7 @@ class ParallelExecutor {
void flush() {
while (true) {
std::unique_lock<std::mutex> lock(mut);
if (task_queue.empty()) {
if (task_queue.empty() && running_threads == 0) {
break;
} else {
lock.unlock();
Expand All @@ -66,6 +63,12 @@ class ParallelExecutor {
}

private:
// Lifecycle state of a ParallelExecutor, consulted by the worker loop in
// task().
enum class ExecutorStatus {
// Value the constructor starts with; per the comment in task(), workers
// sleep briefly and re-check until the executor is initialized.
uninitialized,
// Normal operating state: workers pop tasks from task_queue and run them.
initialized,
// Shutdown requested: a worker leaves its loop once it observes this state
// with an empty task_queue.
finalized,
};

void task() {
TI_DEBUG("Starting worker thread.");
while (true) {
Expand All @@ -75,19 +78,20 @@ class ParallelExecutor {
Time::sleep(1e-6);
continue; // wait until initialized
}
if (status == ExecutorStatus::finalized) {
if (status == ExecutorStatus::finalized && task_queue.empty()) {
break; // finalized, exit
}
// initialized and not finalized. Do work.
if (!task_queue.empty()) {
auto task = task_queue.front();
task_queue.pop_front();
running_threads++;
lock.unlock();
// Run the task
task();
running_threads--;
}
}
TI_DEBUG("Exiting worker thread.");
}

int num_threads;
Expand All @@ -96,6 +100,7 @@ class ParallelExecutor {

std::vector<std::thread> threads;
std::deque<TaskType> task_queue;
std::atomic<int> running_threads;
};

class KernelLaunchRecord {
Expand All @@ -112,12 +117,12 @@ class ExecutionQueue {
public:
std::deque<KernelLaunchRecord> task_queue;

std::vector<std::thread> compilation_workers; // parallel
std::thread launch_worker; // serial
ParallelExecutor compilation_workers; // parallel compilation
std::thread launch_worker; // serial launching

std::unordered_map<uint64, FunctionType> compiled_func;

ExecutionQueue() {
ExecutionQueue() : compilation_workers(4) { // TODO: remove 4
}

void enqueue(KernelLaunchRecord ker);
Expand Down
6 changes: 5 additions & 1 deletion taichi/program/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ Program *current_program = nullptr;
std::atomic<int> Program::num_instances;

Program::Program(Arch desired_arch) {
TI_TRACE("Program initializing...");
auto arch = desired_arch;
if (arch == Arch::cuda) {
runtime = Runtime::create(arch);
Expand Down Expand Up @@ -122,7 +123,8 @@ Program::Program(Arch desired_arch) {
}
}

TI_TRACE("Program arch={}", arch_name(arch));
TI_TRACE("Program ({}) arch={} initialized.", fmt::ptr(this),
arch_name(arch));
}

FunctionType Program::compile(Kernel &kernel) {
Expand Down Expand Up @@ -470,6 +472,7 @@ Kernel &Program::get_snode_writer(SNode *snode) {
}

void Program::finalize() {
TI_TRACE("Program finalizing...");
if (runtime)
runtime->set_profiler(nullptr);
synchronize();
Expand All @@ -481,6 +484,7 @@ void Program::finalize() {
#endif
finalized = true;
num_instances -= 1;
TI_TRACE("Program ({}) finalized.", fmt::ptr(this));
}

void Program::launch_async(Kernel *kernel) {
Expand Down

0 comments on commit 1c9f669

Please sign in to comment.