TaskGroup.icc
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
//
// ---------------------------------------------------------------
// Tasking class header file
//
// Class Description:
//
// This file implements a class for handling a group of tasks that
// can be independently joined
//
// ---------------------------------------------------------------
// Author: Jonathan Madsen (Feb 13th 2018)
// ---------------------------------------------------------------
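//
// Example usage (illustrative sketch only): a TaskGroup is constructed with a
// join functor, tasks are submitted with exec(), and join() blocks until all
// tasks finish and folds their results into the accumulator. The template
// defaults (Arg = Tp, MaxDepth = 0) and the availability of a default thread
// pool are assumptions taken from typical PTL usage, not guaranteed by this
// file alone:
//
//     // reduce the results of ten tasks into a single sum
//     PTL::TaskGroup<long> tg{ [](long& lhs, long rhs) { return lhs += rhs; } };
//     for(long i = 0; i < 10; ++i)
//         tg.exec([i]() { return i; });   // each task returns its index
//     long sum = tg.join(0);              // wait for all tasks, fold results
//
// ---------------------------------------------------------------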

#include "PTL/Types.hh"

namespace PTL
{
// Constructor taking a user-provided join functor
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func>
TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp)
: m_join{ std::forward<Func>(_join) }
, m_pool{ _tp }
{
    internal_update();
}

// Constructor without a join functor (only enabled when the task type is void)
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up>
TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp,
                                        enable_if_t<std::is_void<Up>::value, int>)
: m_join{ []() {} }
, m_pool{ _tp }
{
    internal_update();
}

// Destructor
template <typename Tp, typename Arg, intmax_t MaxDepth>
TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup()
{
    {
        // a task will decrement the counter and then acquire the lock to
        // notify the condition variable, so acquiring the lock here prevents
        // the task group from being destroyed before that has completed
        AutoLock _lk{ m_task_lock, std::defer_lock };
        if(!_lk.owns_lock())
            _lk.lock();
    }

    if(m_tbb_task_group)
    {
        auto* _arena = m_pool->get_task_arena();
        _arena->execute([this]() { this->m_tbb_task_group->wait(); });
    }
    delete m_tbb_task_group;
    this->clear();
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up>
std::shared_ptr<Up>
TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task)
{
    // thread-safe increment of tasks in task group
    operator++();
    // copy the shared pointer to abstract instance
    m_task_list.push_back(_task);
    // return the derived instance
    return std::move(_task);
}

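// Block until all tasks submitted to this group have completed. If invoked
// from within a task (i.e. on a worker thread, or when the pool has fewer
// than two threads), the caller drains tasks from its own queue; otherwise it
// sleeps on the condition variable until the pending count reaches zero.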
template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::wait()
{
    auto _dtor = ScopeDestructor{ [&]() {
        if(m_tbb_task_group)
        {
            auto* _arena = m_pool->get_task_arena();
            _arena->execute([this]() { this->m_tbb_task_group->wait(); });
        }
    } };

    ThreadData* data = ThreadData::GetInstance();
    if(!data)
        return;
    // if no pool was initially present at creation
    if(!m_pool)
    {
        // check for master MT run-manager
        m_pool = internal::get_default_threadpool();

        // if no thread pool created
        if(!m_pool)
        {
            if(f_verbose > 0)
            {
                fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
                        __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
                std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
                          << "nullptr to thread pool!" << std::endl;
            }
            return;
        }
    }

    ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
    VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;

    bool _is_main = data->is_main;
    bool _within_task = data->within_task;

    auto is_active_state = [&]() {
        return (tpool->state()->load(std::memory_order_relaxed) !=
                thread_pool::state::STOPPED);
    };

    auto execute_this_threads_tasks = [&]() {
        if(!taskq)
            return;

        // only want to process if within a task
        if((!_is_main || tpool->size() < 2) && _within_task)
        {
            int bin = static_cast<int>(taskq->GetThreadBin());
            // const auto nitr = (tpool) ? tpool->size() :
            // Thread::hardware_concurrency();
            while(this->pending() > 0)
            {
                if(!taskq->empty())
                {
                    auto _task = taskq->GetTask(bin);
                    if(_task)
                        (*_task)();
                }
            }
        }
    };

    // checks for validity
    if(!is_native_task_group())
    {
        // for external threads
        if(!_is_main || tpool->size() < 2)
            return;
    }
    else if(f_verbose > 0)
    {
        if(!tpool || !taskq)
        {
            // something is wrong, didn't create thread-pool?
            fprintf(stderr,
                    "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
                    "(%p)\n",
                    __FUNCTION__, __LINE__, static_cast<void*>(tpool),
                    static_cast<void*>(taskq));
        }
        // return if thread pool isn't built
        else if(is_native_task_group() && !tpool->is_alive())
        {
            fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
                    __FUNCTION__, __LINE__);
        }
        else if(!is_active_state())
        {
            fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
                    __FUNCTION__, __LINE__);
        }
    }

    intmax_t wake_size = 2;
    AutoLock _lock(m_task_lock, std::defer_lock);

    while(is_active_state())
    {
        execute_this_threads_tasks();

        // while loop protects against spurious wake-ups
        while(_is_main && pending() > 0 && is_active_state())
        {
            // auto _wake = [&]() { return (wake_size > pending() ||
            // !is_active_state());
            // };

            // lock before sleeping on condition
            if(!_lock.owns_lock())
                _lock.lock();

            // wait until signaled that a task has been completed;
            // the mutex is unlocked while waiting, then re-locked when signaled
            if(pending() >= wake_size)
            {
                m_task_cond.wait(_lock);
            }
            else
            {
                m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
            }
            // unlock
            if(_lock.owns_lock())
                _lock.unlock();
        }

        // if pending is not greater than zero, we are joined
        if(pending() <= 0)
            break;
    }

    if(_lock.owns_lock())
        _lock.unlock();

    intmax_t ntask = this->task_count().load();
    if(ntask > 0)
    {
        std::stringstream ss;
        ss << "\nWarning! Join operation issue! " << ntask << " tasks are still "
           << "running!" << std::endl;
        std::cerr << ss.str();
        this->wait();
    }
}

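// Returns a scope guard that, on destruction, decrements the outstanding task
// counter and notifies all waiters once the count drops below one.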
template <typename Tp, typename Arg, intmax_t MaxDepth>
ScopeDestructor
TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor()
{
    auto& _counter = m_tot_task_count;
    auto& _task_cond = task_cond();
    auto& _task_lock = task_lock();
    return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
        auto _count = --(_counter);
        if(_count < 1)
        {
            AutoLock _lk{ _task_lock };
            _task_cond.notify_all();
        }
    } };
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::notify()
{
    AutoLock _lk{ m_task_lock };
    m_task_cond.notify_one();
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::notify_all()
{
    AutoLock _lk{ m_task_lock };
    m_task_cond.notify_all();
}

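// Submit a task for asynchronous execution. If a recursion depth limit is set
// (MaxDepth > 0) and the calling thread is already deeper than that limit,
// the task is executed inline via local_exec() to avoid exhausting the pool;
// otherwise the wrapped task is handed to the TBB task group (when the pool
// is TBB-backed) or to the native thread pool.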
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func, typename... Args, typename Up>
enable_if_t<std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
{
    if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
       ThreadData::GetInstance()->task_depth > MaxDepth)
    {
        local_exec<Tp>(std::move(func), std::move(args)...);
    }
    else
    {
        auto& _counter = m_tot_task_count;
        auto& _task_cond = task_cond();
        auto& _task_lock = task_lock();
        auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
            auto _tdata = ThreadData::GetInstance();
            if(_tdata)
                ++(_tdata->task_depth);
            func(args...);
            auto _count = --(_counter);
            if(_tdata)
                --(_tdata->task_depth);
            if(_count < 1)
            {
                AutoLock _lk{ _task_lock };
                _task_cond.notify_all();
            }
        });

        if(m_tbb_task_group)
        {
            auto* _arena = m_pool->get_task_arena();
            auto* _tbb_task_group = m_tbb_task_group;
            auto* _ptask = _task.get();
            _arena->execute([_tbb_task_group, _ptask]() {
                _tbb_task_group->run([_ptask]() { (*_ptask)(); });
            });
        }
        else
        {
            m_pool->add_task(std::move(_task));
        }
    }
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Func, typename... Args, typename Up>
enable_if_t<!std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
{
    if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
       ThreadData::GetInstance()->task_depth > MaxDepth)
    {
        local_exec<Tp>(std::move(func), std::move(args)...);
    }
    else
    {
        auto& _counter = m_tot_task_count;
        auto& _task_cond = task_cond();
        auto& _task_lock = task_lock();
        auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
            auto _tdata = ThreadData::GetInstance();
            if(_tdata)
                ++(_tdata->task_depth);
            auto&& _ret = func(args...);
            auto _count = --(_counter);
            if(_tdata)
                --(_tdata->task_depth);
            if(_count < 1)
            {
                AutoLock _lk{ _task_lock };
                _task_cond.notify_all();
            }
            return std::forward<decltype(_ret)>(_ret);
        });

        if(m_tbb_task_group)
        {
            auto* _arena = m_pool->get_task_arena();
            auto* _tbb_task_group = m_tbb_task_group;
            auto* _ptask = _task.get();
            _arena->execute([_tbb_task_group, _ptask]() {
                _tbb_task_group->run([_ptask]() { (*_ptask)(); });
            });
        }
        else
        {
            m_pool->add_task(std::move(_task));
        }
    }
}

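// Execute the task inline on the calling thread, recording the result through
// a promise so that the corresponding future is still collected by join().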
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, typename Func, typename... Args>
enable_if_t<std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
{
    auto _tdata = ThreadData::GetInstance();
    if(_tdata)
        ++(_tdata->task_depth);
    promise_type _p{};
    m_future_list.emplace_back(_p.get_future());
    func(args...);
    _p.set_value();
    if(_tdata)
        --(_tdata->task_depth);
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, typename Func, typename... Args>
enable_if_t<!std::is_void<Up>::value, void>
TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
{
    auto _tdata = ThreadData::GetInstance();
    if(_tdata)
        ++(_tdata->task_depth);
    promise_type _p{};
    m_future_list.emplace_back(_p.get_future());
    _p.set_value(func(args...));
    if(_tdata)
        --(_tdata->task_depth);
}

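// Join the task group: wait for all tasks, then reduce their results. The
// first overload folds each result into the provided accumulator via the join
// functor; the two void overloads drain the futures and invoke the join
// functor either with no arguments or with each individual result.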
template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
inline Up
TaskGroup<Tp, Arg, MaxDepth>::join(Up accum)
{
    this->wait();
    for(auto& itr : m_task_list)
    {
        using RetT = decay_t<decltype(itr->get())>;
        accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
    }
    for(auto& itr : m_future_list)
    {
        using RetT = decay_t<decltype(itr.get())>;
        accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
    }
    this->clear();
    return accum;
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, typename Rp,
          enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
inline void
TaskGroup<Tp, Arg, MaxDepth>::join()
{
    this->wait();
    for(auto& itr : m_task_list)
        itr->get();
    for(auto& itr : m_future_list)
        itr.get();
    m_join();
    this->clear();
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
template <typename Up, typename Rp,
          enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
inline void
TaskGroup<Tp, Arg, MaxDepth>::join()
{
    this->wait();
    for(auto& itr : m_task_list)
    {
        using RetT = decay_t<decltype(itr->get())>;
        m_join(std::forward<RetT>(itr->get()));
    }
    for(auto& itr : m_future_list)
    {
        using RetT = decay_t<decltype(itr.get())>;
        m_join(std::forward<RetT>(itr.get()));
    }
    this->clear();
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::clear()
{
    m_future_list.clear();
    m_task_list.clear();
}

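// Resolve the thread pool (falling back to the default pool if none was
// provided) and allocate a TBB task group when the pool is TBB-backed.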
template <typename Tp, typename Arg, intmax_t MaxDepth>
void
TaskGroup<Tp, Arg, MaxDepth>::internal_update()
{
    if(!m_pool)
        m_pool = internal::get_default_threadpool();

    if(!m_pool)
    {
        std::stringstream ss{};
        ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
           << " :: nullptr to thread pool";
        throw std::runtime_error(ss.str());
    }

    if(m_pool->is_tbb_threadpool())
    {
        m_tbb_task_group = new tbb_task_group_t{};
    }
}

template <typename Tp, typename Arg, intmax_t MaxDepth>
int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>("PTL_VERBOSE", 0);

} // namespace PTL