From 1c9f669a792c7d9a8c676dbdc98e065d46b69b3c Mon Sep 17 00:00:00 2001
From: Yuanming Hu
Date: Sat, 18 Apr 2020 21:19:43 -0400
Subject: [PATCH] [async] Parallel compilation infrastructure (#816)

---
 misc/benchmark_parallel_compilation.py | 71 ++++++++++++++++++++++++++
 taichi/llvm/llvm_context.h             |  4 +-
 taichi/program/async_engine.cpp        | 29 +++++++++--
 taichi/program/async_engine.h          | 31 ++++++-----
 taichi/program/program.cpp             |  6 ++-
 5 files changed, 122 insertions(+), 19 deletions(-)
 create mode 100644 misc/benchmark_parallel_compilation.py

diff --git a/misc/benchmark_parallel_compilation.py b/misc/benchmark_parallel_compilation.py
new file mode 100644
index 0000000000000..24e31d16e6c53
--- /dev/null
+++ b/misc/benchmark_parallel_compilation.py
@@ -0,0 +1,71 @@
+# This file has a kernel with 16 equal offloaded tasks.
+
+import taichi as ti
+ti.init(arch=ti.x64)
+quality = 1  # Use a larger value for higher-res simulations
+n_particles, n_grid = 9000 * quality**2, 128 * quality
+dx, inv_dx = 1 / n_grid, float(n_grid)
+dt = 1e-4 / quality
+p_vol, p_rho = (dx * 0.5)**2, 1
+p_mass = p_vol * p_rho
+E, nu = 0.1e4, 0.2  # Young's modulus and Poisson's ratio
+mu_0, lambda_0 = E / (2 * (1 + nu)), E * nu / (
+    (1 + nu) * (1 - 2 * nu))  # Lame parameters
+x = ti.Vector(2, dt=ti.f32, shape=n_particles)  # position
+v = ti.Vector(2, dt=ti.f32, shape=n_particles)  # velocity
+C = ti.Matrix(2, 2, dt=ti.f32, shape=n_particles)  # affine velocity field
+F = ti.Matrix(2, 2, dt=ti.f32, shape=n_particles)  # deformation gradient
+material = ti.var(dt=ti.i32, shape=n_particles)  # material id
+Jp = ti.var(dt=ti.f32, shape=n_particles)  # plastic deformation
+grid_v = ti.Vector(2, dt=ti.f32,
+                   shape=(n_grid, n_grid))  # grid node momentum/velocity
+grid_m = ti.var(dt=ti.f32, shape=(n_grid, n_grid))  # grid node mass
+
+
+@ti.kernel
+def substep():
+    for K in ti.static(range(16)):
+        for p in x:
+            base = (x[p] * inv_dx - 0.5).cast(int)
+            fx = x[p] * inv_dx - base.cast(float)
+            w = [
+                0.5 * ti.sqr(1.5 - fx), 0.75 - ti.sqr(fx - 1),
+                0.5 * ti.sqr(fx - 0.5)
+            ]
+            F[p] = (ti.Matrix.identity(ti.f32, 2) + dt * C[p]) @ F[p]
+            h = ti.exp(10 * (1.0 - Jp[p]))
+            if material[p] == 1:
+                h = 0.3
+            mu, la = mu_0 * h, lambda_0 * h
+            if material[p] == 0:  # liquid
+                mu = 0.0
+            U, sig, V = ti.svd(F[p])
+            J = 1.0
+            for d in ti.static(range(2)):
+                new_sig = sig[d, d]
+                if material[p] == 2:  # Snow
+                    new_sig = min(max(sig[d, d], 1 - 2.5e-2), 1 + 4.5e-3)
+                Jp[p] *= sig[d, d] / new_sig
+                sig[d, d] = new_sig
+                J *= new_sig
+            if material[p] == 0:
+                F[p] = ti.Matrix.identity(ti.f32, 2) * ti.sqrt(J)
+            elif material[p] == 2:
+                F[p] = U @ sig @ V.T()
+            stress = 2 * mu * (F[p] - U @ V.T()) @ F[p].T(
+            ) + ti.Matrix.identity(ti.f32, 2) * la * J * (J - 1)
+            stress = (-dt * p_vol * 4 * inv_dx * inv_dx) * stress
+            affine = stress + p_mass * C[p]
+            for i, j in ti.static(ti.ndrange(3, 3)):
+                offset = ti.Vector([i, j])
+                dpos = (offset.cast(float) - fx) * dx
+                weight = w[i][0] * w[j][1]
+                grid_v[base +
+                       offset] += weight * (p_mass * v[p] + affine @ dpos)
+                grid_m[base + offset] += weight * p_mass
+
+
+substep()
+
+ti.profiler_print()
+ti.core.print_profile_info()
diff --git a/taichi/llvm/llvm_context.h b/taichi/llvm/llvm_context.h
index 8e682598210c5..4975d95f74380 100644
--- a/taichi/llvm/llvm_context.h
+++ b/taichi/llvm/llvm_context.h
@@ -4,10 +4,11 @@
 // in charge of creating & JITing arch-specific LLVM modules,
 // and invoking compiled functions (kernels).
 
+#include <mutex>
 #include <functional>
 
 #include "taichi/lang_util.h"
-#include "llvm_fwd.h"
+#include "taichi/llvm/llvm_fwd.h"
 #include "taichi/ir/snode.h"
 #include "taichi/jit/jit_session.h"
 
@@ -20,6 +21,7 @@ class TaichiLLVMContext {
   std::unique_ptr<JITSession> jit;
   std::unique_ptr<llvm::Module> runtime_module, struct_module;
   JITModule *runtime_jit_module;
+  std::mutex mut;
 
   Arch arch;
   SNodeAttributes snode_attr;
diff --git a/taichi/program/async_engine.cpp b/taichi/program/async_engine.cpp
index 91fe77f6429bd..e2cbbba99fd09 100644
--- a/taichi/program/async_engine.cpp
+++ b/taichi/program/async_engine.cpp
@@ -31,13 +31,34 @@ void ExecutionQueue::enqueue(KernelLaunchRecord ker) {
 }
 
 void ExecutionQueue::synchronize() {
+  TI_INFO("Flushing execution queue with {} tasks", task_queue.size());
+  std::mutex mut;
+
+  std::unordered_set<uint64> to_be_compiled;
+
+  for (int i = 0; i < (int)task_queue.size(); i++) {
+    auto ker = task_queue[i];
+    auto h = hash(ker.stmt);
+    if (compiled_func.find(h) == compiled_func.end() &&
+        to_be_compiled.find(h) == to_be_compiled.end()) {
+      to_be_compiled.insert(h);
+      compilation_workers.enqueue([&, ker, h, this]() {
+        {
+          auto func = CodeGenCPU(ker.kernel, ker.stmt).codegen();
+          std::lock_guard _(mut);
+          compiled_func[h] = func;
+        }
+      });
+    }
+  }
+
+  auto t = Time::get_time();
+  compilation_workers.flush();
+  TI_WARN("Flushing time {:.3f} ms", (Time::get_time() - t) * 1000);
+
   while (!task_queue.empty()) {
     auto ker = task_queue.front();
-    std::string serialized;
     auto h = hash(ker.stmt);
-    if (compiled_func.find(h) == compiled_func.end()) {
-      compiled_func[h] = CodeGenCPU(ker.kernel, ker.stmt).codegen();
-    }
     compiled_func[h](ker.context);
     task_queue.pop_front();
   }
diff --git a/taichi/program/async_engine.h b/taichi/program/async_engine.h
index c11ebcc518355..715c0bbc84d3c 100644
--- a/taichi/program/async_engine.h
+++ b/taichi/program/async_engine.h
@@ -1,6 +1,7 @@
 #include <deque>
 #include <thread>
 #include <mutex>
+#include <atomic>
 
 #define TI_RUNTIME_HOST
 #include "taichi/ir/ir.h"
@@ -16,14 +17,10 @@ class ParallelExecutor {
  public:
   using TaskType = std::function<void()>;
 
-  enum class ExecutorStatus {
-    uninitialized,
-    initialized,
-    finalized,
-  };
-
   explicit ParallelExecutor(int num_threads)
-      : num_threads(num_threads), status(ExecutorStatus::uninitialized) {
+      : num_threads(num_threads),
+        status(ExecutorStatus::uninitialized),
+        running_threads(0) {
     auto _ = std::lock_guard(mut);
 
     for (int i = 0; i < num_threads; i++) {
@@ -41,7 +38,7 @@ class ParallelExecutor {
   void flush() {
     while (true) {
       std::unique_lock lock(mut);
-      if (task_queue.empty()) {
+      if (task_queue.empty() && running_threads == 0) {
         break;
       } else {
         lock.unlock();
@@ -66,6 +63,12 @@ class ParallelExecutor {
   }
 
  private:
+  enum class ExecutorStatus {
+    uninitialized,
+    initialized,
+    finalized,
+  };
+
   void task() {
     TI_DEBUG("Starting worker thread.");
     while (true) {
@@ -75,19 +78,20 @@ class ParallelExecutor {
         Time::sleep(1e-6);
         continue;  // wait until initialized
       }
-      if (status == ExecutorStatus::finalized) {
+      if (status == ExecutorStatus::finalized && task_queue.empty()) {
        break;  // finalized, exit
       }
       // initialized and not finalized. Do work.
       if (!task_queue.empty()) {
         auto task = task_queue.front();
         task_queue.pop_front();
+        running_threads++;
         lock.unlock();
         // Run the task
         task();
+        running_threads--;
       }
     }
-    TI_DEBUG("Exiting worker thread.");
   }
 
   int num_threads;
@@ -96,6 +100,7 @@ class ParallelExecutor {
 
   std::vector<std::thread> threads;
   std::deque<TaskType> task_queue;
+  std::atomic<int> running_threads;
 };
 
 class KernelLaunchRecord {
@@ -112,12 +117,12 @@ class ExecutionQueue {
  public:
   std::deque<KernelLaunchRecord> task_queue;
 
-  std::vector<std::thread> compilation_workers;  // parallel
-  std::thread launch_worker;                     // serial
+  ParallelExecutor compilation_workers;  // parallel compilation
+  std::thread launch_worker;             // serial launching
 
   std::unordered_map<uint64, FunctionType> compiled_func;
 
-  ExecutionQueue() {
+  ExecutionQueue() : compilation_workers(4) {  // TODO: remove 4
   }
 
   void enqueue(KernelLaunchRecord ker);
diff --git a/taichi/program/program.cpp b/taichi/program/program.cpp
index 0639e5d9256a0..cf45c6e4e6c3d 100644
--- a/taichi/program/program.cpp
+++ b/taichi/program/program.cpp
@@ -40,6 +40,7 @@ Program *current_program = nullptr;
 std::atomic<int> Program::num_instances;
 
 Program::Program(Arch desired_arch) {
+  TI_TRACE("Program initializing...");
   auto arch = desired_arch;
   if (arch == Arch::cuda) {
     runtime = Runtime::create(arch);
@@ -122,7 +123,8 @@ Program::Program(Arch desired_arch) {
     }
   }
 
-  TI_TRACE("Program arch={}", arch_name(arch));
+  TI_TRACE("Program ({}) arch={} initialized.", fmt::ptr(this),
+           arch_name(arch));
 }
 
 FunctionType Program::compile(Kernel &kernel) {
@@ -470,6 +472,7 @@ Kernel &Program::get_snode_writer(SNode *snode) {
 }
 
 void Program::finalize() {
+  TI_TRACE("Program finalizing...");
   if (runtime)
     runtime->set_profiler(nullptr);
   synchronize();
@@ -481,6 +484,7 @@ void Program::finalize() {
 #endif
   finalized = true;
   num_instances -= 1;
+  TI_TRACE("Program ({}) finalized.", fmt::ptr(this));
 }
 
 void Program::launch_async(Kernel *kernel) {
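
Note (not part of the patch): the sketch below is a minimal, standalone illustration of the pattern ExecutionQueue::synchronize() now follows — deduplicate the queued launch records by hash, compile each distinct kernel once on a small worker pool, flush the pool, then launch everything serially in the original order. It is plain C++17 rather than Taichi code; every name in it (WorkerPool, launch_queue, the integer "hashes") is hypothetical, and the busy-waiting flush() is kept only because ParallelExecutor::flush() above spins the same way. Build with something like g++ -std=c++17 -pthread.

#include <chrono>
#include <deque>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>

class WorkerPool {
 public:
  explicit WorkerPool(int n) {
    for (int i = 0; i < n; i++)
      threads.emplace_back([this] { work(); });
  }

  void enqueue(std::function<void()> task) {
    std::lock_guard<std::mutex> _(mut);
    tasks.push_back(std::move(task));
  }

  // Spin until the queue is drained and no task is still running (the same
  // busy-wait idea as ParallelExecutor::flush() in the patch).
  void flush() {
    while (true) {
      {
        std::lock_guard<std::mutex> _(mut);
        if (tasks.empty() && running == 0)
          return;
      }
      std::this_thread::sleep_for(std::chrono::microseconds(1));
    }
  }

  ~WorkerPool() {
    mut.lock();
    stopping = true;
    mut.unlock();
    for (auto &t : threads)
      t.join();  // workers drain any remaining tasks, then exit
  }

 private:
  void work() {
    while (true) {
      std::function<void()> task;
      {
        std::lock_guard<std::mutex> _(mut);
        if (!tasks.empty()) {
          task = std::move(tasks.front());
          tasks.pop_front();
          running++;  // counted so flush() also waits for in-flight tasks
        } else if (stopping) {
          return;  // nothing left to do and the pool is shutting down
        }
      }
      if (task) {
        task();
        std::lock_guard<std::mutex> _(mut);
        running--;
      } else {
        std::this_thread::sleep_for(std::chrono::microseconds(1));
      }
    }
  }

  std::mutex mut;
  std::deque<std::function<void()>> tasks;
  std::vector<std::thread> threads;
  int running = 0;
  bool stopping = false;
};

int main() {
  // Pretend hashes of 16 queued offloaded tasks; only 4 distinct kernels.
  std::vector<int> launch_queue = {0, 1, 2, 3, 0, 1, 2, 3,
                                   0, 1, 2, 3, 0, 1, 2, 3};
  WorkerPool pool(4);
  std::mutex cache_mut;
  std::unordered_map<int, std::function<void()>> compiled;  // hash -> kernel
  std::unordered_set<int> scheduled;
  // Phase 1: schedule each distinct kernel for compilation exactly once.
  for (int h : launch_queue) {
    if (scheduled.insert(h).second) {
      pool.enqueue([&, h] {
        auto kernel = [h] { std::cout << "running kernel " << h << "\n"; };
        std::lock_guard<std::mutex> _(cache_mut);
        compiled[h] = kernel;  // publish into the cache under a lock
      });
    }
  }
  pool.flush();  // wait for every compilation job to finish

  // Phase 2: launch serially, in the original submission order.
  for (int h : launch_queue)
    compiled[h]();
}

A condition variable would avoid the spin-and-sleep loops here and in ParallelExecutor at the cost of a little extra bookkeeping; the 4-thread pool mirrors the hard-coded compilation_workers(4) that the TODO above plans to remove.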