Skip to content

Commit

Permalink
Update threadpool. Add ThreadLevel parameter.
Browse files Browse the repository at this point in the history
Signed-off-by: jpsdr <jpsdr.psx@free.fr>
  • Loading branch information
jpsdr committed May 29, 2019
1 parent e0a13d3 commit 29de861
Show file tree
Hide file tree
Showing 9 changed files with 254 additions and 118 deletions.
18 changes: 15 additions & 3 deletions nnedi3 - Readme.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
|
nnedi3 for Avisynth by tritical |
modified by JPSDR |
v0.9.4.51 (27/05/2018) |
v0.9.4.52 (30/05/2019) |
HELP FILE |
-----------------------------------------------------------------------------------------------------
-----------------------------------------------------------------------------------------------------
Expand All @@ -21,13 +21,13 @@ INFO:

nnedi3(int field, bool dh, bool Y, bool U, bool V, int nsize, int nns, int qual, int etype,
int pscrn, int threads, int opt, int fapprox, bool logicalCores, bool MaxPhysCore, bool SetAffinity,
bool A, bool sleep, int prefetch, int range)
bool A, bool sleep, int prefetch, int range,int ThreadLevel)

nnedi3_rpow2(int rfactor, int nsize, int nns, int qual, int etype, int pscrn, string cshift,
int fwidth, int fheight, float ep0, float ep1, int threads, int opt, int fapprox,
bool csresize, bool mpeg2,bool logicalCores, bool MaxPhysCore, bool SetAffinity,
int threads_rs,bool logicalCores_rs, bool MaxPhysCore_rs, bool SetAffinity_rs,
bool sleep, int prefetch, int range)
bool sleep, int prefetch, int range,int ThreadLevel)



Expand Down Expand Up @@ -272,6 +272,18 @@ PARAMETERS (nnedi3):

Default: 1

ThreadLevel -
This parameter will set the priority level of the threads created for the processing (internal
multithreading). No effect if threads=1.
1 : Idle level.
2 : Lowest level.
3 : Below level.
4 : Normal level.
5 : Above level.
6 : Highest level.
7 : Time critical level (WARNING !!! use this level at your own risk)

Default : 6

The logicalCores, MaxPhysCore, SetAffinity and sleep are parameters to specify how the pool of thread
will be created and handled, allowing if necessary each people to tune according his configuration.
Expand Down
132 changes: 90 additions & 42 deletions nnedi3/ThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Threadpool
*
* Create and manage a threadpool.
* Copyright (C) 2017 JPSDR
* Copyright (C) 2016 JPSDR
*
* Threadpool is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -20,14 +20,17 @@
*
*/

// ThreadPoolInterface.cpp : Define the exported functions for using with the threadpool.
// ThreadPoolDLL.cpp : Define the exported functions for using with the threadpool.
// Kind of API.

#include "ThreadPool.h"

#define myfree(ptr) if (ptr!=NULL) { free(ptr); ptr=NULL;}
#define myCloseHandle(ptr) if (ptr!=NULL) { CloseHandle(ptr); ptr=NULL;}

static const int TabThreadLevel[8]={THREAD_PRIORITY_NORMAL,THREAD_PRIORITY_IDLE,THREAD_PRIORITY_LOWEST,
THREAD_PRIORITY_BELOW_NORMAL,THREAD_PRIORITY_NORMAL,THREAD_PRIORITY_ABOVE_NORMAL,
THREAD_PRIORITY_HIGHEST,THREAD_PRIORITY_TIME_CRITICAL};

// Helper function to count set bits in the processor mask.
static uint8_t CountSetBits(ULONG_PTR bitMask)
Expand Down Expand Up @@ -226,7 +229,6 @@ DWORD WINAPI ThreadPool::StaticThreadpool(LPVOID lpParam )
}



