Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Infra] Support Timelines as a multithreading profiler #2164

Merged
merged 14 commits into from
Jan 19, 2021
Merged
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ _build
*.bc
*.yml
*.dot
*.json
2 changes: 2 additions & 0 deletions python/taichi/lang/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@
kernel_profiler_clear = lambda: get_runtime().prog.kernel_profiler_clear()
kernel_profiler_total_time = lambda: get_runtime(
).prog.kernel_profiler_total_time()
timeline_clear = lambda: get_runtime().prog.timeline_clear()
timeline_save = lambda fn: get_runtime().prog.timeline_save(fn)

# Legacy API
type_factory_ = core.get_type_factory_instance()
Expand Down
2 changes: 2 additions & 0 deletions taichi/backends/cuda/cuda_driver_functions.inc.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,9 @@ PER_CUDA_FUNCTION(stream_synchronize, cuStreamSynchronize, void *);

// Event management
PER_CUDA_FUNCTION(event_create, cuEventCreate, void **, uint32)
PER_CUDA_FUNCTION(event_destroy, cuEventDestroy, void *)
PER_CUDA_FUNCTION(event_record, cuEventRecord, void *, void *)
PER_CUDA_FUNCTION(event_synchronize, cuEventSynchronize, void *);
PER_CUDA_FUNCTION(event_elapsed_time, cuEventElapsedTime, float *, void *, void *);

// clang-format on
79 changes: 49 additions & 30 deletions taichi/program/async_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include "taichi/program/kernel.h"
#include "taichi/program/program.h"
#include "taichi/system/timeline.h"
#include "taichi/backends/cpu/codegen_cpu.h"
#include "taichi/util/testing.h"
#include "taichi/util/statistics.h"
Expand All @@ -17,8 +18,9 @@

TLANG_NAMESPACE_BEGIN

