Geant4-11
ThreadPool.cc
Go to the documentation of this file.
1//
2// MIT License
3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
18//
19// ---------------------------------------------------------------
20// Tasking class implementation
21//
22// Class Description:
23//
24// This file creates a class for an efficient thread-pool that
25// accepts work in the form of tasks.
26//
27// ---------------------------------------------------------------
28// Author: Jonathan Madsen (Feb 13th 2018)
29// ---------------------------------------------------------------
30
31#include "PTL/ThreadPool.hh"
32#include "PTL/Globals.hh"
33#include "PTL/ThreadData.hh"
34#include "PTL/UserTaskQueue.hh"
35#include "PTL/VUserTaskQueue.hh"
36
37#include <cstdlib>
38
39using namespace PTL;
40
41//======================================================================================//
42
43inline intmax_t
45{
46 return static_cast<intmax_t>(Thread::hardware_concurrency());
47}
48
49//======================================================================================//
50
// Definition of the static map from thread id to the index assigned by the
// pool; entries are added in start_thread() and get_this_thread_id().
51ThreadPool::thread_id_map_t ThreadPool::f_thread_ids;
52
53//======================================================================================//
54
// File-local helper scope.
// NOTE(review): original lines 57, 58 and 60 are absent from this listing, so
// the declaration and the body of the helper defined here are not visible.
// The rest of this file uses `thread_data()` as an assignable ThreadData
// pointer (`thread_data() = ...`, `thread_data()->current_queue`), so it
// presumably returns `ThreadData*&` - confirm against the original source.
55namespace
56{
59{
61}
62} // namespace
63
64//======================================================================================//
65
// Definition of the static flag returned by using_tbb(); starts out false and
// is only assigned in set_use_tbb() when PTL_USE_TBB is defined.
66bool ThreadPool::f_use_tbb = false;
67
68//======================================================================================//
69// static member function used as the entry point of a pool thread: it
70// registers the thread and then runs tp->execute_thread() on it
//
// tp    : pool the new thread belongs to
// _data : container that receives the shared ThreadData created for this thread
// _idx  : index to record for this thread; a negative value means "use the
//         current number of registered thread ids"
71void
72ThreadPool::start_thread(ThreadPool* tp, thread_data_t* _data, intmax_t _idx)
73{
74 auto _thr_data = std::make_shared<ThreadData>(tp);
75 {
 // registration is done while holding the ThreadPool type mutex
76 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
77 if(!lock.owns_lock())
78 lock.lock();
79 if(_idx < 0)
80 _idx = f_thread_ids.size();
81 f_thread_ids[std::this_thread::get_id()] = _idx;
 // NOTE(review): original line 82 is absent from this listing; one statement
 // between the id registration and the emplace_back is not visible here.
83 _data->emplace_back(_thr_data);
84 }
 // publish the raw pointer through thread_data(); ownership stays with the
 // shared_ptr stored in *_data (and the local _thr_data)
85 thread_data() = _thr_data.get();
86 tp->record_entry();
87 tp->execute_thread(thread_data()->current_queue);
88 tp->record_exit();
89}
90
91//======================================================================================//
92// static member function that checks enabling of tbb library
93bool
95{
96 return f_use_tbb;
97}
98
99//======================================================================================//
100// static member function that initialized tbb library
101void
103{
104#if defined(PTL_USE_TBB)
105 f_use_tbb = enable;
106#else
107 ConsumeParameters<bool>(enable);
108#endif
109}
110
111//======================================================================================//
112
115{
116 return f_thread_ids;
117}
118
119//======================================================================================//
120
121uintmax_t
123{
124 auto _tid = ThisThread::get_id();
125 {
126 AutoLock lock(TypeMutex<ThreadPool>(), std::defer_lock);
127 if(!lock.owns_lock())
128 lock.lock();
129 if(f_thread_ids.find(_tid) == f_thread_ids.end())
130 {
131 auto _idx = f_thread_ids.size();
132 f_thread_ids[_tid] = _idx;
133 }
134 }
135 return f_thread_ids[_tid];
136}
137
138//======================================================================================//
139
// Construct a pool and immediately initialize it with `pool_size` threads.
//
// pool_size      : number of threads passed to initialize_threadpool()
// task_queue     : queue to use; may be null
// _use_affinity  : stored in m_use_affinity
// _affinity_func : stored (moved) in m_affinity_func; used by set_affinity()
//
// The pool state starts as thread_pool::state::NONINIT.
140ThreadPool::ThreadPool(const size_type& pool_size, VUserTaskQueue* task_queue,
141 bool _use_affinity, affinity_func_t _affinity_func)
142: m_use_affinity(_use_affinity)
143, m_pool_state(std::make_shared<std::atomic_short>(thread_pool::state::NONINIT))
144, m_task_queue(task_queue)
145, m_affinity_func(std::move(_affinity_func))
146{
 // registers the constructing thread in f_thread_ids if it is not there yet
147 auto master_id = get_this_thread_id();
148 if(master_id != 0 && m_verbose > 1)
149 std::cerr << "ThreadPool created on non-master slave" << std::endl;
150
 // NOTE(review): raw `new`; no matching delete is visible in this file
151 thread_data() = new ThreadData(this);
152
153 // initialize after get_this_thread_id so master is zero
154 this->initialize_threadpool(pool_size);
155
 // NOTE(review): original line 157 (the statement controlled by this `if`) is
 // absent from this listing - presumably it creates a default task queue.
156 if(!m_task_queue)
158}
159
160//======================================================================================//
161
// Destructor body (the cross-reference index lists `virtual ~ThreadPool()` at
// original line 162; that declarator line is absent from this listing).
// If the pool is still flagged alive it prints a warning, wakes all waiting
// threads and joins every thread in m_threads.
163{
164 if(m_alive_flag->load())
165 {
166 std::cerr << "Warning! ThreadPool was not properly destroyed! Call "
167 "destroy_threadpool() before deleting the ThreadPool object to "
168 "eliminate this message."
169 << std::endl;
 // NOTE(review): original line 170 is absent from this listing; one
 // statement before the notification below is not visible here.
 // notify while holding the task lock, then release it before joining
171 m_task_lock->lock();
172 m_task_cond->notify_all();
173 m_task_lock->unlock();
174 for(auto& itr : m_threads)
175 itr.join();
176 m_threads.clear();
177 }
178}
179
180//======================================================================================//
181
182bool
184{
185 return !(m_pool_state->load() == thread_pool::state::NONINIT);
186}
187
188//======================================================================================//
189
190void
191ThreadPool::set_affinity(intmax_t i, Thread& _thread)
192{
193 try
194 {
195 NativeThread native_thread = _thread.native_handle();
196 intmax_t _pin = m_affinity_func(i);
197 if(m_verbose > 0)
198 {
199 std::cout << "Setting pin affinity for thread " << _thread.get_id() << " to "
200 << _pin << std::endl;
201 }
202 Threading::SetPinAffinity(_pin, native_thread);
203 } catch(std::runtime_error& e)
204 {
205 std::cout << "Error setting pin affinity" << std::endl;
206 std::cerr << e.what() << std::endl; // issue assigning affinity
207 }
208}
209
210//======================================================================================//
211
// Body of initialize_threadpool (the cross-reference index lists
// `size_type initialize_threadpool(size_type)` at original line 213; the
// declarator line is absent from this listing).
//
// Visible behavior:
//  - returns 0 without doing anything when proposed_size < 1
//  - with PTL_USE_TBB and (f_use_tbb || m_tbb_tp): records the size, sets up
//    the TBB global control object and returns m_pool_size
//  - otherwise marks the pool alive, may shrink or return early, and finally
//    creates threads for indices [m_pool_size, proposed_size)
//  - throws std::runtime_error when m_is_joined and m_main_threads differ in
//    size after thread creation
//  - final return value is m_main_threads.size()
//
// NOTE(review): original lines 223, 243, 252-253, 262, 276, 290, 313 and 322
// are absent from this listing; the places are marked below.
214{
215 //--------------------------------------------------------------------//
216 // return before initializing
217 if(proposed_size < 1)
218 return 0;
219
220 //--------------------------------------------------------------------//
221 // store that has been started
 // NOTE(review): the statement controlled by this `if` (original line 223)
 // is absent from this listing.
222 if(!m_alive_flag->load())
224
225#if defined(PTL_USE_TBB)
226 //--------------------------------------------------------------------//
227 // handle tbb task scheduler
228 if(f_use_tbb || m_tbb_tp)
229 {
230 m_tbb_tp = true;
231 m_pool_size = proposed_size;
232 tbb_global_control_t*& _global_control = tbb_global_control();
233 // delete if wrong size
 // NOTE(review): m_pool_size was assigned proposed_size two lines above,
 // so as written this condition is never true and an existing global
 // control object is never replaced - compare before assigning?
234 if(m_pool_size != proposed_size)
235 {
236 delete _global_control;
237 _global_control = nullptr;
238 }
239
240 if(!_global_control)
241 {
 // NOTE(review): the constructor arguments (original line 243) are
 // absent from this listing.
242 _global_control = new tbb_global_control_t(
244 if(m_verbose > 0)
245 {
246 std::cout << "ThreadPool [TBB] initialized with " << m_pool_size
247 << " threads." << std::endl;
248 }
249 }
250
251 // create task group (used for async)
 // NOTE(review): original lines 252-253 are absent from this listing.
254 return m_pool_size;
255 }
256#endif
257
258 m_alive_flag->store(true);
259
260 //--------------------------------------------------------------------//
261 // if started, stop some thread if smaller or return if equal
 // NOTE(review): the condition guarding this block (original line 262) is
 // absent from this listing.
263 {
264 if(m_pool_size > proposed_size)
265 {
 // stop_thread() returns the remaining number of main threads
266 while(stop_thread() > proposed_size)
267 ;
268 if(m_verbose > 0)
269 {
270 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
271 << std::endl;
272 }
273 if(!m_task_queue)
274 {
275 m_delete_task_queue = true;
 // NOTE(review): original line 276 is absent from this listing.
277 }
278 return m_pool_size;
279 }
280 else if(m_pool_size == proposed_size) // NOLINT
281 {
282 if(m_verbose > 0)
283 {
284 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
285 << std::endl;
286 }
287 if(!m_task_queue)
288 {
289 m_delete_task_queue = true;
 // NOTE(review): original line 290 is absent from this listing.
291 }
292 return m_pool_size;
293 }
294 }
295
296 //--------------------------------------------------------------------//
297 // reserve enough space to prevent realloc later
298 {
299 AutoLock _task_lock(*m_task_lock);
300 m_is_joined.reserve(proposed_size);
301 }
302
 // NOTE(review): unlike the two early-return branches above, this path does
 // not set m_delete_task_queue for the queue it allocates; destroy_threadpool()
 // only deletes m_task_queue inside a block whose condition is not visible
 // here - confirm the queue is not leaked.
303 if(!m_task_queue)
304 m_task_queue = new UserTaskQueue(proposed_size);
305
306 auto this_tid = get_this_thread_id();
307 for(size_type i = m_pool_size; i < proposed_size; ++i)
308 {
309 // add the threads
310 try
311 {
312 // create thread
 // NOTE(review): the first line of the thread construction (original
 // line 313) is absent; only its last argument is visible below.
314 this_tid + i + 1 };
315 // only reaches here if successful creation of thread
316 ++m_pool_size;
317 // store thread
318 m_main_threads.push_back(thr.get_id());
319 // list of joined thread booleans
320 m_is_joined.push_back(false);
321 // set the affinity
 // NOTE(review): original line 322 is absent from this listing.
 // `thr` is still a local here; it is only moved into m_threads below.
323 set_affinity(i, thr);
324 // store
325 m_threads.emplace_back(std::move(thr));
326 } catch(std::runtime_error& e)
327 {
328 std::cerr << e.what() << std::endl; // issue creating thread
329 continue;
330 } catch(std::bad_alloc& e)
331 {
332 std::cerr << e.what() << std::endl;
333 continue;
334 }
335 }
336 //------------------------------------------------------------------------//
337
338 AutoLock _task_lock(*m_task_lock);
339
340 // thread pool size doesn't match with join vector
341 // this will screw up joining later
342 if(m_is_joined.size() != m_main_threads.size())
343 {
344 std::stringstream ss;
345 ss << "ThreadPool::initialize_threadpool - boolean is_joined vector "
346 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
347 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
348
349 throw std::runtime_error(ss.str());
350 }
351
352 if(m_verbose > 0)
353 {
354 std::cout << "ThreadPool initialized with " << m_pool_size << " threads."
355 << std::endl;
356 }
357
358 return m_main_threads.size();
359}
360
361//======================================================================================//
362
// Body of destroy_threadpool (the cross-reference index lists
// `size_type destroy_threadpool()` at original line 364; the declarator line
// is absent from this listing).
//
// Visible behavior: tears down the TBB objects when PTL_USE_TBB is defined,
// returns 0 early when the pool is not alive, otherwise wakes all threads,
// joins them, clears the bookkeeping containers, waits up to 30 seconds for
// m_thread_active to reach zero and returns 0.
// Throws std::runtime_error when m_is_joined and m_main_threads differ in size.
//
// NOTE(review): original lines 371, 376, 379-380, 386, 391 and 483 are absent
// from this listing; the places are marked below.
365{
366 // Note: this is not for synchronization, it's for thread communication!
367 // destroy_threadpool() will only be called from the main thread, yet
368 // the modified m_pool_state may not show up to other threads until it's
369 // modified in a lock!
370 //------------------------------------------------------------------------//
 // NOTE(review): original line 371 is absent from this listing.
372
373 //--------------------------------------------------------------------//
374 // handle tbb task scheduler
375#if defined(PTL_USE_TBB)
 // NOTE(review): the condition guarding this block (original line 376) is
 // absent from this listing.
377 {
378 auto _func = [&]() { m_tbb_task_group->wait(); };
 // NOTE(review): original lines 379-380 (the `if` branch paired with the
 // `else` below) are absent from this listing.
381 else
382 _func();
383 delete m_tbb_task_group;
384 m_tbb_task_group = nullptr;
385 }
 // NOTE(review): the condition (original line 386) is absent.
387 {
388 delete m_tbb_task_arena;
389 m_tbb_task_arena = nullptr;
390 }
 // NOTE(review): the condition (original line 391) is absent.
392 {
393 tbb_global_control_t*& _global_control = tbb_global_control();
394 delete _global_control;
395 _global_control = nullptr;
396 m_tbb_tp = false;
397 std::cout << "ThreadPool [TBB] destroyed" << std::endl;
398 }
399#endif
400
401 if(!m_alive_flag->load())
402 return 0;
403
404 //------------------------------------------------------------------------//
405 // notify all threads we are shutting down
406 m_task_lock->lock();
407 m_task_cond->notify_all();
408 m_task_lock->unlock();
409 //------------------------------------------------------------------------//
410
411 if(m_is_joined.size() != m_main_threads.size())
412 {
413 std::stringstream ss;
414 ss << " ThreadPool::destroy_thread_pool - boolean is_joined vector "
415 << "is a different size than threads vector: " << m_is_joined.size() << " vs. "
416 << m_main_threads.size() << " (tid: " << std::this_thread::get_id() << ")";
417
418 throw std::runtime_error(ss.str());
419 }
420
421 for(size_type i = 0; i < m_is_joined.size(); i++)
422 {
423 //--------------------------------------------------------------------//
424 // join the i-th thread; this happens before the m_is_joined check below
425 if(i < m_threads.size())
426 m_threads.at(i).join();
427
428 //--------------------------------------------------------------------//
429 // if it's joined already, nothing else needs to be done
430 if(m_is_joined.at(i))
431 continue;
432
433 //--------------------------------------------------------------------//
434 // skip the bookkeeping below when called from the thread itself
435 if(std::this_thread::get_id() == m_main_threads[i])
436 continue;
437
438 //--------------------------------------------------------------------//
439 // thread id and index
440 auto _tid = m_main_threads[i];
441
442 //--------------------------------------------------------------------//
443 // erase thread from thread ID list
 // NOTE(review): f_thread_ids is modified here without taking the
 // TypeMutex<ThreadPool> lock that start_thread() and
 // get_this_thread_id() use - confirm no other thread can touch the
 // map at this point.
444 if(f_thread_ids.find(_tid) != f_thread_ids.end())
445 f_thread_ids.erase(f_thread_ids.find(_tid));
446
447 //--------------------------------------------------------------------//
448 // it's joined
449 m_is_joined.at(i) = true;
450 }
451
452 m_thread_data.clear();
453 m_threads.clear();
454 m_main_threads.clear();
455 m_is_joined.clear();
456 m_alive_flag->store(false);
457
458 auto start = std::chrono::steady_clock::now();
459 auto elapsed = std::chrono::duration<double>{};
460 // wait maximum of 30 seconds for threads to exit
 // (polls m_thread_active every 50 ms)
461 while(m_thread_active->load() > 0 && elapsed.count() < 30)
462 {
463 std::this_thread::sleep_for(std::chrono::milliseconds(50));
464 elapsed = std::chrono::steady_clock::now() - start;
465 }
466
467 auto _active = m_thread_active->load();
468
 // the message is printed for any verbosity >= 0
469 if(get_verbose() >= 0)
470 {
471 if(_active == 0)
472 {
473 std::cout << "ThreadPool destroyed" << std::endl;
474 }
475 else
476 {
477 std::cout << "ThreadPool destroyed but " << _active
478 << " threads might still be active (and cause a termination error)"
479 << std::endl;
480 }
481 }
482
 // NOTE(review): the condition guarding this block (original line 483) is
 // absent from this listing.
484 {
485 delete m_task_queue;
486 m_task_queue = nullptr;
487 }
488
489 return 0;
490}
491
492//======================================================================================//
493
// Body of stop_thread (the cross-reference index lists
// `size_type stop_thread()` at original line 495; the declarator line is
// absent from this listing).
//
// Requests that one pool thread stop, then removes every thread id found in
// m_stop_threads from m_main_threads. Returns 0 when the pool is not alive or
// is empty, otherwise the number of entries left in m_main_threads.
496{
497 if(!m_alive_flag->load() || m_pool_size == 0)
498 return 0;
499
500 //------------------------------------------------------------------------//
501 // request one thread to stop: push a flag and wake a single waiter
502 m_task_lock->lock();
503 m_is_stopped.push_back(true);
504 m_task_cond->notify_one();
505 m_task_lock->unlock();
506 //------------------------------------------------------------------------//
507
508 // lock up the task queue
509 AutoLock _task_lock(*m_task_lock);
510
 // NOTE(review): nothing visible here waits for the notified thread to add
 // itself to m_stop_threads, so this list may still be empty at this point.
511 while(!m_stop_threads.empty())
512 {
513 auto tid = m_stop_threads.front();
514 // remove from stopped
515 m_stop_threads.pop_front();
516 // remove from main
517 for(auto itr = m_main_threads.begin(); itr != m_main_threads.end(); ++itr)
518 {
519 if(*itr == tid)
520 {
521 m_main_threads.erase(itr);
522 break;
523 }
524 }
525 // remove from join list
526 m_is_joined.pop_back();
527 }
528
 // NOTE(review): original line 529 is absent from this listing.
530 return m_main_threads.size();
531}
532
533//======================================================================================//
534
537{
538 if(!_queue)
539 _queue = new UserTaskQueue{ static_cast<intmax_t>(m_pool_size) };
540 return _queue;
541}
542
543//======================================================================================//
544
// Worker loop run by each pool thread (the cross-reference index lists
// `void execute_thread(VUserTaskQueue *)` at original line 546; the declarator
// line is absent from this listing, the parameter is used below as
// `_task_queue`).
//
// Steps visible in the body:
//  1. mark the thread awake and call m_init_func
//  2. if no queue was passed, poll the thread's ThreadData for up to 60 s
//     and throw std::runtime_error when none shows up
//  3. run at most one task ("dummy run")
//  4. loop forever: sleep on m_task_cond while the queue is empty, leave when
//     the pool state says so, otherwise drain the queue
545void
547{
548 // how long the thread waits on condition variable
549 // static int wait_time = GetEnv<int>("PTL_POOL_WAIT_TIME", 5);
550
551 ++(*m_thread_awake);
552
553 // initialization function
554 m_init_func();
555
556 ThreadId tid = ThisThread::get_id();
557 ThreadData* data = thread_data();
558 // auto thread_bin = _task_queue->GetThreadBin();
559 // auto workers = _task_queue->workers();
560
561 auto start = std::chrono::steady_clock::now();
562 auto elapsed = std::chrono::duration<double>{};
563 // check for updates for 60 seconds max
 // NOTE(review): this loop has no sleep or yield, so it spins while waiting
564 while(!_task_queue && elapsed.count() < 60)
565 {
566 elapsed = std::chrono::steady_clock::now() - start;
567 data->update();
568 _task_queue = data->current_queue;
569 }
570
571 if(!_task_queue)
572 {
 // undo the increment done on entry before giving up
573 --(*m_thread_awake);
574 throw std::runtime_error("No task queue was found after 60 seconds!");
575 }
576
577 assert(data->current_queue != nullptr);
578 assert(_task_queue == data->current_queue);
579
580 // essentially a dummy run
581 if(_task_queue)
582 {
583 data->within_task = true;
584 auto _task = _task_queue->GetTask();
585 if(_task)
586 {
587 (*_task)();
588 }
589 data->within_task = false;
590 }
591
592 // threads stay in this loop forever until thread-pool destroyed
593 while(true)
594 {
 // NOTE(review): `static thread_local` is initialized only the first
 // time this statement runs on a given thread, from the m_task_lock of
 // the pool executing at that moment.
595 static thread_local auto p_task_lock = m_task_lock;
596
597 //--------------------------------------------------------------------//
598 // Try to pick a task
 // (lock object is created unlocked; it is locked explicitly below)
599 AutoLock _task_lock(*p_task_lock, std::defer_lock);
600 //--------------------------------------------------------------------//
601
 // returns true when this thread should exit execute_thread; always
 // leaves _task_lock unlocked on return
602 auto leave_pool = [&]() {
603 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
604 auto _pool_state = _state();
605 if(_pool_state > 0)
606 {
607 // stop whole pool
608 if(_pool_state == thread_pool::state::STOPPED)
609 {
610 if(_task_lock.owns_lock())
611 _task_lock.unlock();
612 return true;
613 }
614 // single thread stoppage
615 else if(_pool_state == thread_pool::state::PARTIAL) // NOLINT
616 {
617 if(!_task_lock.owns_lock())
618 _task_lock.lock();
 // consume one pending stop request and record this thread
 // as stopped so stop_thread() can remove it
619 if(!m_is_stopped.empty() && m_is_stopped.back())
620 {
621 m_stop_threads.push_back(tid);
622 m_is_stopped.pop_back();
623 if(_task_lock.owns_lock())
624 _task_lock.unlock();
625 // exit entire function
626 return true;
627 }
628 if(_task_lock.owns_lock())
629 _task_lock.unlock();
630 }
631 }
632 return false;
633 };
634
635 // We need to put condition.wait() in a loop for two reasons:
636 // 1. There can be spurious wake-ups (due to signal/EINTR)
637 // 2. When mutex is released for waiting, another thread can be woken up
638 // from a signal/broadcast and that thread can mess up the condition.
639 // So when the current thread wakes up the condition may no longer be
640 // actually true!
641 while(_task_queue->empty())
642 {
643 auto _state = [&]() { return static_cast<int>(m_pool_state->load()); };
644 auto _size = [&]() { return _task_queue->true_size(); };
645 auto _empty = [&]() { return _task_queue->empty(); };
 // wake predicate: work is available or the pool state is > 0
646 auto _wake = [&]() { return (!_empty() || _size() > 0 || _state() > 0); };
647
648 if(leave_pool())
649 return;
650
651 if(_task_queue->true_size() == 0)
652 {
653 if(m_thread_awake && m_thread_awake->load() > 0)
654 --(*m_thread_awake);
655
656 // lock before sleeping on condition
657 if(!_task_lock.owns_lock())
658 _task_lock.lock();
659
660 // Wait until there is a task in the queue
661 // Unlocks mutex while waiting, then locks it back when signaled
662 // use lambda to control waking
663 m_task_cond->wait(_task_lock, _wake);
664
 // NOTE(review): returns with _task_lock still held; presumably
 // the AutoLock destructor releases it - confirm.
665 if(_state() == thread_pool::state::STOPPED)
666 return;
667
668 // unlock if owned
669 if(_task_lock.owns_lock())
670 _task_lock.unlock();
671
672 // notify that is awake
 // NOTE(review): original line 673 is absent from this listing;
 // the increment below may have been guarded by a condition.
674 ++(*m_thread_awake);
675 }
676 else
677 break;
678 }
679
680 // release the lock
681 if(_task_lock.owns_lock())
682 _task_lock.unlock();
683
684 //----------------------------------------------------------------//
685
686 // leave pool if conditions dictate it
687 if(leave_pool())
688 return;
689
690 // activate guard against recursive deadlock
691 data->within_task = true;
692 //----------------------------------------------------------------//
693
694 // execute the task(s)
695 while(!_task_queue->empty())
696 {
697 auto _task = _task_queue->GetTask();
698 if(_task)
699 {
700 (*_task)();
701 }
702 }
703 //----------------------------------------------------------------//
704
705 // disable guard against recursive deadlock
706 data->within_task = false;
707 //----------------------------------------------------------------//
708 }
709}
710
711//======================================================================================//
intmax_t ncores()
Definition: ThreadPool.cc:44
VUserTaskQueue * current_queue
Definition: ThreadData.hh:148
std::vector< std::shared_ptr< ThreadData > > thread_data_t
Definition: ThreadPool.hh:92
static PTL_DLL thread_id_map_t f_thread_ids
Definition: ThreadPool.hh:265
static void start_thread(ThreadPool *, thread_data_t *, intmax_t=-1)
Definition: ThreadPool.cc:72
task_queue_t * m_task_queue
Definition: ThreadPool.hh:255
static uintmax_t get_this_thread_id()
Definition: ThreadPool.cc:122
initialize_func_t m_init_func
Definition: ThreadPool.hh:260
size_type m_pool_size
Definition: ThreadPool.hh:234
thread_list_t m_stop_threads
Definition: ThreadPool.hh:250
thread_list_t m_main_threads
Definition: ThreadPool.hh:249
static bool using_tbb()
Definition: ThreadPool.cc:94
task_queue_t *& get_valid_queue(task_queue_t *&) const
Definition: ThreadPool.cc:536
std::function< intmax_t(intmax_t)> affinity_func_t
Definition: ThreadPool.hh:95
thread_data_t m_thread_data
Definition: ThreadPool.hh:252
size_t size_type
Definition: ThreadPool.hh:75
static void set_use_tbb(bool val)
Definition: ThreadPool.cc:102
virtual ~ThreadPool()
Definition: ThreadPool.cc:162
void set_affinity(affinity_func_t f)
Definition: ThreadPool.hh:178
size_type stop_thread()
Definition: ThreadPool.cc:495
tbb_task_group_t * m_tbb_task_group
Definition: ThreadPool.hh:257
bool_list_t m_is_joined
Definition: ThreadPool.hh:247
static tbb_global_control_t *& tbb_global_control()
Definition: ThreadPool.hh:313
lock_t m_task_lock
Definition: ThreadPool.hh:242
int get_verbose() const
Definition: ThreadPool.hh:182
condition_t m_task_cond
Definition: ThreadPool.hh:244
pool_state_type m_pool_state
Definition: ThreadPool.hh:237
static const thread_id_map_t & get_thread_ids()
Definition: ThreadPool.cc:114
std::map< ThreadId, uintmax_t > thread_id_map_t
Definition: ThreadPool.hh:89
tbb_task_arena_t * m_tbb_task_arena
Definition: ThreadPool.hh:256
affinity_func_t m_affinity_func
Definition: ThreadPool.hh:261
atomic_int_type m_thread_awake
Definition: ThreadPool.hh:238
size_type destroy_threadpool()
Definition: ThreadPool.cc:364
size_type initialize_threadpool(size_type)
Definition: ThreadPool.cc:213
void execute_thread(VUserTaskQueue *)
Definition: ThreadPool.cc:546
atomic_bool_type m_alive_flag
Definition: ThreadPool.hh:236
bool m_delete_task_queue
Definition: ThreadPool.hh:232
bool is_initialized() const
Definition: ThreadPool.cc:183
static PTL_DLL bool f_use_tbb
Definition: ThreadPool.hh:266
atomic_int_type m_thread_active
Definition: ThreadPool.hh:239
thread_vec_t m_threads
Definition: ThreadPool.hh:251
ThreadPool(const size_type &pool_size, VUserTaskQueue *task_queue=nullptr, bool _use_affinity=GetEnv< bool >("PTL_CPU_AFFINITY", false), affinity_func_t=[](intmax_t) { static std::atomic< intmax_t > assigned;intmax_t _assign=assigned++;return _assign % Thread::hardware_concurrency();})
Definition: ThreadPool.cc:140
bool_list_t m_is_stopped
Definition: ThreadPool.hh:248
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1)=0
virtual size_type true_size() const
virtual bool empty() const =0
auto execute(FuncT &&_func) -> decltype(_func())
Definition: ThreadData.hh:111
std::map< G4String, G4AttDef > * GetInstance(const G4String &storeKey, G4bool &isNew)
void SetThreadId(int aNewValue)
Definition: Threading.cc:68
bool SetPinAffinity(int idx, NativeThread &at)
Definition: Threading.cc:82
static const short PARTIAL
Definition: Globals.hh:213
static const short NONINIT
Definition: Globals.hh:215
static const short STOPPED
Definition: Globals.hh:214
static const short STARTED
Definition: Globals.hh:212
Definition: AutoLock.hh:254
std::thread::native_handle_type NativeThread
Definition: Threading.hh:129
tbb::task_group tbb_task_group_t
Definition: ThreadData.hh:120
tbb::global_control tbb_global_control_t
Definition: ThreadData.hh:119
std::thread Thread
Definition: Threading.hh:128
Thread::id ThreadId
Definition: Threading.hh:139