ThreadPool::ThreadPool(void): Status_Ok(true)
{
int16_t i;
Expand All @@ -243,12 +245,13 @@ ThreadPool::ThreadPool(void): Status_Ok(true)
thds[i]=NULL;
ThreadSleep[i]=true;
}
nPriority=NormalThreadLevel;
TotalThreadsRequested=0;
CurrentThreadsAllocated=0;
CurrentThreadsUsed=0;

Get_CPU_Info(CPU);
if ((CPU.NbLogicCPU==0) || (CPU.NbPhysCore==0)) Status_Ok=false;
Status_Ok=!(((CPU.NbLogicCPU==0) || (CPU.NbPhysCore==0)));
}


Expand All @@ -259,10 +262,13 @@ void ThreadPool::FreeThreadPool(void)

if (TotalThreadsRequested>0)
{
const int nPr=TabThreadLevel[AboveThreadLevel];

for (i=TotalThreadsRequested-1; i>=0; i--)
{
if (thds[i]!=NULL)
{
SetThreadPriority(thds[i],nPr);
if (ThreadSleep[i]) ResumeThread(thds[i]);
MT_Thread[i].f_process=255;
SetEvent(nextJob[i]);
Expand All @@ -283,6 +289,7 @@ void ThreadPool::FreeThreadPool(void)
}
}

nPriority=NormalThreadLevel;
TotalThreadsRequested=0;
CurrentThreadsAllocated=0;
CurrentThreadsUsed=0;
Expand Down Expand Up @@ -335,32 +342,24 @@ uint8_t ThreadPool::GetThreadNumber(uint8_t thread_number,bool logical)
{
const uint8_t nCPU=(logical) ? CPU.NbLogicCPU:CPU.NbPhysCore;

if (thread_number==0) return((nCPU>MAX_MT_THREADS) ? MAX_MT_THREADS:nCPU);
else return(thread_number);
return((thread_number==0) ? ((nCPU>MAX_MT_THREADS) ? MAX_MT_THREADS:nCPU):thread_number);
}


bool ThreadPool::AllocateThreads(uint8_t thread_number,uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep)
bool ThreadPool::AllocateThreads(uint8_t thread_number,uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,
bool SetAffinity,bool sleep,ThreadLevelName priority)
{
if ((!Status_Ok) || (thread_number==0)) return(false);

if (thread_number>CurrentThreadsAllocated)
{
TotalThreadsRequested=thread_number;
CreateThreadPool(offset_core,offset_ht,UseMaxPhysCore,SetAffinity,sleep);
CreateThreadPool(offset_core,offset_ht,UseMaxPhysCore,SetAffinity,sleep,priority);
}

return(Status_Ok);
}

bool ThreadPool::ChangeThreadsAffinity(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep)
{
if ((!Status_Ok) || (CurrentThreadsAllocated==0)) return(false);

CreateThreadPool(offset_core,offset_ht,UseMaxPhysCore,SetAffinity,sleep);

return(Status_Ok);
}

bool ThreadPool::DeAllocateThreads(void)
{
Expand All @@ -372,34 +371,74 @@ bool ThreadPool::DeAllocateThreads(void)
}


bool ThreadPool::ChangeThreadsAffinity(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity)
{
if ((!Status_Ok) || (CurrentThreadsAllocated==0)) return(false);

void ThreadPool::CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep)
CreateThreadsMasks(CPU,ThreadMask,TotalThreadsRequested,offset_core,offset_ht,UseMaxPhysCore);

for(uint8_t i=0; i<CurrentThreadsAllocated; i++)
SetThreadAffinityMask(thds[i],SetAffinity?ThreadMask[i]:CPU.FullMask);

return(true);
}


bool ThreadPool::ChangeThreadsLevel(ThreadLevelName priority)
{
int16_t i;
if ((!Status_Ok) || (CurrentThreadsAllocated==0)) return(false);

for(i=0; i<CurrentThreadsAllocated; i++)
if (priority!=NoneThreadLevel)
{
SuspendThread(thds[i]);
ThreadSleep[i]=true;
const int nPr=TabThreadLevel[priority];

nPriority=priority;
for(int16_t i=0; i<(int16_t)CurrentThreadsUsed; i++)
SetThreadPriority(thds[i],nPr);
}

return(true);
}


