diff --git a/cpp/daal/include/services/env_detect.h b/cpp/daal/include/services/env_detect.h index f561de5ae2c..d132c55794d 100644 --- a/cpp/daal/include/services/env_detect.h +++ b/cpp/daal/include/services/env_detect.h @@ -198,6 +198,10 @@ class DAAL_EXPORT Environment : public Base void initNumberOfThreads(); env _env; + // Pointer to the oneapi::tbb::task_scheduler_handle class object, global for oneDAL. + // The oneapi::tbb::task_scheduler_handle and the oneapi::tbb::finalize function + // allow user to wait for completion of worker threads. + void * _schedulerHandle; void * _globalControl; SharedPtr _executionContext; }; diff --git a/cpp/daal/src/externals/core_threading_win_dll.cpp b/cpp/daal/src/externals/core_threading_win_dll.cpp index bfd7ac01a32..37c4f7d0e2b 100644 --- a/cpp/daal/src/externals/core_threading_win_dll.cpp +++ b/cpp/daal/src/externals/core_threading_win_dll.cpp @@ -143,7 +143,9 @@ typedef void (*_daal_wait_task_group_t)(void * taskGroupPtr); typedef bool (*_daal_is_in_parallel_t)(); typedef void (*_daal_tbb_task_scheduler_free_t)(void *& globalControl); +typedef void (*_daal_tbb_task_scheduler_handle_free_t)(void *& schedulerHandle); typedef size_t (*_setNumberOfThreads_t)(const size_t, void **); +typedef size_t (*_setSchedulerHandle_t)(void **); typedef void * (*_daal_threader_env_t)(); typedef void (*_daal_parallel_sort_int32_t)(int *, int *); @@ -205,10 +207,12 @@ static _daal_del_task_group_t _daal_del_task_group_ptr = NULL; static _daal_run_task_group_t _daal_run_task_group_ptr = NULL; static _daal_wait_task_group_t _daal_wait_task_group_ptr = NULL; -static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; -static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; -static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; -static _daal_threader_env_t _daal_threader_env_ptr = NULL; +static _daal_is_in_parallel_t _daal_is_in_parallel_ptr = NULL; +static _daal_tbb_task_scheduler_free_t _daal_tbb_task_scheduler_free_ptr = NULL; +static _daal_tbb_task_scheduler_handle_free_t _daal_tbb_task_scheduler_handle_free_ptr = NULL; +static _setNumberOfThreads_t _setNumberOfThreads_ptr = NULL; +static _setSchedulerHandle_t _setSchedulerHandle_ptr = NULL; +static _daal_threader_env_t _daal_threader_env_ptr = NULL; static _daal_parallel_sort_int32_t _daal_parallel_sort_int32_ptr = NULL; static _daal_parallel_sort_uint64_t _daal_parallel_sort_uint64_ptr = NULL; @@ -657,6 +661,16 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& init) return _daal_tbb_task_scheduler_free_ptr(init); } +DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& init) +{ + load_daal_thr_dll(); + if (_daal_tbb_task_scheduler_handle_free_ptr == NULL) + { + _daal_tbb_task_scheduler_handle_free_ptr = (_daal_tbb_task_scheduler_handle_free_t)load_daal_thr_func("_daal_tbb_task_scheduler_handle_free"); + } + return _daal_tbb_task_scheduler_handle_free_ptr(init); +} + DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init) { load_daal_thr_dll(); @@ -667,6 +681,16 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** init) return _setNumberOfThreads_ptr(numThreads, init); } +DAAL_EXPORT size_t _setSchedulerHandle(void ** init) +{ + load_daal_thr_dll(); + if (_setSchedulerHandle_ptr == NULL) + { + _setSchedulerHandle_ptr = (_setSchedulerHandle_t)load_daal_thr_func("_setSchedulerHandle"); + } + return _setSchedulerHandle_ptr(init); +} + DAAL_EXPORT void * _daal_threader_env() { load_daal_thr_dll(); diff --git a/cpp/daal/src/services/env_detect.cpp b/cpp/daal/src/services/env_detect.cpp index 6698ede0d3a..286416ed571 100644 --- a/cpp/daal/src/services/env_detect.cpp +++ b/cpp/daal/src/services/env_detect.cpp @@ -125,7 +125,7 @@ DAAL_EXPORT void daal::services::Environment::setDynamicLibraryThreadingTypeOnWi initNumberOfThreads(); } -DAAL_EXPORT daal::services::Environment::Environment() : _globalControl {} +DAAL_EXPORT daal::services::Environment::Environment() : _schedulerHandle {}, _globalControl {} { _env.cpuid_init_flag = false; _env.cpuid = -1; @@ -137,7 +137,14 @@ DAAL_EXPORT daal::services::Environment::Environment(const Environment & e) : da DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() { if (isInit) return; - + // Initializes global oneapi::tbb::task_scheduler_handle object in oneDAL to prevent the unexpected + // destruction of the calling thread. + // When the oneapi::tbb::finalize function is called with an oneapi::tbb::task_scheduler_handle + // instance, it blocks the calling thread until the completion of all worker + // threads that were implicitly created by the library. +#if defined(TARGET_X86_64) + daal::setSchedulerHandle(&_schedulerHandle); +#endif /* if HT enabled - set _numThreads to physical cores num */ if (daal::internal::ServiceInst::serv_get_ht()) { @@ -156,7 +163,6 @@ DAAL_EXPORT void daal::services::Environment::initNumberOfThreads() DAAL_EXPORT daal::services::Environment::~Environment() { daal::services::daal_free_buffers(); - _daal_tbb_task_scheduler_free(_globalControl); } void daal::services::Environment::_cpu_detect(int enable) @@ -171,6 +177,9 @@ void daal::services::Environment::_cpu_detect(int enable) DAAL_EXPORT void daal::services::Environment::setNumberOfThreads(const size_t numThreads) { isInit = true; +#if defined(TARGET_X86_64) + daal::setSchedulerHandle(&_schedulerHandle); +#endif daal::setNumberOfThreads(numThreads, &_globalControl); } diff --git a/cpp/daal/src/threading/service_thread_pinner.cpp b/cpp/daal/src/threading/service_thread_pinner.cpp old mode 100755 new mode 100644 index 069a163c0a5..9cc55190a37 --- a/cpp/daal/src/threading/service_thread_pinner.cpp +++ b/cpp/daal/src/threading/service_thread_pinner.cpp @@ -236,6 +236,9 @@ class thread_pinner_impl_t : public tbb::task_scheduler_observer thread_pinner_impl_t::thread_pinner_impl_t(void (*read_topo)(int &, int &, int &, int **), void (*deleter)(void *)) : pinner_arena(nthreads = daal::threader_get_threads_number()), tbb::task_scheduler_observer(pinner_arena), topo_deleter(deleter) { + #if defined(TARGET_X86_64) + pinner_arena.initialize(); + #endif do_pinning = (nthreads > 0) ? true : false; is_pinning.set(0); diff --git a/cpp/daal/src/threading/threading.cpp b/cpp/daal/src/threading/threading.cpp index 7fa0127a5ab..f1bb8bd6162 100644 --- a/cpp/daal/src/threading/threading.cpp +++ b/cpp/daal/src/threading/threading.cpp @@ -75,6 +75,19 @@ DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl) #endif } +DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle) +{ +#if defined(TARGET_X86_64) + #if defined(__DO_TBB_LAYER__) + if (schedulerHandle) + { + delete reinterpret_cast(schedulerHandle); + schedulerHandle = nullptr; + } + #endif +#endif +} + DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl) { #if defined(__DO_TBB_LAYER__) @@ -92,6 +105,18 @@ DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalCo return 1; } +DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle) +{ +#if defined(TARGET_X86_64) + #if defined(__DO_TBB_LAYER__) + *schedulerHandle = reinterpret_cast(new tbb::task_scheduler_handle(tbb::attach {})); + // It is necessary for initializing tbb in cases where DAAL does not use it. + tbb::task_arena {}.initialize(); + #endif +#endif + return 0; +} + DAAL_EXPORT void _daal_threader_for(int n, int threads_request, const void * a, daal::functype func) { #if defined(__DO_TBB_LAYER__) diff --git a/cpp/daal/src/threading/threading.h b/cpp/daal/src/threading/threading.h index 4d00c789494..0b4a9881b97 100644 --- a/cpp/daal/src/threading/threading.h +++ b/cpp/daal/src/threading/threading.h @@ -102,7 +102,9 @@ extern "C" DAAL_EXPORT void _daal_wait_task_group(void * taskGroupPtr); DAAL_EXPORT void _daal_tbb_task_scheduler_free(void *& globalControl); + DAAL_EXPORT void _daal_tbb_task_scheduler_handle_free(void *& schedulerHandle); DAAL_EXPORT size_t _setNumberOfThreads(const size_t numThreads, void ** globalControl); + DAAL_EXPORT size_t _setSchedulerHandle(void ** schedulerHandle); DAAL_EXPORT void * _daal_threader_env(); @@ -183,6 +185,11 @@ inline size_t threader_get_threads_number() return threader_env()->getNumberOfThreads(); } +inline size_t setSchedulerHandle(void ** schedulerHandle) +{ + return _setSchedulerHandle(schedulerHandle); +} + inline size_t setNumberOfThreads(const size_t numThreads, void ** globalControl) { return _setNumberOfThreads(numThreads, globalControl);