3// Copyright (c) 2020 Jonathan R. Madsen
4// Permission is hereby granted, free of charge, to any person obtaining a copy
5// of this software and associated documentation files (the "Software"), to deal
6// in the Software without restriction, including without limitation the rights
7// to use, copy, modify, merge, publish, distribute, sublicense, and
8// copies of the Software, and to permit persons to whom the Software is
9// furnished to do so, subject to the following conditions:
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
12// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
13// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
14// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
15// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
16// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
17// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
20// ---------------------------------------------------------------
21// Tasking class header file
25// This file creates the a class for handling a group of tasks that
26// can be independently joined
28// ---------------------------------------------------------------
29// Author: Jonathan Madsen (Feb 13th 2018)
30// ---------------------------------------------------------------
32#include "PTL/Types.hh"
36template <typename Tp, typename Arg, intmax_t MaxDepth>
37template <typename Func>
38TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(Func&& _join, ThreadPool* _tp)
39: m_join{ std::forward<Func>(_join) }
45template <typename Tp, typename Arg, intmax_t MaxDepth>
47TaskGroup<Tp, Arg, MaxDepth>::TaskGroup(ThreadPool* _tp,
48 enable_if_t<std::is_void<Up>::value, int>)
56template <typename Tp, typename Arg, intmax_t MaxDepth>
57TaskGroup<Tp, Arg, MaxDepth>::~TaskGroup()
60 // task will decrement counter and then acquire the lock to notify
61 // condition variable so acquiring lock here will prevent the
62 // task group from being destroyed before this is completed
63 AutoLock _lk{ m_task_lock, std::defer_lock };
70 auto* _arena = m_pool->get_task_arena();
71 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
73 delete m_tbb_task_group;
77template <typename Tp, typename Arg, intmax_t MaxDepth>
80TaskGroup<Tp, Arg, MaxDepth>::operator+=(std::shared_ptr<Up>&& _task)
82 // thread-safe increment of tasks in task group
84 // copy the shared pointer to abstract instance
85 m_task_list.push_back(_task);
86 // return the derived instance
87 return std::move(_task);
90template <typename Tp, typename Arg, intmax_t MaxDepth>
92TaskGroup<Tp, Arg, MaxDepth>::wait()
94 auto _dtor = ScopeDestructor{ [&]() {
97 auto* _arena = m_pool->get_task_arena();
98 _arena->execute([this]() { this->m_tbb_task_group->wait(); });
102 ThreadData* data = ThreadData::GetInstance();
105 // if no pool was initially present at creation
108 // check for master MT run-manager
109 m_pool = internal::get_default_threadpool();
111 // if no thread pool created
116 fprintf(stderr, "%s @ %i :: Warning! nullptr to thread-pool (%p)\n",
117 __FUNCTION__, __LINE__, static_cast<void*>(m_pool));
118 std::cerr << __FUNCTION__ << "@" << __LINE__ << " :: Warning! "
119 << "nullptr to thread pool!" << std::endl;
125 ThreadPool* tpool = (m_pool) ? m_pool : data->thread_pool;
126 VUserTaskQueue* taskq = (tpool) ? tpool->get_queue() : data->current_queue;
128 bool _is_main = data->is_main;
129 bool _within_task = data->within_task;
131 auto is_active_state = [&]() {
132 return (tpool->state()->load(std::memory_order_relaxed) !=
133 thread_pool::state::STOPPED);
136 auto execute_this_threads_tasks = [&]() {
140 // only want to process if within a task
141 if((!_is_main || tpool->size() < 2) && _within_task)
143 int bin = static_cast<int>(taskq->GetThreadBin());
144 // const auto nitr = (tpool) ? tpool->size() :
145 // Thread::hardware_concurrency();
146 while(this->pending() > 0)
150 auto _task = taskq->GetTask(bin);
158 // checks for validity
159 if(!is_native_task_group())
161 // for external threads
162 if(!_is_main || tpool->size() < 2)
165 else if(f_verbose > 0)
169 // something is wrong, didn't create thread-pool?
171 "%s @ %i :: Warning! nullptr to thread data (%p) or task-queue "
173 __FUNCTION__, __LINE__, static_cast<void*>(tpool),
174 static_cast<void*>(taskq));
176 // return if thread pool isn't built
177 else if(is_native_task_group() && !tpool->is_alive())
179 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not alive!\n",
180 __FUNCTION__, __LINE__);
182 else if(!is_active_state())
184 fprintf(stderr, "%s @ %i :: Warning! thread-pool is not active!\n",
185 __FUNCTION__, __LINE__);
189 intmax_t wake_size = 2;
190 AutoLock _lock(m_task_lock, std::defer_lock);
192 while(is_active_state())
194 execute_this_threads_tasks();
196 // while loop protects against spurious wake-ups
197 while(_is_main && pending() > 0 && is_active_state())
199 // auto _wake = [&]() { return (wake_size > pending() ||
200 // !is_active_state());
203 // lock before sleeping on condition
204 if(!_lock.owns_lock())
207 // Wait until signaled that a task has been competed
208 // Unlock mutex while wait, then lock it back when signaled
209 // when true, this wakes the thread
210 if(pending() >= wake_size)
212 m_task_cond.wait(_lock);
216 m_task_cond.wait_for(_lock, std::chrono::microseconds(100));
219 if(_lock.owns_lock())
223 // if pending is not greater than zero, we are joined
228 if(_lock.owns_lock())
231 intmax_t ntask = this->task_count().load();
234 std::stringstream ss;
235 ss << "\nWarning! Join operation issue! " << ntask << " tasks still "
236 << "are running!" << std::endl;
237 std::cerr << ss.str();
242template <typename Tp, typename Arg, intmax_t MaxDepth>
244TaskGroup<Tp, Arg, MaxDepth>::get_scope_destructor()
246 auto& _counter = m_tot_task_count;
247 auto& _task_cond = task_cond();
248 auto& _task_lock = task_lock();
249 return ScopeDestructor{ [&_task_cond, &_task_lock, &_counter]() {
250 auto _count = --(_counter);
253 AutoLock _lk{ _task_lock };
254 _task_cond.notify_all();
259template <typename Tp, typename Arg, intmax_t MaxDepth>
261TaskGroup<Tp, Arg, MaxDepth>::notify()
263 AutoLock _lk{ m_task_lock };
264 m_task_cond.notify_one();
267template <typename Tp, typename Arg, intmax_t MaxDepth>
269TaskGroup<Tp, Arg, MaxDepth>::notify_all()
271 AutoLock _lk{ m_task_lock };
272 m_task_cond.notify_all();
275template <typename Tp, typename Arg, intmax_t MaxDepth>
276template <typename Func, typename... Args, typename Up>
277enable_if_t<std::is_void<Up>::value, void>
278TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
280 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
281 ThreadData::GetInstance()->task_depth > MaxDepth)
283 local_exec<Tp>(std::move(func), std::move(args)...);
287 auto& _counter = m_tot_task_count;
288 auto& _task_cond = task_cond();
289 auto& _task_lock = task_lock();
290 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
291 auto _tdata = ThreadData::GetInstance();
293 ++(_tdata->task_depth);
295 auto _count = --(_counter);
297 --(_tdata->task_depth);
300 AutoLock _lk{ _task_lock };
301 _task_cond.notify_all();
307 auto* _arena = m_pool->get_task_arena();
308 auto* _tbb_task_group = m_tbb_task_group;
309 auto* _ptask = _task.get();
310 _arena->execute([_tbb_task_group, _ptask]() {
311 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
316 m_pool->add_task(std::move(_task));
320template <typename Tp, typename Arg, intmax_t MaxDepth>
321template <typename Func, typename... Args, typename Up>
322enable_if_t<!std::is_void<Up>::value, void>
323TaskGroup<Tp, Arg, MaxDepth>::exec(Func func, Args... args)
325 if(MaxDepth > 0 && !m_tbb_task_group && ThreadData::GetInstance() &&
326 ThreadData::GetInstance()->task_depth > MaxDepth)
328 local_exec<Tp>(std::move(func), std::move(args)...);
332 auto& _counter = m_tot_task_count;
333 auto& _task_cond = task_cond();
334 auto& _task_lock = task_lock();
335 auto _task = wrap([&_task_cond, &_task_lock, &_counter, func, args...]() {
336 auto _tdata = ThreadData::GetInstance();
338 ++(_tdata->task_depth);
339 auto&& _ret = func(args...);
340 auto _count = --(_counter);
342 --(_tdata->task_depth);
345 AutoLock _lk{ _task_lock };
346 _task_cond.notify_all();
348 return std::forward<decltype(_ret)>(_ret);
353 auto* _arena = m_pool->get_task_arena();
354 auto* _tbb_task_group = m_tbb_task_group;
355 auto* _ptask = _task.get();
356 _arena->execute([_tbb_task_group, _ptask]() {
357 _tbb_task_group->run([_ptask]() { (*_ptask)(); });
362 m_pool->add_task(std::move(_task));
367template <typename Tp, typename Arg, intmax_t MaxDepth>
368template <typename Up, typename Func, typename... Args>
369enable_if_t<std::is_void<Up>::value, void>
370TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
372 auto _tdata = ThreadData::GetInstance();
374 ++(_tdata->task_depth);
376 m_future_list.emplace_back(_p.get_future());
380 --(_tdata->task_depth);
383template <typename Tp, typename Arg, intmax_t MaxDepth>
384template <typename Up, typename Func, typename... Args>
385enable_if_t<!std::is_void<Up>::value, void>
386TaskGroup<Tp, Arg, MaxDepth>::local_exec(Func func, Args... args)
388 auto _tdata = ThreadData::GetInstance();
390 ++(_tdata->task_depth);
392 m_future_list.emplace_back(_p.get_future());
393 _p.set_value(func(args...));
395 --(_tdata->task_depth);
398template <typename Tp, typename Arg, intmax_t MaxDepth>
399template <typename Up, enable_if_t<!std::is_void<Up>::value, int>>
401TaskGroup<Tp, Arg, MaxDepth>::join(Up accum)
404 for(auto& itr : m_task_list)
406 using RetT = decay_t<decltype(itr->get())>;
407 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr->get())));
409 for(auto& itr : m_future_list)
411 using RetT = decay_t<decltype(itr.get())>;
412 accum = std::move(m_join(std::ref(accum), std::forward<RetT>(itr.get())));
418template <typename Tp, typename Arg, intmax_t MaxDepth>
419template <typename Up, typename Rp,
420 enable_if_t<std::is_void<Up>::value && std::is_void<Rp>::value, int>>
422TaskGroup<Tp, Arg, MaxDepth>::join()
425 for(auto& itr : m_task_list)
427 for(auto& itr : m_future_list)
433template <typename Tp, typename Arg, intmax_t MaxDepth>
434template <typename Up, typename Rp,
435 enable_if_t<std::is_void<Up>::value && !std::is_void<Rp>::value, int>>
437TaskGroup<Tp, Arg, MaxDepth>::join()
440 for(auto& itr : m_task_list)
442 using RetT = decay_t<decltype(itr->get())>;
443 m_join(std::forward<RetT>(itr->get()));
445 for(auto& itr : m_future_list)
447 using RetT = decay_t<decltype(itr.get())>;
448 m_join(std::forward<RetT>(itr.get()));
453template <typename Tp, typename Arg, intmax_t MaxDepth>
455TaskGroup<Tp, Arg, MaxDepth>::clear()
457 m_future_list.clear();
461template <typename Tp, typename Arg, intmax_t MaxDepth>
463TaskGroup<Tp, Arg, MaxDepth>::internal_update()
466 m_pool = internal::get_default_threadpool();
470 std::stringstream ss{};
471 ss << "[TaskGroup]> " << __FUNCTION__ << "@" << __LINE__
472 << " :: nullptr to thread pool";
473 throw std::runtime_error(ss.str());
476 if(m_pool->is_tbb_threadpool())
478 m_tbb_task_group = new tbb_task_group_t{};
482template <typename Tp, typename Arg, intmax_t MaxDepth>
483int TaskGroup<Tp, Arg, MaxDepth>::f_verbose = GetEnv<int>("PTL_VERBOSE", 0);