39#if defined(PTL_USE_TBB)
40# if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
41# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
43# if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
44# define TBB_PREVIEW_GLOBAL_CONTROL 1
46# include <tbb/global_control.h>
63#include <unordered_map>
71 template <
typename KeyT,
typename MappedT,
typename HashT = KeyT>
72 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
82 using lock_t = std::shared_ptr<Mutex>;
100 bool _use_affinity = GetEnv<bool>(
"PTL_CPU_AFFINITY",
false),
102 static std::atomic<intmax_t> assigned;
103 intmax_t _assign = assigned++;
104 return _assign % Thread::hardware_concurrency();
120 template <
typename FuncT>
123 template <
typename FuncT>
143 template <
typename ListT>
193 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
194 if(!lock.owns_lock())
196 auto _tid = ThisThread::get_id();
218 ++(*m_thread_active);
224 --(*m_thread_active);
299 if(ntasks < this->
size())
323#if defined(PTL_USE_TBB)
353 auto&& _func = [_task]() { (*_task)(); };
389template <
typename ListT>
402 auto c_size = c.size();
405 if(!itr->is_native_task())
421template <
typename FuncT>
427#if defined(PTL_USE_TBB)
432 std::set<std::thread::id> _first{};
438 if(_first.find(std::this_thread::get_id()) == _first.end())
443 _first.insert(std::this_thread::get_id());
455 std::atomic<size_t> _total_init{ 0 };
466 size_t _dmax = std::max<size_t>(_ncore, 4);
473 static thread_local size_type _depth = 0;
486 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
489 tg.
run([&]() { _init_task(); });
490 tg.run([&]() { _init_task(); });
491 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
499 auto _fname = __FUNCTION__;
500 auto _write_info = [&]() {
501 std::cout <<
"[" << _fname <<
"]> Total initalized: " << _total_init
502 <<
", expected: " << _num <<
", max-parallel: " << _maxp
503 <<
", size: " << _sz <<
", ncore: " << _ncore << std::endl;
505 while(_total_init < _num)
515 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
521 if(nitr > 4 * (_ncore + 1))
539template <
typename FuncT>
546#if defined(PTL_USE_TBB)
551 std::set<std::thread::id> _first{};
557 if(_first.find(std::this_thread::get_id()) == _first.end())
562 _first.insert(std::this_thread::get_id());
574 std::atomic<size_t> _total_exec{ 0 };
578 size_t _dmax = std::max<size_t>(_ncore, 4);
580 size_t _num = _tids.size();
587 static thread_local size_type _depth = 0;
589 auto _this_tid = std::this_thread::get_id();
591 if(_tids.count(_this_tid) > 0)
601 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
604 tg.
run([&]() { _exec_task(); });
605 tg.run([&]() { _exec_task(); });
606 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
614 auto _fname = __FUNCTION__;
615 auto _write_info = [&]() {
616 std::cout <<
"[" << _fname <<
"]> Total executed: " << _total_exec
617 <<
", expected: " << _num <<
", size: " <<
size() << std::endl;
619 while(_total_exec < _num)
629 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
635 if(nitr > 8 * (_num + 1))
G4double(* function)(G4double)
static ThreadData *& GetInstance()
std::vector< std::shared_ptr< ThreadData > > thread_data_t
static PTL_DLL thread_id_map_t f_thread_ids
std::shared_ptr< Condition > condition_t
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
std::shared_ptr< std::atomic_uintmax_t > task_count_type
std::map< uintmax_t, ThreadId > thread_index_map_t
task_queue_t * m_task_queue
ThreadPool(ThreadPool &&)=default
static uintmax_t get_this_thread_id()
int insert(task_pointer &&, int=-1)
initialize_func_t m_init_func
const pool_state_type & state() const
size_type add_task(task_pointer &&task, int bin=-1)
ThreadPool(const ThreadPool &)=delete
int run_on_this(task_pointer &&)
thread_list_t m_stop_threads
thread_list_t m_main_threads
task_queue_t *& get_valid_queue(task_queue_t *&) const
std::vector< Thread > thread_vec_t
std::function< intmax_t(intmax_t)> affinity_func_t
size_type add_tasks(ListT &)
thread_data_t m_thread_data
void execute_on_specific_threads(const std::set< std::thread::id > &_tid, FuncT &&_func)
void resize(size_type _n)
static void set_use_tbb(bool val)
std::shared_ptr< Mutex > lock_t
bool is_tbb_threadpool() const
std::shared_ptr< task_type > task_pointer
void set_affinity(affinity_func_t f)
ThreadPool & operator=(const ThreadPool &)=delete
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
int get_active_threads_count() const
Thread * get_thread(std::thread::id id) const
tbb_task_group_t * m_tbb_task_group
std::shared_ptr< std::atomic_short > pool_state_type
static tbb_global_control_t *& tbb_global_control()
ThreadPool & operator=(ThreadPool &&)=default
task_queue_t * get_queue() const
std::function< void()> initialize_func_t
Thread * get_thread(size_type _n) const
std::shared_ptr< std::atomic_bool > atomic_bool_type
tbb_task_arena_t * get_task_arena()
bool using_affinity() const
void execute_on_all_threads(FuncT &&_func)
pool_state_type m_pool_state
static const thread_id_map_t & get_thread_ids()
std::map< ThreadId, uintmax_t > thread_id_map_t
tbb_task_arena_t * m_tbb_task_arena
std::deque< ThreadId > thread_list_t
affinity_func_t m_affinity_func
atomic_int_type m_thread_awake
std::vector< bool > bool_list_t
size_type destroy_threadpool()
size_type initialize_threadpool(size_type)
void execute_thread(VUserTaskQueue *)
atomic_bool_type m_alive_flag
bool is_initialized() const
static PTL_DLL bool f_use_tbb
atomic_int_type m_thread_active
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
void set_initialization(initialize_func_t f)
void reset_initialization()
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), affinity_func_t=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
VUserTaskQueue task_queue_t
static uintmax_t add_thread_id()
VTask is the abstract class stored in thread_pool.
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
@ max_allowed_parallelism
static size_t active_value(parameter param)
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
T min(const T t1, const T t2)
brief Return the smallest of the two arguments
unsigned GetNumberOfCores()
void SetThreadId(int aNewValue)
tbb::task_arena tbb_task_arena_t