ParallelExecutor::ParallelExecutor(int num_threads)
: num_threads(num_threads),
ParallelExecutor::ParallelExecutor(const std::string &name, int num_threads)
: name_(name),
num_threads(num_threads),
status(ExecutorStatus::uninitialized),
running_threads(0) {
{
Expand Down Expand Up @@ -69,6 +71,13 @@ bool ParallelExecutor::flush_cv_cond() {

void ParallelExecutor::worker_loop() {
TI_DEBUG("Starting worker thread.");
auto thread_id = thread_counter++;

std::string thread_name = name_;
if (num_threads != 1)
thread_name += fmt::format("_{}", thread_id);
Timeline::get_this_thread_instance().set_name(thread_name);

{
std::unique_lock<std::mutex> lock(mut);
while (status == ExecutorStatus::uninitialized) {
Expand Down Expand Up @@ -114,6 +123,9 @@ void ExecutionQueue::enqueue(const TaskLaunchRecord &ker) {
auto h = ker.ir_handle.hash();
auto *stmt = ker.stmt();
auto kernel = ker.kernel;
// TODO: for now we are using kernel name for task name. It may be helpful to
// use the real task name.
auto kernel_name = kernel->name;

kernel->account_for_offloaded(stmt);

Expand All @@ -132,31 +144,33 @@ void ExecutionQueue::enqueue(const TaskLaunchRecord &ker) {
auto cloned_stmt = ker.ir_handle.clone();
stmt = cloned_stmt->as<OffloadedStmt>();

compilation_workers.enqueue([async_func, stmt, kernel, this]() {
{
// Final lowering
using namespace irpass;

auto config = kernel->program.config;
auto ir = stmt;
offload_to_executable(
ir, config, /*verbose=*/false,
/*lower_global_access=*/true,
/*make_thread_local=*/true,
/*make_block_local=*/
is_extension_supported(config.arch, Extension::bls) &&
config.make_block_local);
}
auto func = this->compile_to_backend_(*kernel, stmt);
async_func->set(func);
});
compilation_workers.enqueue(
[kernel_name, async_func, stmt, kernel, this]() {
TI_TIMELINE(kernel_name);
k-ye marked this conversation as resolved.
Show resolved Hide resolved
// Final lowering
using namespace irpass;

auto config = kernel->program.config;
auto ir = stmt;
offload_to_executable(
ir, config, /*verbose=*/false,
/*lower_global_access=*/true,
/*make_thread_local=*/true,
/*make_block_local=*/
is_extension_supported(config.arch, Extension::bls) &&
config.make_block_local);
auto func = this->compile_to_backend_(*kernel, stmt);
async_func->set(func);
});
ir_bank_->insert_to_trash_bin(std::move(cloned_stmt));
}

launch_worker.enqueue([async_func, context = ker.context]() mutable {
auto func = async_func->get();
func(context);
});
launch_worker.enqueue(
[kernel_name, async_func, context = ker.context]() mutable {
TI_TIMELINE(kernel_name);
auto func = async_func->get();
func(context);
});
}

void ExecutionQueue::synchronize() {
Expand All @@ -167,8 +181,8 @@ void ExecutionQueue::synchronize() {
ExecutionQueue::ExecutionQueue(
IRBank *ir_bank,
const BackendExecCompilationFunc &compile_to_backend)
: compilation_workers(4), // TODO: remove 4
launch_worker(1),
: compilation_workers("compiler", 4), // TODO: remove 4
launch_worker("launcher", 1),
ir_bank_(ir_bank),
compile_to_backend_(compile_to_backend) {
}
Expand All @@ -178,6 +192,7 @@ AsyncEngine::AsyncEngine(Program *program,
: queue(&ir_bank_, compile_to_backend),
program(program),
sfg(std::make_unique<StateFlowGraph>(this, &ir_bank_)) {
Timeline::get_this_thread_instance().set_name("host");
ir_bank_.set_sfg(sfg.get());
}

Expand Down Expand Up @@ -227,6 +242,7 @@ void AsyncEngine::synchronize() {

void AsyncEngine::flush() {
TI_AUTO_PROF;
TI_AUTO_TIMELINE;

bool modified = true;
sfg->reid_nodes();
Expand Down Expand Up @@ -271,10 +287,13 @@ void AsyncEngine::flush() {
sfg->verify();
}
debug_sfg("final");
auto tasks = sfg->extract_to_execute();
TI_TRACE("Ended up with {} nodes", tasks.size());
for (auto &task : tasks) {
queue.enqueue(task);
{
TI_TIMELINE("enqueue");
auto tasks = sfg->extract_to_execute();
TI_TRACE("Ended up with {} nodes", tasks.size());
for (auto &task : tasks) {
queue.enqueue(task);
}
}
flush_counter_++;
}
Expand Down
4 changes: 3 additions & 1 deletion taichi/program/async_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class ParallelExecutor {
public:
using TaskType = std::function<void()>;

explicit ParallelExecutor(int num_threads);
explicit ParallelExecutor(const std::string &name, int num_threads);
~ParallelExecutor();

void enqueue(const TaskType &func);
Expand All @@ -47,7 +47,9 @@ class ParallelExecutor {
// Must be called while holding |mut|.
bool flush_cv_cond();

std::string name_;
int num_threads;
std::atomic<int> thread_counter{0};
std::mutex mut;

// All guarded by |mut|
Expand Down
1 change: 1 addition & 0 deletions taichi/program/compile_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ struct CompileConfig {
bool use_llvm;
bool verbose_kernel_launches;
bool kernel_profiler;
bool timeline{false};
bool verbose;
bool fast_math;
bool async_mode;
Expand Down
62 changes: 58 additions & 4 deletions taichi/program/kernel_profiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "taichi/system/timer.h"
#include "taichi/backends/cuda/cuda_driver.h"
#include "taichi/system/timeline.h"

TLANG_NAMESPACE_BEGIN

Expand Down Expand Up @@ -120,6 +121,35 @@ class KernelProfilerCUDA : public KernelProfilerBase {
CUDADriver::get_instance().event_create(&stop, CU_EVENT_DEFAULT);
CUDADriver::get_instance().event_record(start, 0);
outstanding_events[kernel_name].push_back(std::make_pair(start, stop));

if (!base_event_) {
// Note that CUDA driver API only allows querying relative time difference
// between two events, therefore we need to manually build a mapping
// between GPU and CPU time.
// TODO: periodically reinitialize for more accuracy.
int n_iters = 100;
// Warm up CUDA driver, and use the final event as the base event.
for (int i = 0; i < n_iters; i++) {
void *e;
CUDADriver::get_instance().event_create(&e, CU_EVENT_DEFAULT);
CUDADriver::get_instance().event_record(e, 0);
CUDADriver::get_instance().event_synchronize(e);
auto final_t = Time::get_time();
if (i == n_iters - 1) {
base_event_ = e;
// TODO: figure out a better way to synchronize CPU and GPU time.
constexpr float64 cuda_time_offset = 3e-4;
// Since event recording and synchronization can take 5 us, it's hard
// to exactly measure the real event time. Also note there seems to be
// a systematic time offset on CUDA, so adjust for that using a 3e-4 s
// magic number.
base_time_ = final_t + cuda_time_offset;
} else {
CUDADriver::get_instance().event_destroy(e);
}
}
}

return stop;
#else
TI_NOT_IMPLEMENTED;
Expand All @@ -141,21 +171,41 @@ class KernelProfilerCUDA : public KernelProfilerBase {
void sync() override {
#if defined(TI_WITH_CUDA)
CUDADriver::get_instance().stream_synchronize(nullptr);
auto &timeline = Timeline::get_this_thread_instance();
for (auto &map_elem : outstanding_events) {
auto &list = map_elem.second;
for (auto &item : list) {
auto start = item.first, stop = item.second;
float ms;
CUDADriver::get_instance().event_elapsed_time(&ms, start, stop);
float kernel_time;
CUDADriver::get_instance().event_elapsed_time(&kernel_time, start,
stop);

if (Timelines::get_instance().get_enabled()) {
float time_since_base;
CUDADriver::get_instance().event_elapsed_time(&time_since_base,
base_event_, start);
timeline.insert_event({map_elem.first, true,
base_time_ + time_since_base * 1e-3, "cuda"});
timeline.insert_event(
{map_elem.first, false,
base_time_ + (time_since_base + kernel_time) * 1e-3, "cuda"});
}

auto it = std::find_if(
records.begin(), records.end(),
[&](KernelProfileRecord &r) { return r.name == map_elem.first; });
if (it == records.end()) {
records.emplace_back(map_elem.first);
it = std::prev(records.end());
}
it->insert_sample(ms);
total_time_ms += ms;
it->insert_sample(kernel_time);
total_time_ms += kernel_time;

// TODO: the following two lines seem to increases profiler overhead a
// little bit. Is there a way to avoid the overhead while not creating
// too many events?
CUDADriver::get_instance().event_destroy(start);
CUDADriver::get_instance().event_destroy(stop);
}
}
outstanding_events.clear();
Expand All @@ -168,6 +218,10 @@ class KernelProfilerCUDA : public KernelProfilerBase {
static KernelProfilerCUDA profiler;
return profiler;
}

private:
void *base_event_{nullptr};
float64 base_time_;
};
} // namespace

Expand Down
3 changes: 3 additions & 0 deletions taichi/program/program.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "taichi/backends/metal/struct_metal.h"
#include "taichi/backends/opengl/struct_opengl.h"
#include "taichi/system/unified_allocator.h"
#include "taichi/system/timeline.h"
#include "taichi/ir/snode.h"
#include "taichi/ir/frontend_ir.h"
#include "taichi/program/async_engine.h"
Expand Down Expand Up @@ -217,6 +218,8 @@ Program::Program(Arch desired_arch) {

stat.clear();

Timelines::get_instance().set_enabled(config.timeline);

TI_TRACE("Program ({}) arch={} initialized.", fmt::ptr(this),
arch_name(arch));
}
Expand Down
13 changes: 6 additions & 7 deletions taichi/program/state_flow_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include "taichi/ir/transforms.h"
#include "taichi/program/async_engine.h"
#include "taichi/util/statistics.h"
#include "taichi/system/timeline.h"

// Keep this include in the end!
#include "taichi/program/async_profiler_switch.h"
Expand All @@ -21,13 +22,6 @@ namespace {

using LatestStateReaders = StateFlowGraph::LatestStateReaders;

// TODO(k-ye): remove these over-engineering...
LatestStateReaders::iterator find(LatestStateReaders &m, const AsyncState &s) {
return std::find_if(
m.begin(), m.end(),
[&s](const LatestStateReaders::value_type &v) { return v.first == s; });
}

std::pair<LatestStateReaders::value_type::second_type *, bool> insert(
LatestStateReaders &m,
const AsyncState &s) {
Expand Down Expand Up @@ -420,6 +414,7 @@ void StateFlowGraph::insert_edge(Node *from, Node *to, AsyncState state) {

bool StateFlowGraph::optimize_listgen() {
TI_AUTO_PROF;
TI_AUTO_TIMELINE;
bool modified = false;

std::vector<std::pair<int, int>> common_pairs;
Expand Down Expand Up @@ -813,6 +808,7 @@ std::unordered_set<int> StateFlowGraph::fuse_range(int begin, int end) {

bool StateFlowGraph::fuse() {
TI_AUTO_PROF;
TI_AUTO_TIMELINE;
using bit::Bitset;
// Only guarantee to fuse tasks with indices in nodes_ differ by less than
// kMaxFusionDistance if there are too many tasks.
Expand Down Expand Up @@ -852,6 +848,7 @@ bool StateFlowGraph::fuse() {

void StateFlowGraph::rebuild_graph(bool sort) {
TI_AUTO_PROF;
TI_AUTO_TIMELINE;
if (sort)
topo_sort_nodes();
std::vector<TaskLaunchRecord> tasks;
Expand Down Expand Up @@ -1236,6 +1233,7 @@ void StateFlowGraph::delete_nodes(

bool StateFlowGraph::optimize_dead_store() {
TI_AUTO_PROF
TI_AUTO_TIMELINE;
bool modified = false;

auto nodes = get_pending_tasks();
Expand Down Expand Up @@ -1442,6 +1440,7 @@ void StateFlowGraph::verify(bool also_verify_ir) const {

bool StateFlowGraph::demote_activation() {
TI_AUTO_PROF
TI_AUTO_TIMELINE;
bool modified = false;

topo_sort_nodes();
Expand Down
Loading