Merge pull request #752 from CHIP-SPV/WaitForPerThread
Refactor WaitForThreadExit
pvelesko authored Jan 18, 2024
2 parents ffebe4d + f205395 commit b18a3af
Showing 6 changed files with 105 additions and 63 deletions.
30 changes: 18 additions & 12 deletions scripts/check.py
@@ -25,11 +25,17 @@

# execute a command and return the output along with the return code
def run_cmd(cmd):
cmd_hash = hashlib.md5(cmd.encode()).hexdigest()[0:10]
# get current milliseconds
cur_ms = subprocess.check_output("date +%s%3N", shell=True).decode('utf-8').strip()
cmd_hash = hashlib.sha256(cmd.encode('utf-8')+ cur_ms.encode('utf-8')).hexdigest()
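# mixing the timestamp into the hash keeps repeated or concurrent runs of the
# same command from reusing (and overwriting) the same /tmp output file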

file_name = f"/tmp/{cmd_hash}_cmd.txt"
cmd = f"{cmd} | tee {file_name}"
if args.verbose:
print(f"check.py: {cmd}")
if args.dry_run:
print(cmd)
return "", 0
return_code = subprocess.call(cmd, shell=True)
with open(file_name, "r") as f:
return f.read(), return_code
@@ -86,8 +92,15 @@ def run_cmd(cmd):

cmd = f"{modules} {env_vars} ./hipInfo"
out, _ = run_cmd(cmd)
texture_support = 0 < int(out.split("maxTexture1DLinear:")[1].split("\n")[0].strip())
double_support = 0 < int(out.split("arch.hasDoubles:")[1].split("\n")[0].strip())

texture_support = False
if ("maxTexture1DLinear:" in out):
texture_support = 0 < int(out.split("maxTexture1DLinear:")[1].split("\n")[0].strip())

double_support = False
if ("arch.hasDoubles:" in out):
double_support = 0 < int(out.split("arch.hasDoubles:")[1].split("\n")[0].strip())

if double_support:
double_cmd = ""
else:
@@ -104,12 +117,7 @@ def run_cmd(cmd):
cmd_graph = f"{modules} {env_vars} ctest --output-on-failure --timeout {args.timeout} --repeat until-fail:{args.num_tries} -j 100 -E \"`cat ./test_lists/{args.device_type}_{args.backend}_failed_{level0_cmd_list}tests.txt`{texture_cmd}{double_cmd}\" -R \"[Gg]raph\" -O checkpy_{args.device_type}_{args.backend}_graph.txt"
cmd_single = f"{modules} {env_vars} ctest --output-on-failure --timeout {args.timeout} --repeat until-fail:{args.num_tries} -j 1 -E \"`cat ./test_lists/{args.device_type}_{args.backend}_failed_{level0_cmd_list}tests.txt`{texture_cmd}{double_cmd}\" -R \"`cat ./test_lists/non_parallel_tests.txt`\" -O checkpy_{args.device_type}_{args.backend}_single.txt"
cmd_other = f"{modules} {env_vars} ctest --output-on-failure --timeout {args.timeout} --repeat until-fail:{args.num_tries} -j {args.num_threads} -E \"`cat ./test_lists/{args.device_type}_{args.backend}_failed_{level0_cmd_list}tests.txt`{texture_cmd}{double_cmd}|deviceFunc|[Gg]raph|`cat ./test_lists/non_parallel_tests.txt`\" -O checkpy_{args.device_type}_{args.backend}_other.txt"
if(args.dry_run):
print(cmd_deviceFunc)
print(cmd_graph)
print(cmd_single)
print(cmd_other)
exit(0)

res_deviceFunc, err = run_cmd(cmd_deviceFunc)
res_graph, err = run_cmd(cmd_graph)
res_single, err = run_cmd(cmd_single)
@@ -121,9 +129,7 @@ def run_cmd(cmd):
exit(1)
else:
cmd = f"{modules} {env_vars} ctest --output-on-failure --timeout {args.timeout} --repeat until-fail:{args.num_tries} -j {args.num_threads} -E \"`cat ./test_lists/{args.device_type}_{args.backend}_failed_{level0_cmd_list}tests.txt`{texture_cmd}\" -O checkpy_{args.device_type}_{args.backend}.txt"
if(args.dry_run):
print(cmd)
exit(0)

res, err = run_cmd(cmd)
if "0 tests failed" in res:
exit(0)
85 changes: 38 additions & 47 deletions src/CHIPBackend.cc
@@ -510,6 +510,12 @@ chipstar::Device::Device(chipstar::Context *Ctx, int DeviceIdx)
chipstar::Device::~Device() {
LOCK(DeviceMtx); // chipstar::Device::ChipQueues_
logDebug("~Device() {}", (void *)this);

// Call finish() for PerThreadDefaultQueue to ensure that all
// outstanding work items are completed.
if (PerThreadDefaultQueue)
PerThreadDefaultQueue->finish();

while (this->ChipQueues_.size() > 0) {
delete ChipQueues_[0];
ChipQueues_.erase(ChipQueues_.begin());
@@ -545,11 +551,6 @@ bool chipstar::Device::isPerThreadStreamUsedNoLock() {
return PerThreadStreamUsed_;
}

void chipstar::Device::setPerThreadStreamUsed(bool Status) {
LOCK(DeviceMtx); // chipstar::Device::PerThreadStreamUsed
PerThreadStreamUsed_ = Status;
}

chipstar::Queue *chipstar::Device::getPerThreadDefaultQueue() {
LOCK(DeviceMtx); // chipstar::Device::PerThreadStreamUsed
return getPerThreadDefaultQueueNoLock();
@@ -563,6 +564,11 @@ chipstar::Queue *chipstar::Device::getPerThreadDefaultQueueNoLock() {
PerThreadDefaultQueue->setDefaultPerThreadQueue(true);
PerThreadStreamUsed_ = true;
PerThreadDefaultQueue.get()->PerThreadQueueForDevice = this;

// use an atomic operation to increment NumQueuesAlive
// this is used to track the number of threads created
// and to delete the queue when the last thread is destroyed
NumQueuesAlive.fetch_add(1, std::memory_order_relaxed);
}

return PerThreadDefaultQueue.get();
@@ -1145,17 +1151,6 @@ hipError_t chipstar::Context::free(void *Ptr) {

// Backend
//*************************************************************************************
int chipstar::Backend::getPerThreadQueuesActive() {
LOCK(
::Backend->BackendMtx); // Prevent adding/removing devices while iterating
int Active = 0;
for (auto Dev : getDevices()) {
if (Dev->isPerThreadStreamUsed()) {
Active++;
}
}
return Active;
}
int chipstar::Backend::getQueuePriorityRange() {
assert(MinQueuePriority_);
return MinQueuePriority_;
@@ -1189,30 +1184,36 @@ void chipstar::Backend::trackEvent(
}

void chipstar::Backend::waitForThreadExit() {
/**
* If the main thread just creates a bunch of other threads and tries to exit
* right away, it could be the case that all those threads are not yet done
* with initialization. In particular, these threads might not have yet
* created their per-thread queues which is how we keep track of threads.
*
* So we just wait for 0.5 seconds before starting to check for thread exit.
*/
// first, we must delay the main thread so that at least all other threads
// have gotten past
// libCHIP.so!chipstar::Device::getPerThreadDefaultQueueNoLock
// libCHIP.so!chipstar::Backend::findQueue
// libCHIP.so!hipMemcpyAsyncInternal
// libCHIP.so!hipMemcpyAsync
pthread_yield();
// TODO fix-255 is there a better way to do this?
unsigned long long int sleepMicroSeconds = 500000;
usleep(sleepMicroSeconds);

while (true) {
{
auto NumPerThreadQueuesActive = ::Backend->getPerThreadQueuesActive();
if (!NumPerThreadQueuesActive)
// go through all devices checking their NumQueuesAlive until they're all
// 1 or 0 (0 would indicate that hipStreamPerThread was never used and 1 would
// indicate that the main thread used hipStreamPerThread)
bool AllThreadsExited = false;
while (!AllThreadsExited) {
AllThreadsExited = true;
for (auto &Dev : getDevices()) {
if (Dev->NumQueuesAlive.load(std::memory_order_relaxed) > 1) {
AllThreadsExited = false;
break;
}
}

logDebug("Backend::waitForThreadExit() per-thread queues still active "
"{}. Sleeping for 1s..",
NumPerThreadQueuesActive);
// logDebug and wait for 1 second
if (!AllThreadsExited) {
logWarn("Waiting for all per-thread queues to exit... This condition "
"would indicate that the main thread didn't call "
"join()");
sleep(1);
}
sleep(1);
}

// Cleanup all queues
@@ -1458,9 +1459,10 @@ chipstar::Queue::Queue(chipstar::Device *ChipDevice, chipstar::QueueFlags Flags)

chipstar::Queue::~Queue() {
updateLastEvent(nullptr);
if (PerThreadQueueForDevice) {
PerThreadQueueForDevice->setPerThreadStreamUsed(false);
}

// atomic decrement for number of threads alive
if (this->isDefaultPerThreadQueue())
this->ChipDevice_->NumQueuesAlive.fetch_sub(1, std::memory_order_relaxed);
};

std::vector<std::shared_ptr<chipstar::Event>>
@@ -1498,17 +1500,6 @@ chipstar::Queue::getSyncQueuesLastEvents() {
}
}

// if (this->isDefaultLegacyQueue()) {
// // add LastEvent from all other blocking streams
// for (auto &q : Dev->getQueuesNoLock()) {
// if (q->getQueueFlags().isBlocking()) {
// auto Ev = q->getLastEvent();
// if (Ev)
// EventsToWaitOn.push_back(Ev);
// }
// }
// }

return EventsToWaitOn;
}

14 changes: 12 additions & 2 deletions src/CHIPBackend.hh
@@ -1284,6 +1284,10 @@ protected:
bool PerThreadStreamUsed_ = false;

public:
// atomic int for counting number of threads that were created
// for this device
std::atomic<int> NumQueuesAlive = 0;

hipDeviceProp_t getDeviceProps() { return HipDeviceProps_; }
std::mutex DeviceVarMtx;
std::mutex DeviceMtx;
@@ -1312,7 +1316,6 @@

bool isPerThreadStreamUsed();
bool isPerThreadStreamUsedNoLock();
void setPerThreadStreamUsed(bool Status);

/**
* @brief Get the Default Queue object. If HIP_API_PER_THREAD_DEFAULT_STREAM
@@ -1806,7 +1809,6 @@ public:
size_t SharedMem,
hipStream_t ChipQueue) = 0;

int getPerThreadQueuesActive();
std::mutex SetActiveMtx;
std::mutex QueueCreateDestroyMtx;
mutable std::mutex BackendMtx;
@@ -1892,6 +1894,14 @@ public:
/**
* @brief Wait for all per-thread queues to finish
*
* If the main thread just creates a bunch of other threads and tries to exit
* right away without calling join(), destroying the backend would cause a
* crash as those other threads still rely on the shared backend to be alive.
* This function waits for all threads to exit before destroying the backend.
*
* This is done by each Queue constructor incrementing NumQueuesAlive
* atomically and each Queue destructor decrementing it.
*
*/
void waitForThreadExit();
/**
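The doc comment above describes the scheme in prose; below is a minimal, self-contained C++ sketch of the same counting pattern (illustrative only — the names, the 1-second polling interval, and the thread_local placement are assumptions rather than chipSTAR API):

#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>
#include <vector>

// chipSTAR keeps one counter per device; a single global one is enough here.
static std::atomic<int> NumQueuesAlive{0};

struct PerThreadQueue {
  PerThreadQueue() { NumQueuesAlive.fetch_add(1, std::memory_order_relaxed); }
  ~PerThreadQueue() { NumQueuesAlive.fetch_sub(1, std::memory_order_relaxed); }
};

// Poll until at most the main thread's queue remains (1), or none was ever
// created (0). A real implementation also has to tolerate threads that have
// not yet created their queue when this is first called.
void waitForThreadExit() {
  while (NumQueuesAlive.load(std::memory_order_relaxed) > 1)
    std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i)
    workers.emplace_back([] {
      // Created on first use, destroyed when the thread exits -- even if the
      // thread was detached.
      thread_local PerThreadQueue Queue;
      std::this_thread::sleep_for(std::chrono::milliseconds(100));
    });
  for (auto &Worker : workers)
    Worker.detach(); // nobody joins, as in TestThreadDetachCleanup.cpp
  // Give the workers a moment to register their queues before polling.
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  waitForThreadExit(); // only now is it safe to tear shared state down
  std::cout << "all per-thread queues destroyed\n";
  return 0;
}

Counting live queues, rather than flagging "per-thread stream used" on each device, is what lets waitForThreadExit() distinguish the main thread's own per-thread queue (count 1) from worker threads that are still alive (count > 1).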
5 changes: 3 additions & 2 deletions src/backend/Level0/CHIPBackendLevel0.cc
@@ -1663,8 +1663,9 @@ chipstar::ExecItem *CHIPBackendLevel0::createExecItem(dim3 GirdDim,
return ExecItem;
};

std::shared_ptr<chipstar::Event> CHIPBackendLevel0::createEventShared(
chipstar::Context *ChipCtx, chipstar::EventFlags Flags) {
std::shared_ptr<chipstar::Event>
CHIPBackendLevel0::createEventShared(chipstar::Context *ChipCtx,
chipstar::EventFlags Flags) {
std::shared_ptr<chipstar::Event> Event;

auto ZeCtx = (CHIPContextLevel0 *)ChipCtx;
1 change: 1 addition & 0 deletions tests/runtime/CMakeLists.txt
@@ -74,6 +74,7 @@ add_hip_runtime_test(TestStlFunctionsDouble.hip)
add_hip_runtime_test(TestHIPMathFunctions.hip)
add_hip_runtime_test(TestAtomics.hip)
add_hip_runtime_test(TestIndirectMappedHostAlloc.hip)
add_hip_runtime_test(TestThreadDetachCleanup.cpp)

add_shell_test(TestAssert.bash)
add_shell_test(TestAssertFail.bash)
33 changes: 33 additions & 0 deletions tests/runtime/TestThreadDetachCleanup.cpp
@@ -0,0 +1,33 @@
#include <iostream>
#include <thread>
#include "hip/hip_runtime.h"

static constexpr int numThreads = 1000;

void threadFunction(int threadId) {
int src = threadId; // Source data unique to each thread
int dst;

// Use hipStreamPerThread to get a per-thread default stream
hipMemcpyAsync(&dst, &src, sizeof(int), hipMemcpyHostToHost,
hipStreamPerThread);
// std::cout << "Thread " << threadId << " completed memcpy." << std::endl;
}

int main() {
std::thread threads[numThreads];

for (int i = 0; i < numThreads; ++i) {
threads[i] = std::thread(threadFunction, i);
}

// Main thread exits without waiting for the threads to finish
std::cout << "Main thread exiting." << std::endl;

// Threads are detached, allowing them to continue independently
for (int i = 0; i < numThreads; ++i) {
threads[i].detach();
}

return 0;
}
