Geant4-11
ThreadPool.hh
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class header file
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#pragma once
32
33#include "PTL/AutoLock.hh"
34#include "PTL/ThreadData.hh"
35#include "PTL/Threading.hh"
36#include "PTL/VTask.hh"
37#include "PTL/VUserTaskQueue.hh"
38
39#if defined(PTL_USE_TBB)
40# if !defined(TBB_SUPPRESS_DEPRECATED_MESSAGES)
41# define TBB_SUPPRESS_DEPRECATED_MESSAGES 1
42# endif
43# if !defined(TBB_PREVIEW_GLOBAL_CONTROL)
44# define TBB_PREVIEW_GLOBAL_CONTROL 1
45# endif
46# include <tbb/global_control.h>
47# include <tbb/tbb.h>
48#endif
49
50// C
51#include <cstdint>
52#include <cstdlib>
53#include <cstring>
54// C++
55#include <atomic>
56#include <deque>
57#include <iostream>
58#include <map>
59#include <memory>
60#include <queue>
61#include <set>
62#include <stack>
63#include <unordered_map>
64#include <vector>
65
66namespace PTL
67{
69{
70public:
71 template <typename KeyT, typename MappedT, typename HashT = KeyT>
72 using uomap = std::unordered_map<KeyT, MappedT, std::hash<HashT>>;
73
74 // pod-types
75 using size_type = size_t;
76 using task_count_type = std::shared_ptr<std::atomic_uintmax_t>;
77 using atomic_int_type = std::shared_ptr<std::atomic_uintmax_t>;
78 using pool_state_type = std::shared_ptr<std::atomic_short>;
79 using atomic_bool_type = std::shared_ptr<std::atomic_bool>;
80 // objects
82 using lock_t = std::shared_ptr<Mutex>;
83 using condition_t = std::shared_ptr<Condition>;
84 using task_pointer = std::shared_ptr<task_type>;
86 // containers
87 typedef std::deque<ThreadId> thread_list_t;
88 typedef std::vector<bool> bool_list_t;
89 typedef std::map<ThreadId, uintmax_t> thread_id_map_t;
90 typedef std::map<uintmax_t, ThreadId> thread_index_map_t;
91 using thread_vec_t = std::vector<Thread>;
92 using thread_data_t = std::vector<std::shared_ptr<ThreadData>>;
93 // functions
95 typedef std::function<intmax_t(intmax_t)> affinity_func_t;
96
97public:
98 // Constructor and Destructors
99 ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue = nullptr,
100 bool _use_affinity = GetEnv<bool>("PTL_CPU_AFFINITY", false),
101 affinity_func_t = [](intmax_t) {
102 static std::atomic<intmax_t> assigned;
103 intmax_t _assign = assigned++;
104 return _assign % Thread::hardware_concurrency();
105 });
106 // Virtual destructors are required by abstract classes
107 // so add it by default, just in case
108 virtual ~ThreadPool();
109 ThreadPool(const ThreadPool&) = delete;
110 ThreadPool(ThreadPool&&) = default;
111 ThreadPool& operator=(const ThreadPool&) = delete;
113
114public:
115 // Public functions
116 size_type initialize_threadpool(size_type); // start the threads
117 size_type destroy_threadpool(); // destroy the threads
119
120 template <typename FuncT>
121 void execute_on_all_threads(FuncT&& _func);
122
123 template <typename FuncT>
124 void execute_on_specific_threads(const std::set<std::thread::id>& _tid,
125 FuncT&& _func);
126
129
130 bool is_tbb_threadpool() const { return m_tbb_tp; }
131
132public:
133 // Public functions related to TBB
134 static bool using_tbb();
135 // enable using TBB if available
136 static void set_use_tbb(bool val);
137
138public:
139 // add tasks for threads to process
140 size_type add_task(task_pointer&& task, int bin = -1);
141 // size_type add_thread_task(ThreadId id, task_pointer&& task);
142 // add a generic container with iterator
143 template <typename ListT>
144 size_type add_tasks(ListT&);
145
147 Thread* get_thread(std::thread::id id) const;
148
149 // only relevant when compiled with PTL_USE_TBB
151
154 {
155 auto f = []() {};
156 m_init_func = f;
157 }
158
159public:
160 // get the pool state
161 const pool_state_type& state() const { return m_pool_state; }
162 // see how many main task threads there are
163 size_type size() const { return m_pool_size; }
164 // set the thread pool size
165 void resize(size_type _n);
166 // affinity assigns threads to cores, assignment at constructor
167 bool using_affinity() const { return m_use_affinity; }
168 bool is_alive() { return m_alive_flag->load(); }
169 void notify();
170 void notify_all();
171 void notify(size_type);
172 bool is_initialized() const;
174 {
175 return (m_thread_awake) ? m_thread_awake->load() : 0;
176 }
177
179 void set_affinity(intmax_t i, Thread&);
180
181 void set_verbose(int n) { m_verbose = n; }
182 int get_verbose() const { return m_verbose; }
183 bool is_main() const { return ThisThread::get_id() == m_main_tid; }
184
186
187public:
188 // read FORCE_NUM_THREADS environment variable
189 static const thread_id_map_t& get_thread_ids();
190 static uintmax_t get_this_thread_id();
191 static uintmax_t add_thread_id()
192 {
193 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
194 if(!lock.owns_lock())
195 lock.lock();
196 auto _tid = ThisThread::get_id();
197 if(f_thread_ids.find(_tid) == f_thread_ids.end())
198 {
199 auto _idx = f_thread_ids.size();
200 f_thread_ids[_tid] = _idx;
202 }
203 return f_thread_ids.at(_tid);
204 }
205
206protected:
207 void execute_thread(VUserTaskQueue*); // function thread sits in
208 int insert(task_pointer&&, int = -1);
210
211protected:
212 // called in THREAD INIT
213 static void start_thread(ThreadPool*, thread_data_t*, intmax_t = -1);
214
216 {
218 ++(*m_thread_active);
219 }
220
222 {
224 --(*m_thread_active);
225 }
226
227private:
228 // Private variables
229 // random
230 bool m_use_affinity = false;
231 bool m_tbb_tp = false;
233 int m_verbose = GetEnv<int>("PTL_VERBOSE", 0);
235 ThreadId m_main_tid = ThisThread::get_id();
236 atomic_bool_type m_alive_flag = std::make_shared<std::atomic_bool>(false);
237 pool_state_type m_pool_state = std::make_shared<std::atomic_short>(0);
238 atomic_int_type m_thread_awake = std::make_shared<std::atomic_uintmax_t>();
239 atomic_int_type m_thread_active = std::make_shared<std::atomic_uintmax_t>();
240
241 // locks
242 lock_t m_task_lock = std::make_shared<Mutex>();
243 // conditions
244 condition_t m_task_cond = std::make_shared<Condition>();
245
246 // containers
247 bool_list_t m_is_joined{}; // join list
248 bool_list_t m_is_stopped{}; // lets thread know to stop
249 thread_list_t m_main_threads{}; // storage for active threads
250 thread_list_t m_stop_threads{}; // storage for stopped threads
253
254 // task queue
258
259 // functions
262
263private:
264 // Private static variables
266 PTL_DLL static bool f_use_tbb;
267};
268
269//--------------------------------------------------------------------------------------//
270inline void
272{
273 // wake up one thread that is waiting for a task to be available
275 {
277 m_task_cond->notify_one();
278 }
279}
280//--------------------------------------------------------------------------------------//
281inline void
283{
284 // wake all threads
286 m_task_cond->notify_all();
287}
288//--------------------------------------------------------------------------------------//
289inline void
291{
292 if(ntasks == 0)
293 return;
294
295 // wake up as many threads that tasks just added
297 {
299 if(ntasks < this->size())
300 {
301 for(size_type i = 0; i < ntasks; ++i)
302 m_task_cond->notify_one();
303 }
304 else
305 {
306 m_task_cond->notify_all();
307 }
308 }
309}
310//--------------------------------------------------------------------------------------//
311// local function for getting the tbb task scheduler
314{
315 static thread_local tbb_global_control_t* _instance = nullptr;
316 return _instance;
317}
318//--------------------------------------------------------------------------------------//
319// task arena
320inline tbb_task_arena_t*
322{
323#if defined(PTL_USE_TBB)
324 // create a task arena
326 {
327 auto _sz = (tbb_global_control())
330 : size();
331 m_tbb_task_arena = new tbb_task_arena_t(::tbb::task_arena::attach{});
333 }
334#else
337#endif
338 return m_tbb_task_arena;
339}
340//--------------------------------------------------------------------------------------//
341inline void
343{
344 if(_n == m_pool_size)
345 return;
347 m_task_queue->resize(static_cast<intmax_t>(_n));
348}
349//--------------------------------------------------------------------------------------//
350inline int
352{
353 auto&& _func = [_task]() { (*_task)(); };
354
356 {
357 auto _arena = get_task_arena();
358 _arena->execute([this, _func]() { this->m_tbb_task_group->run(_func); });
359 }
360 else
361 {
362 _func();
363 }
364 // return the number of tasks added to task-list
365 return 0;
366}
367//--------------------------------------------------------------------------------------//
368inline int
370{
371 static thread_local ThreadData* _data = ThreadData::GetInstance();
372
373 // pass the task to the queue
374 auto ibin = get_valid_queue(m_task_queue)->InsertTask(std::move(task), _data, bin);
375 notify();
376 return ibin;
377}
378//--------------------------------------------------------------------------------------//
381{
382 // if not native (i.e. TBB) or we haven't built thread-pool, just execute
383 if(m_tbb_tp || !task->is_native_task() || !m_alive_flag->load())
384 return static_cast<size_type>(run_on_this(std::move(task)));
385
386 return static_cast<size_type>(insert(std::move(task), bin));
387}
388//--------------------------------------------------------------------------------------//
389template <typename ListT>
392{
393 if(!m_alive_flag) // if we haven't built thread-pool, just execute
394 {
395 for(auto& itr : c)
396 run(itr);
397 c.clear();
398 return 0;
399 }
400
401 // TODO: put a limit on how many tasks can be added at most
402 auto c_size = c.size();
403 for(auto& itr : c)
404 {
405 if(!itr->is_native_task())
406 --c_size;
407 else
408 {
409 //++(m_task_queue);
411 }
412 }
413 c.clear();
414
415 // notify sleeping threads
416 notify(c_size);
417
418 return c_size;
419}
420//--------------------------------------------------------------------------------------//
421template <typename FuncT>
422inline void
424{
426 {
427#if defined(PTL_USE_TBB)
428 // TBB lazily activates threads to process tasks and the main thread
429 // participates in processing the tasks so getting a specific
430 // function to execute only on the worker threads requires some trickery
431 //
432 std::set<std::thread::id> _first{};
433 Mutex _mutex{};
434 // init function which executes function and returns 1 only once
435 auto _init = [&]() {
436 int _once = 0;
437 _mutex.lock();
438 if(_first.find(std::this_thread::get_id()) == _first.end())
439 {
440 // we need to reset this thread-local static for multiple invocations
441 // of the same template instantiation
442 _once = 1;
443 _first.insert(std::this_thread::get_id());
444 }
445 _mutex.unlock();
446 if(_once != 0)
447 {
448 _func();
449 return 1;
450 }
451 return 0;
452 };
453 // this will collect the number of threads which have
454 // executed the _init function above
455 std::atomic<size_t> _total_init{ 0 };
456 // max parallelism by TBB
457 size_t _maxp = tbb_global_control()->active_value(
459 // create a task arean
460 auto _arena = get_task_arena();
461 // size of the thread-pool
462 size_t _sz = size();
463 // number of cores
464 size_t _ncore = Threading::GetNumberOfCores();
465 // maximum depth for recursion
466 size_t _dmax = std::max<size_t>(_ncore, 4);
467 // how many threads we need to initialize
468 size_t _num = std::min(_maxp, std::min(_sz, _ncore));
469 // this is the task passed to the task-group
470 std::function<void()> _init_task;
471 _init_task = [&]() {
473 static thread_local size_type _depth = 0;
474 int _ret = 0;
475 // don't let the main thread execute the function
476 if(!is_main())
477 {
478 // execute the function
479 _ret = _init();
480 // add the result
481 _total_init += _ret;
482 }
483 // if the function did not return anything, recursively execute
484 // two more tasks
485 ++_depth;
486 if(_ret == 0 && _depth < _dmax && _total_init.load() < _num)
487 {
488 tbb::task_group tg{};
489 tg.run([&]() { _init_task(); });
490 tg.run([&]() { _init_task(); });
491 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
492 tg.wait();
493 }
494 --_depth;
495 };
496
497 // TBB won't oversubscribe so we need to limit by ncores - 1
498 size_t nitr = 0;
499 auto _fname = __FUNCTION__;
500 auto _write_info = [&]() {
501 std::cout << "[" << _fname << "]> Total initalized: " << _total_init
502 << ", expected: " << _num << ", max-parallel: " << _maxp
503 << ", size: " << _sz << ", ncore: " << _ncore << std::endl;
504 };
505 while(_total_init < _num)
506 {
507 auto _n = 2 * _num;
508 while(--_n > 0)
509 {
510 _arena->execute(
511 [&]() { m_tbb_task_group->run([&]() { _init_task(); }); });
512 }
513 _arena->execute([&]() { m_tbb_task_group->wait(); });
514 // don't loop infinitely but use a strict condition
515 if(nitr++ > 2 * (_num + 1) && (_total_init - 1) == _num)
516 {
517 _write_info();
518 break;
519 }
520 // at this point we need to exit
521 if(nitr > 4 * (_ncore + 1))
522 {
523 _write_info();
524 break;
525 }
526 }
527 if(get_verbose() > 3)
528 _write_info();
529#endif
530 }
531 else if(get_queue())
532 {
533 get_queue()->ExecuteOnAllThreads(this, std::forward<FuncT>(_func));
534 }
535}
536
537//--------------------------------------------------------------------------------------//
538
539template <typename FuncT>
540inline void
541ThreadPool::execute_on_specific_threads(const std::set<std::thread::id>& _tids,
542 FuncT&& _func)
543{
545 {
546#if defined(PTL_USE_TBB)
547 // TBB lazily activates threads to process tasks and the main thread
548 // participates in processing the tasks so getting a specific
549 // function to execute only on the worker threads requires some trickery
550 //
551 std::set<std::thread::id> _first{};
552 Mutex _mutex{};
553 // init function which executes function and returns 1 only once
554 auto _exec = [&]() {
555 int _once = 0;
556 _mutex.lock();
557 if(_first.find(std::this_thread::get_id()) == _first.end())
558 {
559 // we need to reset this thread-local static for multiple invocations
560 // of the same template instantiation
561 _once = 1;
562 _first.insert(std::this_thread::get_id());
563 }
564 _mutex.unlock();
565 if(_once != 0)
566 {
567 _func();
568 return 1;
569 }
570 return 0;
571 };
572 // this will collect the number of threads which have
573 // executed the _exec function above
574 std::atomic<size_t> _total_exec{ 0 };
575 // number of cores
576 size_t _ncore = Threading::GetNumberOfCores();
577 // maximum depth for recursion
578 size_t _dmax = std::max<size_t>(_ncore, 4);
579 // how many threads we need to initialize
580 size_t _num = _tids.size();
581 // create a task arena
582 auto _arena = get_task_arena();
583 // this is the task passed to the task-group
584 std::function<void()> _exec_task;
585 _exec_task = [&]() {
587 static thread_local size_type _depth = 0;
588 int _ret = 0;
589 auto _this_tid = std::this_thread::get_id();
590 // don't let the main thread execute the function
591 if(_tids.count(_this_tid) > 0)
592 {
593 // execute the function
594 _ret = _exec();
595 // add the result
596 _total_exec += _ret;
597 }
598 // if the function did not return anything, recursively execute
599 // two more tasks
600 ++_depth;
601 if(_ret == 0 && _depth < _dmax && _total_exec.load() < _num)
602 {
603 tbb::task_group tg{};
604 tg.run([&]() { _exec_task(); });
605 tg.run([&]() { _exec_task(); });
606 ThisThread::sleep_for(std::chrono::milliseconds{ 1 });
607 tg.wait();
608 }
609 --_depth;
610 };
611
612 // TBB won't oversubscribe so we need to limit by ncores - 1
613 size_t nitr = 0;
614 auto _fname = __FUNCTION__;
615 auto _write_info = [&]() {
616 std::cout << "[" << _fname << "]> Total executed: " << _total_exec
617 << ", expected: " << _num << ", size: " << size() << std::endl;
618 };
619 while(_total_exec < _num)
620 {
621 auto _n = 2 * _num;
622 while(--_n > 0)
623 {
624 _arena->execute(
625 [&]() { m_tbb_task_group->run([&]() { _exec_task(); }); });
626 }
627 _arena->execute([&]() { m_tbb_task_group->wait(); });
628 // don't loop infinitely but use a strict condition
629 if(nitr++ > 2 * (_num + 1) && (_total_exec - 1) == _num)
630 {
631 _write_info();
632 break;
633 }
634 // at this point we need to exit
635 if(nitr > 8 * (_num + 1))
636 {
637 _write_info();
638 break;
639 }
640 }
641 if(get_verbose() > 3)
642 _write_info();
643#endif
644 }
645 else if(get_queue())
646 {
647 get_queue()->ExecuteOnSpecificThreads(_tids, this, std::forward<FuncT>(_func));
648 }
649}
650
651//======================================================================================//
652
653} // namespace PTL
G4double(* function)(G4double)
#define PTL_DLL
Definition: Types.hh:52
static ThreadData *& GetInstance()
Definition: ThreadData.cc:35
std::vector< std::shared_ptr< ThreadData > > thread_data_t
Definition: ThreadPool.hh:92
static PTL_DLL thread_id_map_t f_thread_ids
Definition: ThreadPool.hh:265
void set_verbose(int n)
Definition: ThreadPool.hh:181
std::shared_ptr< Condition > condition_t
Definition: ThreadPool.hh:83
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:72
std::shared_ptr< std::atomic_uintmax_t > task_count_type
Definition: ThreadPool.hh:76
bool is_main() const
Definition: ThreadPool.hh:183
std::map< uintmax_t, ThreadId > thread_index_map_t
Definition: ThreadPool.hh:90
task_queue_t * m_task_queue
Definition: ThreadPool.hh:255
ThreadPool(ThreadPool &&)=default
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:122
int insert(task_pointer &&, int=-1)
Definition: ThreadPool.hh:369
void record_entry()
Definition: ThreadPool.hh:215
initialize_func_t m_init_func
Definition: ThreadPool.hh:260
const pool_state_type & state() const
Definition: ThreadPool.hh:161
size_type add_task(task_pointer &&task, int bin=-1)
Definition: ThreadPool.hh:380
size_type m_pool_size
Definition: ThreadPool.hh:234
ThreadPool(const ThreadPool &)=delete
int run_on_this(task_pointer &&)
Definition: ThreadPool.hh:351
thread_list_t m_stop_threads
Definition: ThreadPool.hh:250
thread_list_t m_main_threads
Definition: ThreadPool.hh:249
static bool using_tbb()
Definition: ThreadPool.cc:94
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:536
std::vector< Thread > thread_vec_t
Definition: ThreadPool.hh:91
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:95
size_type add_tasks(ListT &)
Definition: ThreadPool.hh:391
thread_data_t m_thread_data
Definition: ThreadPool.hh:252
void execute_on_specific_threads(const std::set< std::thread::id > &_tid, FuncT &&_func)
Definition: ThreadPool.hh:541
size_t size_type
Definition: ThreadPool.hh:75
void resize(size_type _n)
Definition: ThreadPool.hh:342
static void set_use_tbb(bool val)
Definition: ThreadPool.cc:102
virtual ~ThreadPool()
Definition: ThreadPool.cc:162
std::shared_ptr< Mutex > lock_t
Definition: ThreadPool.hh:82
void record_exit()
Definition: ThreadPool.hh:221
bool is_tbb_threadpool() const
Definition: ThreadPool.hh:130
std::shared_ptr< task_type > task_pointer
Definition: ThreadPool.hh:84
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:178
size_type stop_thread()
Definition: ThreadPool.cc:495
ThreadPool & operator=(const ThreadPool &)=delete
std::shared_ptr< std::atomic_uintmax_t > atomic_int_type
Definition: ThreadPool.hh:77
int get_active_threads_count() const
Definition: ThreadPool.hh:173
Thread * get_thread(std::thread::id id) const
tbb_task_group_t * m_tbb_task_group
Definition: ThreadPool.hh:257
std::shared_ptr< std::atomic_short > pool_state_type
Definition: ThreadPool.hh:78
bool_list_t m_is_joined
Definition: ThreadPool.hh:247
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:313
ThreadPool & operator=(ThreadPool &&)=default
task_queue_t * get_queue() const
Definition: ThreadPool.hh:127
std::function< void()> initialize_func_t
Definition: ThreadPool.hh:94
Thread * get_thread(size_type _n) const
lock_t m_task_lock
Definition: ThreadPool.hh:242
std::shared_ptr< std::atomic_bool > atomic_bool_type
Definition: ThreadPool.hh:79
int get_verbose() const
Definition: ThreadPool.hh:182
tbb_task_arena_t * get_task_arena()
Definition: ThreadPool.hh:321
condition_t m_task_cond
Definition: ThreadPool.hh:244
bool using_affinity() const
Definition: ThreadPool.hh:167
void execute_on_all_threads(FuncT &&_func)
Definition: ThreadPool.hh:423
pool_state_type m_pool_state
Definition: ThreadPool.hh:237
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:114
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:89
tbb_task_arena_t * m_tbb_task_arena
Definition: ThreadPool.hh:256
std::deque< ThreadId > thread_list_t
Definition: ThreadPool.hh:87
affinity_func_t m_affinity_func
Definition: ThreadPool.hh:261
size_type size() const
Definition: ThreadPool.hh:163
atomic_int_type m_thread_awake
Definition: ThreadPool.hh:238
ThreadId m_main_tid
Definition: ThreadPool.hh:235
std::vector< bool > bool_list_t
Definition: ThreadPool.hh:88
size_type destroy_threadpool()
Definition: ThreadPool.cc:364
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:213
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:546
atomic_bool_type m_alive_flag
Definition: ThreadPool.hh:236
bool m_delete_task_queue
Definition: ThreadPool.hh:232
bool is_initialized() const
Definition: ThreadPool.cc:183
static PTL_DLL bool f_use_tbb
Definition: ThreadPool.hh:266
atomic_int_type m_thread_active
Definition: ThreadPool.hh:239
thread_vec_t m_threads
Definition: ThreadPool.hh:251
std::unordered_map< KeyT, MappedT, std::hash< HashT > > uomap
Definition: ThreadPool.hh:72
void set_initialization(initialize_func_t f)
Definition: ThreadPool.hh:152
void reset_initialization()
Definition: ThreadPool.hh:153
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), affinity_func_t=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
Definition: ThreadPool.cc:140
VUserTaskQueue task_queue_t
Definition: ThreadPool.hh:85
static uintmax_t add_thread_id()
Definition: ThreadPool.hh:191
bool_list_t m_is_stopped
Definition: ThreadPool.hh:248
VTask is the abstract class stored in thread_pool.
Definition: VTask.hh:54
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f)=0
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) PTL_NO_SANITIZE_THREAD=0
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f)=0
virtual void resize(intmax_t)=0
static size_t active_value(parameter param)
void initialize(int max_concurrency=automatic, unsigned reserved_for_masters=1)
void run(FuncT f)
Definition: ThreadData.hh:65
T min(const T t1, const T t2)
brief Return the smallest of the two arguments
unsigned GetNumberOfCores()
Definition: Threading.cc:60
void SetThreadId(int aNewValue)
Definition: Threading.cc:68
Definition: AutoLock.hh:254
std::mutex Mutex
Definition: Threading.hh:77
std::thread Thread
Definition: Threading.hh:128
tbb::task_arena tbb_task_arena_t
Definition: ThreadData.hh:121
Thread::id ThreadId
Definition: Threading.hh:139
Definition: run.py:1