46 return static_cast<intmax_t
>(Thread::hardware_concurrency());
66bool ThreadPool::f_use_tbb =
false;
74 auto _thr_data = std::make_shared<ThreadData>(
tp);
76 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
83 _data->emplace_back(_thr_data);
104#if defined(PTL_USE_TBB)
107 ConsumeParameters<bool>(enable);
124 auto _tid = ThisThread::get_id();
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
142: m_use_affinity(_use_affinity)
143, m_pool_state(
std::make_shared<
std::atomic_short>(thread_pool::state::
NONINIT))
144, m_task_queue(task_queue)
145, m_affinity_func(
std::move(_affinity_func))
149 std::cerr <<
"ThreadPool created on non-master slave" << std::endl;
166 std::cerr <<
"Warning! ThreadPool was not properly destroyed! Call "
167 "destroy_threadpool() before deleting the ThreadPool object to "
168 "eliminate this message."
199 std::cout <<
"Setting pin affinity for thread " << _thread.get_id() <<
" to "
200 << _pin << std::endl;
203 }
catch(std::runtime_error& e)
205 std::cout <<
"Error setting pin affinity" << std::endl;
206 std::cerr << e.what() << std::endl;
217 if(proposed_size < 1)
225#if defined(PTL_USE_TBB)
236 delete _global_control;
237 _global_control =
nullptr;
246 std::cout <<
"ThreadPool [TBB] initialized with " <<
m_pool_size
247 <<
" threads." << std::endl;
270 std::cout <<
"ThreadPool initialized with " <<
m_pool_size <<
" threads."
284 std::cout <<
"ThreadPool initialized with " <<
m_pool_size <<
" threads."
326 }
catch(std::runtime_error& e)
328 std::cerr << e.what() << std::endl;
330 }
catch(std::bad_alloc& e)
332 std::cerr << e.what() << std::endl;
344 std::stringstream ss;
345 ss <<
"ThreadPool::initialize_threadpool - boolean is_joined vector "
346 <<
"is a different size than threads vector: " <<
m_is_joined.size() <<
" vs. "
347 <<
m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
349 throw std::runtime_error(ss.str());
354 std::cout <<
"ThreadPool initialized with " <<
m_pool_size <<
" threads."
375#if defined(PTL_USE_TBB)
394 delete _global_control;
395 _global_control =
nullptr;
397 std::cout <<
"ThreadPool [TBB] destroyed" << std::endl;
413 std::stringstream ss;
414 ss <<
" ThreadPool::destroy_thread_pool - boolean is_joined vector "
415 <<
"is a different size than threads vector: " <<
m_is_joined.size() <<
" vs. "
416 <<
m_main_threads.size() <<
" (tid: " << std::this_thread::get_id() <<
")";
418 throw std::runtime_error(ss.str());
458 auto start = std::chrono::steady_clock::now();
459 auto elapsed = std::chrono::duration<double>{};
463 std::this_thread::sleep_for(std::chrono::milliseconds(50));
464 elapsed = std::chrono::steady_clock::now() - start;
473 std::cout <<
"ThreadPool destroyed" << std::endl;
477 std::cout <<
"ThreadPool destroyed but " << _active
478 <<
" threads might still be active (and cause a termination error)"
556 ThreadId tid = ThisThread::get_id();
561 auto start = std::chrono::steady_clock::now();
562 auto elapsed = std::chrono::duration<double>{};
564 while(!_task_queue && elapsed.count() < 60)
566 elapsed = std::chrono::steady_clock::now() - start;
574 throw std::runtime_error(
"No task queue was found after 60 seconds!");
584 auto _task = _task_queue->
GetTask();
595 static thread_local auto p_task_lock =
m_task_lock;
599 AutoLock _task_lock(*p_task_lock, std::defer_lock);
602 auto leave_pool = [&]() {
603 auto _state = [&]() {
return static_cast<int>(
m_pool_state->load()); };
604 auto _pool_state = _state();
610 if(_task_lock.owns_lock())
617 if(!_task_lock.owns_lock())
623 if(_task_lock.owns_lock())
628 if(_task_lock.owns_lock())
641 while(_task_queue->
empty())
643 auto _state = [&]() {
return static_cast<int>(
m_pool_state->load()); };
644 auto _size = [&]() {
return _task_queue->
true_size(); };
645 auto _empty = [&]() {
return _task_queue->
empty(); };
646 auto _wake = [&]() {
return (!_empty() || _size() > 0 || _state() > 0); };
657 if(!_task_lock.owns_lock())
669 if(_task_lock.owns_lock())
681 if(_task_lock.owns_lock())
695 while(!_task_queue->
empty())
697 auto _task = _task_queue->
GetTask();
VUserTaskQueue * current_queue
std::vector< std::shared_ptr< ThreadData > > thread_data_t
static PTL_DLL thread_id_map_t f_thread_ids
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
task_queue_t * m_task_queue
static uintmax_t get_this_thread_id()
initialize_func_t m_init_func
thread_list_t m_stop_threads
thread_list_t m_main_threads
task_queue_t *& get_valid_queue(task_queue_t *&) const
std::function< intmax_t(intmax_t)> affinity_func_t
thread_data_t m_thread_data
static void set_use_tbb(bool val)
void set_affinity(affinity_func_t f)
tbb_task_group_t * m_tbb_task_group
static tbb_global_control_t *& tbb_global_control()
pool_state_type m_pool_state
static const thread_id_map_t & get_thread_ids()
std::map< ThreadId, uintmax_t > thread_id_map_t
tbb_task_arena_t * m_tbb_task_arena
affinity_func_t m_affinity_func
atomic_int_type m_thread_awake
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
void execute_thread(VUserTaskQueue *)
atomic_bool_type m_alive_flag
bool is_initialized() const
static PTL_DLL bool f_use_tbb
atomic_int_type m_thread_active
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), affinity_func_t=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
@ max_allowed_parallelism
auto execute(FuncT &&_func) -> decltype(_func())
std::map< G4String, G4AttDef > * GetInstance(const G4String &storeKey, G4bool &isNew)
void SetThreadId(int aNewValue)
bool SetPinAffinity(int idx, NativeThread &at)
static const short PARTIAL
static const short NONINIT
static const short STOPPED
static const short STARTED
std::thread::native_handle_type NativeThread
tbb::task_group tbb_task_group_t
tbb::global_control tbb_global_control_t
ThreadData *& thread_data()