void ThreadPool::CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,
bool SetAffinity,bool sleep,ThreadLevelName priority)
{
int16_t i;

CreateThreadsMasks(CPU,ThreadMask,TotalThreadsRequested,offset_core,offset_ht,UseMaxPhysCore);

for(i=0; i<CurrentThreadsAllocated; i++)
if (sleep)
{
if (SetAffinity) SetThreadAffinityMask(thds[i],ThreadMask[i]);
else SetThreadAffinityMask(thds[i],CPU.FullMask);
if (!sleep)
for(i=0; i<(int16_t)CurrentThreadsAllocated; i++)
{
ResumeThread(thds[i]);
ThreadSleep[i]=false;
SetThreadAffinityMask(thds[i],SetAffinity?ThreadMask[i]:CPU.FullMask);
if (!ThreadSleep[i])
{
SuspendThread(thds[i]);
ThreadSleep[i]=true;
}
}
}
else
{
for(i=0; i<(int16_t)CurrentThreadsAllocated; i++)
{
SetThreadAffinityMask(thds[i],SetAffinity?ThreadMask[i]:CPU.FullMask);
if (ThreadSleep[i])
{
ResumeThread(thds[i]);
ThreadSleep[i]=false;
}
}
}

if (priority!=NoneThreadLevel) nPriority=priority;

if (CurrentThreadsAllocated==TotalThreadsRequested) return;

i=CurrentThreadsAllocated;
while ((i<TotalThreadsRequested) && Status_Ok)
i=(int16_t)CurrentThreadsAllocated;
while ((i<(int16_t)TotalThreadsRequested) && Status_Ok)
{
jobFinished[i]=CreateEvent(NULL,TRUE,TRUE,NULL);
nextJob[i]=CreateEvent(NULL,TRUE,FALSE,NULL);
Expand All @@ -414,15 +453,17 @@ void ThreadPool::CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool Use
return;
}

i=CurrentThreadsAllocated;
while ((i<TotalThreadsRequested) && Status_Ok)
const int nPr=TabThreadLevel[IdleThreadLevel];

i=(int16_t)CurrentThreadsAllocated;
while ((i<(int16_t)TotalThreadsRequested) && Status_Ok)
{
thds[i]=CreateThread(NULL,0,(LPTHREAD_START_ROUTINE)StaticThreadpool,&MT_Thread[i],CREATE_SUSPENDED,&tids[i]);
Status_Ok=Status_Ok && (thds[i]!=NULL);
if (Status_Ok)
{
if (SetAffinity) SetThreadAffinityMask(thds[i],ThreadMask[i]);
else SetThreadAffinityMask(thds[i],CPU.FullMask);
SetThreadAffinityMask(thds[i],SetAffinity?ThreadMask[i]:CPU.FullMask);
SetThreadPriority(thds[i],nPr);
if (!sleep)
{
ResumeThread(thds[i]);
Expand All @@ -437,14 +478,16 @@ void ThreadPool::CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool Use
}



bool ThreadPool::RequestThreadPool(uint8_t thread_number,Public_MT_Data_Thread *Data)
bool ThreadPool::RequestThreadPool(uint8_t thread_number,Public_MT_Data_Thread *Data,ThreadLevelName priority)
{
if ((!Status_Ok) || (thread_number>CurrentThreadsAllocated)) return(false);

const int nPr=TabThreadLevel[(priority!=NoneThreadLevel)?priority:nPriority];

for(uint8_t i=0; i<thread_number; i++)
{
MT_Thread[i].MTData=Data+i;
SetThreadPriority(thds[i],nPr);
if (ThreadSleep[i])
{
ResumeThread(thds[i]);
Expand All @@ -462,16 +505,22 @@ bool ThreadPool::ReleaseThreadPool(bool sleep)
{
if (!Status_Ok) return(false);

for(uint8_t i=0; i<CurrentThreadsUsed; i++)
if (CurrentThreadsUsed>0)
{
if (sleep)
const int nPr=TabThreadLevel[IdleThreadLevel];

for(uint8_t i=0; i<CurrentThreadsUsed; i++)
{
SuspendThread(thds[i]);
ThreadSleep[i]=true;
if (sleep)
{
SuspendThread(thds[i]);
ThreadSleep[i]=true;
}
SetThreadPriority(thds[i],nPr);
MT_Thread[i].MTData=NULL;
}
MT_Thread[i].MTData=NULL;
CurrentThreadsUsed=0;
}
CurrentThreadsUsed=0;

return(true);
}
Expand Down Expand Up @@ -503,4 +552,3 @@ bool ThreadPool::WaitThreadsEnd(void)

return(true);
}

16 changes: 10 additions & 6 deletions nnedi3/ThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Threadpool
*
* Create and manage a threadpool.
* Copyright (C) 2017 JPSDR
* Copyright (C) 2016 JPSDR
*
* Threadpool is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand All @@ -27,7 +27,7 @@

#include "ThreadPoolDef.h"

#define THREADPOOL_VERSION "ThreadPool 1.3.4"
#define THREADPOOL_VERSION "ThreadPool 1.4.0"

typedef struct _MT_Data_Thread
{
Expand Down Expand Up @@ -59,10 +59,12 @@ class ThreadPool
public :

uint8_t GetThreadNumber(uint8_t thread_number,bool logical);
bool AllocateThreads(uint8_t thread_number,uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep);
bool ChangeThreadsAffinity(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep);
bool AllocateThreads(uint8_t thread_number,uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,
bool SetAffinity,bool sleep,ThreadLevelName priority);
bool DeAllocateThreads(void);
bool RequestThreadPool(uint8_t thread_number,Public_MT_Data_Thread *Data);
bool ChangeThreadsAffinity(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity);
bool ChangeThreadsLevel(ThreadLevelName priority);
bool RequestThreadPool(uint8_t thread_number,Public_MT_Data_Thread *Data,ThreadLevelName priority);
bool ReleaseThreadPool(bool sleep);
bool StartThreads(void);
bool WaitThreadsEnd(void);
Expand All @@ -80,13 +82,15 @@ class ThreadPool
DWORD tids[MAX_MT_THREADS];
ULONG_PTR ThreadMask[MAX_MT_THREADS];
volatile bool ThreadSleep[MAX_MT_THREADS];
volatile ThreadLevelName nPriority;

volatile bool Status_Ok;
volatile uint8_t TotalThreadsRequested,CurrentThreadsAllocated,CurrentThreadsUsed;

void FreeThreadPool(void);
void DestroyThreadPool(void);
void CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,bool sleep);
void CreateThreadPool(uint8_t offset_core,uint8_t offset_ht,bool UseMaxPhysCore,bool SetAffinity,
bool sleep,ThreadLevelName priority);

private :

Expand Down
4 changes: 3 additions & 1 deletion nnedi3/ThreadPoolDef.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* Threadpool
*
* Create and manage a threadpool.
* Copyright (C) 2017 JPSDR
* Copyright (C) 2016 JPSDR
*
* Threadpool is free software; you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
Expand Down Expand Up @@ -31,6 +31,8 @@

typedef void (*ThreadPoolFunction)(void *ptr);

enum ThreadLevelName {NoneThreadLevel,IdleThreadLevel,LowestThreadLevel,BelowThreadLevel,
NormalThreadLevel,AboveThreadLevel,HighestThreadLevel,CriticalThreadLevel};

typedef struct _Public_MT_Data_Thread
{
Expand Down
Loading

0 comments on commit 29de861

Please sign in to comment.