diff --git a/FWCore/Services/interface/thread_pool_TBBQueueBlocking.h b/FWCore/Services/interface/thread_pool_TBBQueueBlocking.h
index 7926ed04a1363..55d7b1d4ce81d 100644
--- a/FWCore/Services/interface/thread_pool_TBBQueueBlocking.h
+++ b/FWCore/Services/interface/thread_pool_TBBQueueBlocking.h
@@ -1,4 +1,8 @@
-/**
+/*
+CMSSW CUDA management and thread pool Service
+Author: Konstantinos Samaras-Tsakiris, kisamara@auth.gr
+*//*
+ --> Thread Pool:
 Copyright (c) 2012 Jakob Progsch, Václav Zeman
 This software is provided 'as-is', without any express or implied
 warranty. In no event will the authors be held liable for any damages
@@ -16,7 +20,6 @@ freely, subject to the following restrictions:
    distribution.
 
 --> This is an altered version of the original code.
-Editor: Konstantinos Samaras-Tsakiris, kisamara@auth.gr
 */
 
 #ifndef Thread_Pool_Service_H
@@ -81,34 +84,42 @@ class ThreadPoolService {
   // Launch kernel function with args
   // Configure execution policy before launch!
   template <typename F, typename... Args>
-  inline std::future<typename std::result_of<F(Args...)>::type>
+  inline std::future<void>
     cudaLaunchManaged(const cudaConfig::ExecutionPolicy& execPol, F&& f, Args&&... args)
   {
-    return getFuture([&](){
-#ifdef __NVCC__
-      f<<<execPol.getGridSize(), execPol.getBlockSize(), execPol.getSharedMemBytes(), cudaStreamPerThread>>>(args...);
-#endif
-      //std::cout<<"[In task]: Launched\n";
-      cudaStreamSynchronize(cudaStreamPerThread);
-      //std::cout<<"[In task]: Synced\n";
-    });
+    using packaged_task_t = std::packaged_task<void()>;
+
+    std::shared_ptr<packaged_task_t> task(new packaged_task_t([&](){
+    #ifdef __NVCC__
+      f<<<execPol.getGridSize(), execPol.getBlockSize(), execPol.getSharedMemBytes(), cudaStreamPerThread>>>(
+          std::forward<Args>(args)...);
+    #endif
+      //std::cout<<"[In task]: Launched\n";
+      cudaStreamSynchronize(cudaStreamPerThread);
+      //std::cout<<"[In task]: Synced\n";
+    }));
+    std::future<void> resultFut = task->get_future();
+    tasks_.emplace([task](){ (*task)(); });
+    return resultFut;
   }
-  // Overload: differentiate between managed-nonmanaged args
-  template <typename F, typename... NMArgs, template<typename...> class NM,
-            typename... MArgs, template<typename...> class M>
-  typename std::enable_if<std::is_same<NM<NMArgs...>,utils::NonManagedArgs<NMArgs...>>::value
-                          && std::is_same<M<MArgs...>,utils::ManagedArgs<MArgs...>>::value,
-                          std::future<void>>::type
-  cudaLaunchManaged(const cudaConfig::ExecutionPolicy& execPol,
-                    F&& f, NM&& nonManaged, M&& managed)
-  ;/*{
-    std::cout<<"Separate managed-unmanaged args!\n";
-    //return cudaLaunchManaged(std::forward<F>(f), nonManaged.forward(), managed.forward());
-    return unpackArgsTuple(typename GenSeq<sizeof...(NMArgs)+sizeof...(MArgs)>::type(), std::forward<F>(f),
-                           merge(nonManaged,managed));
-  }*/
+
+  /*template <typename T>
+  ??? attachManagedMemory(T&& arg)
+  {
+    IFcudaPointer(std::forward<T>(first));
+  *//*struct CudaPtrArg{
+      void operate(){
+
+      }
+    };
+    struct NonCudaPtrArg{
+      void operate(){}
+    };
+    std::conditional<std::is_base_of<cudaPtrBase,T>, CudaPtrArg, NonCudaPtrArg>::
+        type::operate(std::forward<T>(first));*/
+  //}
+
   template <typename F>
   static cudaConfig::ExecutionPolicy configureLaunch(int totalThreads, F&& f){
     cudaConfig::ExecutionPolicy execPol;
@@ -126,11 +137,18 @@ class ThreadPoolService {
   //!< @brief Joins all threads
   void stopWorkers();
 private:
-  /*template <typename Seq, typename F, typename... NMArgs, template<typename...> class NM, typename...
-             MArgs, template<typename...> class M>
-  void unpackArgsTuple(Seq, F&& f, NM&& nonMan, M&& man){
-
-  }*/
+  template <typename T, typename std::enable_if<std::is_base_of<cudaPtrBase, typename std::decay<T>::type>::value>::type* = nullptr>
+  auto preprocessManagedMem(T&& cudaPtr) -> decltype(cudaPtr.p){
+    std::cout<<"[memAttach]: Managed arg!\n";
+    cudaPtr.attachStream();
+    return cudaPtr.p;
+  }
+  template <typename T, typename std::enable_if<!std::is_base_of<cudaPtrBase, typename std::decay<T>::type>::value>::type* = nullptr>
+  auto preprocessManagedMem(T&& valueArg) -> decltype(valueArg){
+    //Do nothing
+    std::cout<<"[memAttach]: value arg\n";
+    return std::forward<T>(valueArg);
+  }
   // need to keep track of threads so we can join them
   std::vector< std::thread > workers_;
   // the task concurrent queue
diff --git a/FWCore/Services/interface/utils/cuda_pointer.h b/FWCore/Services/interface/utils/cuda_pointer.h
index 23f55cd7062a3..87305b78c2a03 100644
--- a/FWCore/Services/interface/utils/cuda_pointer.h
+++ b/FWCore/Services/interface/utils/cuda_pointer.h
@@ -19,18 +19,28 @@
 //   std::unique_ptr<T> p;
 // };
 
+class cudaPtrBase {};
+
 template <typename T>
-class cudaPointer{
+class cudaPointer: cudaPtrBase{
 public:
   //flag: cudaMemAttachGlobal | cudaMemAttachHost
-  cudaPointer(int elementN, unsigned flag=cudaMemAttachGlobal): p(new T){
+  cudaPointer(int elementN, unsigned flag=cudaMemAttachGlobal): attachment(flag){
     cudaMallocManaged(&p, elementN*sizeof(T), flag);
   }
   //p must retain ownership until here!
   ~cudaPointer(){
     cudaFree(p);
   }
+  //Only call default if on a new thread
+  void attachStream(cudaStream_t stream= cudaStreamPerThread){
+    attachment= cudaMemAttachSingle;
+    cudaStreamAttachMemAsync(stream, p, 0, attachment);
+  }
+  operator T*(){ return p; }
   //public!
   T* p;
+private:
+  unsigned attachment;
 };
diff --git a/FWCore/Services/test/test_threadPool_service.cppunit.cu b/FWCore/Services/test/test_threadPool_service.cppunit.cu
index cc1676823ff30..b52b2a62f9cf8 100644
--- a/FWCore/Services/test/test_threadPool_service.cppunit.cu
+++ b/FWCore/Services/test/test_threadPool_service.cppunit.cu
@@ -241,8 +241,7 @@ void TestThreadPoolService::CUDAAutolaunchCUDAPTRTest()
   cout<<"Launching auto...\n";
   // Auto launch config
   cudaConfig::ExecutionPolicy execPol((*poolPtr)->configureLaunch(n, longKernel));
-  (*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times,
-                                const_cast<const float*>(in.p),out.p).get();
+  (*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times, in,out).get();
   for(int i=0; i<n; i++)
     if (times*in.p[i]-out.p[i]>TOLERANCE || times*in.p[i]-out.p[i]<-TOLERANCE){
       cout<<"ERROR: i="<<i<<'\n';
@@ -258,5 +257,4 @@ void TestThreadPoolService::CUDAAutolaunchCUDAPTRTest()
-  (*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times,
-                                const_cast<const float*>(in.p),out.p).get();
+  (*poolPtr)->cudaLaunchManaged(execPol, longKernel, (int)n,(int)times, in,out).get();
   for(int i=0; i<n; i++)
     if (times*in.p[i]-out.p[i]>TOLERANCE || times*in.p[i]-out.p[i]<-TOLERANCE){
       cout<<"ERROR: i="<<i<<'\n';
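
Review note on the `cudaLaunchManaged` rewrite: replacing `getFuture()` with an explicit `std::packaged_task` is the standard future-returning enqueue pattern — wrap the callable in a shared `packaged_task<void()>`, hand its `std::future` to the caller, and push a copyable closure onto the task queue. A minimal self-contained sketch of that pattern, assuming a mutex-guarded `std::queue` in place of the service's TBB concurrent bounded queue (`MiniPool`/`enqueue` are illustrative names, not the service API):

```cpp
#include <condition_variable>
#include <functional>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

class MiniPool {
public:
  explicit MiniPool(unsigned n) {
    for (unsigned i = 0; i < n; ++i)
      workers_.emplace_back([this]{
        for (;;) {
          std::function<void()> task;
          {
            std::unique_lock<std::mutex> lk(mtx_);
            cv_.wait(lk, [this]{ return stop_ || !tasks_.empty(); });
            if (stop_ && tasks_.empty()) return;
            task = std::move(tasks_.front());
            tasks_.pop();
          }
          task();  // runs the packaged_task, which fulfils the caller's future
        }
      });
  }
  // Same shape as the new cudaLaunchManaged body: shared packaged_task,
  // future out to the caller, copyable lambda into the queue.
  std::future<void> enqueue(std::function<void()> f) {
    auto task = std::make_shared<std::packaged_task<void()>>(std::move(f));
    std::future<void> fut = task->get_future();
    {
      std::lock_guard<std::mutex> lk(mtx_);
      tasks_.emplace([task]{ (*task)(); });
    }
    cv_.notify_one();
    return fut;
  }
  ~MiniPool() {
    { std::lock_guard<std::mutex> lk(mtx_); stop_ = true; }
    cv_.notify_all();
    for (auto& w : workers_) w.join();
  }
private:
  std::vector<std::thread> workers_;
  std::queue<std::function<void()>> tasks_;
  std::mutex mtx_;
  std::condition_variable cv_;
  bool stop_ = false;
};
```

The `packaged_task` has to live on the heap behind a `shared_ptr` because `std::function` requires a copyable target and `packaged_task` is move-only. One caveat worth flagging on the diff itself: the task lambda captures `execPol` and `args` by reference, so the caller must keep them alive until the returned future is ready — the tests do so by calling `.get()` immediately.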
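The new private `preprocessManagedMem` pair is plain `enable_if`/`is_base_of` overload selection: any argument whose type derives from `cudaPtrBase` is attached to the current stream and unwrapped to its raw pointer, everything else passes through. A compilable sketch of the same dispatch with CUDA stripped out (the stub types and the name `preprocess` are illustrative):

```cpp
#include <iostream>
#include <type_traits>
#include <utility>

// Stand-ins for cudaPtrBase / cudaPointer<T>; no CUDA needed to show the dispatch.
struct cudaPtrBase {};
template <typename T> struct cudaPointer : cudaPtrBase { T* p = nullptr; };

// Selected when the decayed argument type derives from cudaPtrBase:
// unwrap to the raw pointer (the real code also attaches the stream here).
template <typename T, typename std::enable_if<std::is_base_of<
            cudaPtrBase, typename std::decay<T>::type>::value>::type* = nullptr>
auto preprocess(T&& cudaPtr) -> decltype(cudaPtr.p) {
  std::cout << "managed arg\n";
  return cudaPtr.p;
}
// Fallback for plain values: forward untouched.
template <typename T, typename std::enable_if<!std::is_base_of<
            cudaPtrBase, typename std::decay<T>::type>::value>::type* = nullptr>
T&& preprocess(T&& valueArg) {
  std::cout << "value arg\n";
  return std::forward<T>(valueArg);
}

int main() {
  cudaPointer<float> managed;
  float* raw = preprocess(managed);  // "managed arg", unwrapped to float*
  int n = preprocess(42);            // "value arg", passed through
  (void)raw; (void)n;
}
```

Note the `::type* = nullptr` form: two overloads differing only in a defaulted `typename = ...` parameter would declare the same signature twice, so the SFINAE condition has to live in a non-type parameter (or in the return type).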
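What `cudaPointer` encapsulates is the managed-memory lifecycle that `cudaLaunchManaged` relies on: allocate with `cudaMallocManaged`, attach the allocation to the launching thread's stream, launch, synchronize. A standalone sketch using only those runtime calls (the `scale` kernel and the sizes are made up for illustration):

```cuda
#include <cstdio>
#include <cuda_runtime.h>

__global__ void scale(int n, float m, const float* in, float* out) {
  int i = blockIdx.x * blockDim.x + threadIdx.x;
  if (i < n) out[i] = m * in[i];
}

int main() {
  const int n = 256;
  float *in = nullptr, *out = nullptr;
  // cudaPointer's constructor: allocation happens only through
  // cudaMallocManaged; cudaMemAttachHost keeps the memory host-accessible
  // until it is attached to a stream.
  cudaMallocManaged(&in,  n * sizeof(float), cudaMemAttachHost);
  cudaMallocManaged(&out, n * sizeof(float), cudaMemAttachHost);
  for (int i = 0; i < n; ++i) in[i] = float(i);

  // cudaPointer::attachStream(): bind the allocations to this thread's
  // stream so launches here don't require device-wide synchronization.
  cudaStreamAttachMemAsync(cudaStreamPerThread, in,  0, cudaMemAttachSingle);
  cudaStreamAttachMemAsync(cudaStreamPerThread, out, 0, cudaMemAttachSingle);

  scale<<<(n + 255) / 256, 256, 0, cudaStreamPerThread>>>(n, 2.f, in, out);
  cudaStreamSynchronize(cudaStreamPerThread);  // as in the service's task body

  printf("out[10] = %f (expect 20.0)\n", out[10]);
  cudaFree(in);
  cudaFree(out);
  return 0;
}
```

Attaching with `cudaMemAttachSingle` is what lets several host threads drive their own per-thread streams concurrently: on devices without hardware page faulting, the runtime then only has to fence the attached stream, not the whole device, before host code may touch the buffer again.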