Skip to content

Commit

Permalink
Fixed: Jobs deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
richardbiely committed Nov 23, 2024
1 parent 75a35cf commit e49868f
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 195 deletions.
13 changes: 11 additions & 2 deletions include/gaia/mt/jobcommon.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,29 @@
namespace gaia {
namespace mt {
enum class JobPriority : uint8_t {
//! High priority job. If available it should target the CPU's performance cores
//! High priority job. If available it should target the CPU's performance cores.
High = 0,
//! Low priority job. If available it should target the CPU's efficiency cores
//! Low priority job. If available it should target the CPU's efficiency cores.
Low = 1
};
static inline constexpr uint32_t JobPriorityCnt = 2;

enum JobCreationFlags : uint8_t {
None = 0,
//! The job is automatically deleted after it is executed. Can't be waited on.
AutoDelete = 0x01,
//! The job can wait for other job (one not set as dependency).
CanWait = 0x02
};

struct JobAllocCtx {
JobPriority priority;
};

struct Job {
std::function<void()> func;
JobPriority priority = JobPriority::High;
JobCreationFlags flags = JobCreationFlags::AutoDelete;
};

struct JobArgs {
Expand Down
13 changes: 7 additions & 6 deletions include/gaia/mt/jobmanager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ namespace gaia {
std::atomic_uint32_t state;
//! Job priority
JobPriority prio;
//! If the the job can be waited on, false otherwise.
bool canWait;
//! Job flags
JobCreationFlags flags;
//! Dependency graph
JobEdges edges;
//! Function to execute when running the job
Expand All @@ -84,7 +84,7 @@ namespace gaia {
JobContainer(const JobContainer& other): cnt::ilist_item(other) {
state = other.state.load();
prio = other.prio;
canWait = other.canWait;
flags = other.flags;
edges = other.edges;
func = other.func;
}
Expand All @@ -93,7 +93,7 @@ namespace gaia {
cnt::ilist_item::operator=(other);
state = other.state.load();
prio = other.prio;
canWait = other.canWait;
flags = other.flags;
edges = other.edges;
func = other.func;
return *this;
Expand All @@ -102,7 +102,7 @@ namespace gaia {
JobContainer(JobContainer&& other): cnt::ilist_item(GAIA_MOV(other)) {
state = other.state.load();
prio = other.prio;
canWait = other.canWait;
flags = other.flags;
func = GAIA_MOV(other.func);

// if (edges.depCnt > 0)
Expand All @@ -115,7 +115,7 @@ namespace gaia {
cnt::ilist_item::operator=(GAIA_MOV(other));
state = other.state.load();
prio = other.prio;
canWait = other.canWait;
flags = other.flags;
func = GAIA_MOV(other.func);

// if (edges.depCnt > 0)
Expand Down Expand Up @@ -171,6 +171,7 @@ namespace gaia {
j.prio = ctx.priority;
j.state.store(0);
j.func = job.func;
j.flags = job.flags;
return handle;
}

Expand Down
118 changes: 62 additions & 56 deletions include/gaia/mt/threadpool.h
Original file line number Diff line number Diff line change
Expand Up @@ -242,41 +242,32 @@ namespace gaia {
//! Creates a threadpool job from \param job.
//! \warning Must be used from the main thread.
//! \return Job handle of the scheduled job.
JobHandle add(Job& job) {
template <typename TJob>
JobHandle add(TJob&& job) {
GAIA_ASSERT(main_thread());

// Don't add new jobs once stop was requested
if GAIA_UNLIKELY (m_stop)
return JobNull;
// Allocs are done only from the main thread while there are no jobs running.
// Freeing can happen at any point from any thread. Therefore, we need to lock this point.

job.priority = final_prio(job);

// auto& mtx = GAIA_PROF_EXTRACT_MUTEX(std::mutex, m_jobAllocMtx);
// std::lock_guard lock(mtx);
return m_jobManager.alloc_job(job);
auto& mtx = GAIA_PROF_EXTRACT_MUTEX(std::mutex, m_jobAllocMtx);
std::lock_guard lock(mtx);
return m_jobManager.alloc_job(GAIA_FWD(job));
}

//! Deletes a job handle \param jobHandle from the threadpool.
//! \warning Must be used from the main thread.
//! \warning Job handle must not be used by any worker thread and can not be used
//! by any active job handles as a dependency.
// TODO: Figure out how to do this automatically without user intervention.
// Only the private free_job should be used.
void del(JobHandle jobHandle) {
m_jobManager.free_job(jobHandle);
}
private:
void add_n(JobPriority prio, std::span<JobHandle> jobHandles) {
GAIA_ASSERT(main_thread());
// Allocs are done only from the main thread while there are no jobs running.
// Freeing can happen at any point from any thread. Therefore, we need to lock this point.

//! Deletes job handles \param jobHandles from the threadpool.
//! \warning Must be used from the main thread.
//! \warning Job handles must not be used by any worker thread and can not be used
//! by any active job handles as a dependency.
// TODO: Figure out how to do this automatically without user intervention.
// Only the private free_job should be used.
void del(std::span<JobHandle> jobHandles) {
for (auto jobHandle: jobHandles)
m_jobManager.free_job(jobHandle);
auto& mtx = GAIA_PROF_EXTRACT_MUTEX(std::mutex, m_jobAllocMtx);
std::lock_guard lock(mtx);
for (auto& jobHandles: jobHandles)
jobHandles = m_jobManager.alloc_job({{}, prio, JobCreationFlags::AutoDelete});
}

public:
//! Pushes \param jobHandles into the internal queue so worker threads
//! can pick them up and execute them.
//! If there are more jobs than the queue can handle it puts the calling
Expand Down Expand Up @@ -399,32 +390,42 @@ namespace gaia {
// is not worth of being scheduled via sched_par. However, we can never know for sure what
// the reason for that is so let's stay silent.
if (jobs == 1) {
auto handle = m_jobManager.alloc_job({groupJobFunc, prio});
auto handle = add(Job{groupJobFunc, prio, JobCreationFlags::None});
submit(handle);
return handle;
}

// Multiple jobs need to be parallelized.
// Create a sync job and assign it as their dependency.
auto* pHandles = (JobHandle*)alloca(sizeof(JobHandle) * (jobs + 1));
std::span<JobHandle> handles(pHandles, jobs + 1);

// Sync job
auto syncHandle = m_jobManager.alloc_job({{}, prio});
GAIA_ASSERT(m_jobManager.is_clear(syncHandle));
add_n(prio, handles);

#if GAIA_ASSERT_ENABLED
for (auto jobHandle: handles)
GAIA_ASSERT(m_jobManager.is_clear(jobHandle));
#endif

// Work jobs
for (jobIndex = 0; jobIndex < jobs; ++jobIndex) {
pHandles[jobIndex] = m_jobManager.alloc_job({groupJobFunc, prio});
GAIA_ASSERT(m_jobManager.is_clear(pHandles[jobIndex]));
auto& jobData = m_jobManager.data(pHandles[jobIndex]);
jobData.func = groupJobFunc;
jobData.prio = prio;
}
// Sync job
{
auto& jobData = m_jobManager.data(pHandles[jobs]);
jobData.prio = prio;
jobData.flags = JobCreationFlags::None;
}
pHandles[jobs] = syncHandle;

// Assign the sync jobs as a dependency for work jobs
dep(std::span(pHandles, jobs), pHandles[jobs]);
dep(handles.subspan(0, jobs), pHandles[jobs]);

// Sumbit the jobs to the threadpool.
// This is a point of no return. After this point no more changes to jobs are possible.
submit(std::span(pHandles, jobs + 1));
submit(handles);
return pHandles[jobs];
}

Expand All @@ -442,16 +443,16 @@ namespace gaia {

// Waiting for a job that has not been initialized is nonsense.
GAIA_ASSERT(state != 0);
// Can't wait for an auto-delete job
GAIA_ASSERT((jobData.flags & JobCreationFlags::AutoDelete) == 0);

// Wait until done
for (; (state & JobState::STATE_BITS_MASK) != JobState::Done; state = jobData.state.load()) {
// The job we are waiting for is not finished yet, try running some other job in the meantime
JobHandle otherJobHandle;
if (try_fetch_job(*ctx, otherJobHandle)) {
if (run(otherJobHandle, ctx)) {
free_job(otherJobHandle);
if (run(otherJobHandle, ctx))
continue;
}
}

// The job we are waiting for is already running.
Expand All @@ -474,7 +475,7 @@ namespace gaia {
}

// Deallocate the job itself
free_job(jobHandle);
del(jobHandle);
}

//! Uses the main thread to help with jobs processing.
Expand Down Expand Up @@ -784,7 +785,7 @@ namespace gaia {
break;

if (run(jobHandle, &ctx))
free_job(jobHandle);
del(jobHandle);
}
}

Expand Down Expand Up @@ -839,8 +840,7 @@ namespace gaia {
if (!try_fetch_job(ctx, jobHandle))
break;

if (run(jobHandle, detail::tl_workerCtx))
free_job(jobHandle);
(void)run(jobHandle, detail::tl_workerCtx);
}

// Check if the worker can keep running
Expand All @@ -866,8 +866,7 @@ namespace gaia {
// Finish remaining jobs
JobHandle jobHandle;
while (try_fetch_job(*ctx, jobHandle)) {
if (run(jobHandle, ctx))
free_job(jobHandle);
run(jobHandle, ctx);
}

detail::tl_workerCtx = nullptr;
Expand All @@ -891,12 +890,16 @@ namespace gaia {
}

private:
void free_job([[maybe_unused]] JobHandle jobHandle) {
//! Deletes a job handle \param jobHandle from the threadpool.
//! \warning Job handle must not be used by any worker thread and can not be used
//! by any active job handles as a dependency.
void del([[maybe_unused]] JobHandle jobHandle) {
// Allocs are done only from the main thread while there are no jobs running.
// Freeing can happen at any point from any thread. Therefore, we need to lock it.
// auto& mtx = GAIA_PROF_EXTRACT_MUTEX(std::mutex, m_jobAllocMtx);
// std::lock_guard lock(mtx);
// m_jobManager.free_job(jobHandle);
// Freeing can happen at any point from any thread. Therefore, we need to lock this point.

auto& mtx = GAIA_PROF_EXTRACT_MUTEX(std::mutex, m_jobAllocMtx);
std::lock_guard lock(mtx);
m_jobManager.free_job(jobHandle);
}

void signal_edges(JobContainer& jobData) {
Expand Down Expand Up @@ -959,10 +962,9 @@ namespace gaia {
// need to wait for N jobs, rather than waiting for each of them
// separately you make them a dependency of a dummy/sync job and
// wait just for that one.
if (!jobData.func.operator bool()) {
if (run(handle, ctx))
free_job(handle);
} else
if (!jobData.func.operator bool())
(void)run(handle, ctx);
else
pHandles[handlesCnt++] = handle;
}

Expand Down Expand Up @@ -990,8 +992,7 @@ namespace gaia {
handles = handles.subspan(pushed);
if (!handles.empty()) {
// The queue was full. Execute the job right away.
if (run(handles[0], ctx))
free_job(handles[0]);
run(handles[0], ctx);
handles = handles.subspan(1);
}
}
Expand All @@ -1002,7 +1003,9 @@ namespace gaia {
return false;

auto& jobData = m_jobManager.data(jobHandle);
const bool canWait = jobData.canWait;
const bool autoDelete = (jobData.flags & JobCreationFlags::AutoDelete) != 0U;
const bool canWait = (jobData.flags & JobCreationFlags::CanWait) != 0U;

m_jobManager.executing(jobData, ctx->workerIdx);

if (m_blockedInWorkUntil.load() != 0) {
Expand All @@ -1027,6 +1030,9 @@ namespace gaia {
Futex::wake(pFutexValue, detail::WaitMaskAll);
}

if (autoDelete)
del(jobHandle);

return true;
}
};
Expand Down
Loading

0 comments on commit e49868f

Please sign in to comment.