UserTaskQueue.cc (Geant4-11)
//
// MIT License
// Copyright (c) 2020 Jonathan R. Madsen
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED
// "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT
// LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
// HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
// ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
// WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
//
// ---------------------------------------------------------------
// Tasking class implementation
// Class Description:
// ---------------------------------------------------------------
// Author: Jonathan Madsen
// ---------------------------------------------------------------

#include "PTL/UserTaskQueue.hh"
#include "PTL/Task.hh"
#include "PTL/TaskGroup.hh"
#include "PTL/ThreadPool.hh"

#include <cassert>

using namespace PTL;

//======================================================================================//

UserTaskQueue::UserTaskQueue(intmax_t nworkers, UserTaskQueue* parent)
: VUserTaskQueue(nworkers)
, m_is_clone((parent) ? true : false)
, m_thread_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_insert_bin((parent) ? (ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
, m_hold((parent) ? parent->m_hold : new std::atomic_bool(false))
, m_ntasks((parent) ? parent->m_ntasks : new std::atomic_uintmax_t(0))
, m_subqueues((parent) ? parent->m_subqueues : new TaskSubQueueContainer())
{
    // create nthreads + 1 subqueues so there is always a subqueue available
    if(!parent)
    {
        for(intmax_t i = 0; i < nworkers + 1; ++i)
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
    }

#if defined(DEBUG)
    if(GetEnv<int>("PTL_VERBOSE", 0) > 3)
    {
        RecursiveAutoLock l(TypeMutex<decltype(std::cout), RecursiveMutex>());
        std::stringstream ss;
        ss << ThreadPool::get_this_thread_id() << "> " << ThisThread::get_id() << " ["
           << __FUNCTION__ << ":" << __LINE__ << "] "
           << "this = " << this << ", "
           << "clone = " << std::boolalpha << m_is_clone << ", "
           << "thread = " << m_thread_bin << ", "
           << "insert = " << m_insert_bin << ", "
           << "hold = " << m_hold->load() << " @ " << m_hold << ", "
           << "tasks = " << m_ntasks->load() << " @ " << m_ntasks << ", "
           << "subqueue = " << m_subqueues << ", "
           << "size = " << true_size() << ", "
           << "empty = " << true_empty();
        std::cout << ss.str() << std::endl;
    }
#endif
}
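// A minimal usage sketch (illustrative only, not taken from the PTL sources): the
// pool owns one primary queue and each worker thread receives a clone that shares
// the parent's sub-queues, task counter, and hold flag, exactly as wired up in the
// constructor above. The function and variable names below are hypothetical.
#if 0
void example_shared_state()
{
    PTL::UserTaskQueue primary{ 4 };           // allocates 4 + 1 sub-queues
    PTL::UserTaskQueue worker{ 4, &primary };  // clone: reuses primary's sub-queues
    // both views report the same pending-task count because m_ntasks is shared
    assert(primary.true_size() == worker.true_size());
}
#endif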

//======================================================================================//

UserTaskQueue::~UserTaskQueue()
{
    if(!m_is_clone)
    {
        for(auto& itr : *m_subqueues)
        {
            assert(itr->empty());
            delete itr;
        }
        m_subqueues->clear();
        delete m_hold;
        delete m_ntasks;
        delete m_subqueues;
    }
}

//======================================================================================//

void
UserTaskQueue::resize(intmax_t n)
{
    if(m_workers < n)
    {
        while(m_workers < n)
        {
            m_subqueues->emplace_back(new TaskSubQueue(m_ntasks));
            ++m_workers;
        }
    }
    else if(m_workers > n)
    {
        while(m_workers > n)
        {
            delete m_subqueues->back();
            m_subqueues->pop_back();
            --m_workers;
        }
    }
}
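// Illustrative sketch (not from the PTL sources): resize() grows or shrinks the
// shared sub-queue container one bin at a time. Shrinking deletes the trailing
// bins outright, so it is only safe while those bins hold no tasks. Names below
// are hypothetical.
#if 0
void example_resize(PTL::UserTaskQueue& q)
{
    q.resize(8);  // grow: now 8 + 1 sub-queues are available
    q.resize(2);  // shrink: trailing bins are deleted, call only when empty
}
#endif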

//======================================================================================//

VUserTaskQueue*
UserTaskQueue::clone()
{
    return new UserTaskQueue(workers(), this);
}
//======================================================================================//

intmax_t
UserTaskQueue::GetThreadBin() const
{
    // get a thread id number
    static thread_local intmax_t tl_bin =
        (m_thread_bin + ThreadPool::get_this_thread_id()) % (m_workers + 1);
    return tl_bin;
}

//======================================================================================//

intmax_t
UserTaskQueue::GetInsertBin() const
{
    return (++m_insert_bin % (m_workers + 1));
}
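// Illustrative sketch (not from the PTL sources): both helpers above rely on the
// same modular arithmetic. GetThreadBin() pins a thread to one of the
// m_workers + 1 bins for its lifetime, while GetInsertBin() advances a counter so
// that successive insertions rotate evenly over all bins. A standalone version of
// the rotation, with hypothetical names:
#if 0
// with workers = 4 there are 5 bins, so a counter c maps to bins 1, 2, 3, 4, 0, ...
intmax_t example_next_bin(intmax_t& counter, intmax_t workers)
{
    return (++counter % (workers + 1));
}
#endif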

//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetThreadBinTask()
{
    intmax_t      tbin      = GetThreadBin();
    TaskSubQueue* task_subq = (*m_subqueues)[tbin % (m_workers + 1)];
    task_pointer  _task     = nullptr;

    //------------------------------------------------------------------------//
    auto get_task = [&]() {
        if(task_subq->AcquireClaim())
        {
            // run task
            _task = task_subq->PopTask(true);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // while not empty
    while(!task_subq->empty())
    {
        if(get_task())
            break;
    }
    return _task;
}

//======================================================================================//

UserTaskQueue::task_pointer
UserTaskQueue::GetTask(intmax_t subq, intmax_t nitr)
{
    // exit if empty
    if(this->true_empty())
        return nullptr;

    // ensure the thread has a bin assignment
    intmax_t tbin = GetThreadBin();
    intmax_t n    = (subq < 0) ? tbin : subq;
    if(nitr < 1)
        nitr = (m_workers + 1);  // * m_ntasks->load(std::memory_order_relaxed);

    if(m_hold->load(std::memory_order_relaxed))
    {
        return GetThreadBinTask();
    }

    task_pointer _task = nullptr;
    //------------------------------------------------------------------------//
    auto get_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n % (m_workers + 1)];
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(!task_subq->empty() && task_subq->AcquireClaim())
        {
            // pop task out of bin
            _task = task_subq->PopTask(n == tbin);
            // release the claim on the bin
            task_subq->ReleaseClaim();
        }
        if(_task)
            --(*m_ntasks);
        // return success if valid pointer
        return (_task != nullptr);
    };
    //------------------------------------------------------------------------//

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    // while(!empty())
    {
        for(intmax_t i = 0; i < nitr; ++i, ++n)
        {
            if(get_task(n % (m_workers + 1)))
                return _task;
        }
    }

    // only reached if we looped over all bins (checking our own bin twice) and
    // found no work; return an empty task so the thread will be put to sleep if
    // there is still no work by the time it reaches its condition variable
    return _task;
}
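// Illustrative sketch (not from the PTL sources) of the consumer side of GetTask():
// a worker repeatedly requests work, looking in its own bin first and then rotating
// through the remaining bins, and executes whatever it receives. In Geant4/PTL this
// loop is driven by ThreadPool itself; the free function below is hypothetical.
#if 0
void example_drain(PTL::UserTaskQueue& q)
{
    while(!q.true_empty())
    {
        auto task = q.GetTask();  // default args: start at this thread's bin
        if(task)
            (*task)();            // VTask::operator() executes the wrapped work
    }
}
#endif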

//======================================================================================//

intmax_t
UserTaskQueue::InsertTask(task_pointer&& task, ThreadData* data, intmax_t subq)
{
    // increment number of tasks
    ++(*m_ntasks);

    bool     spin = m_hold->load(std::memory_order_relaxed);
    intmax_t tbin = GetThreadBin();

    if(data && data->within_task)
    {
        subq = tbin;
        // spin = true;
    }

    // subq is -1 unless specified; when not specified, the GetInsertBin() call
    // increments a counter and returns counter % (num_workers + 1) so that tasks
    // are distributed evenly among the bins
    intmax_t n = (subq < 0) ? GetInsertBin() : subq;

    //------------------------------------------------------------------------//
    auto insert_task = [&](intmax_t _n) {
        TaskSubQueue* task_subq = (*m_subqueues)[_n];
        // TaskSubQueue* next_subq = (*m_subqueues)[(_n + 1) % (m_workers + 1)];
        // if not threads bin and size difference, insert into smaller
        // if(n != tbin && next_subq->size() < task_subq->size())
        //     task_subq = next_subq;
        // try to acquire a claim for the bin
        // if acquired, no other threads will access bin until claim is released
        if(task_subq->AcquireClaim())
        {
            // push the task into the bin
            task_subq->PushTask(std::move(task));
            // release the claim on the bin
            task_subq->ReleaseClaim();
            // return success
            return true;
        }
        return false;
    };
    //------------------------------------------------------------------------//

    // if not in "hold/spin mode", where thread only inserts tasks into
    // specified bin, then move onto next bin
    //
    if(spin)
    {
        n = n % (m_workers + 1);
        while(!insert_task(n))
            ;
        return n;
    }

    // there are num_workers+1 bins so there is always a bin that is open
    // execute num_workers+2 iterations so the thread checks its bin twice
    while(true)
    {
        auto _n = (n++) % (m_workers + 1);
        if(insert_task(_n))
            return _n;
    }
    return GetThreadBin();
}
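// Illustrative sketch (not from the PTL sources) of the producer side: wrapping a
// callable and handing it to InsertTask(). The helper below is hypothetical; in
// practice TaskGroup/ThreadPool perform this step when tasks are submitted.
#if 0
void example_produce(PTL::UserTaskQueue& q, PTL::TaskGroup<void>& tg)
{
    auto work = [] { /* ... do something ... */ };
    // tg.wrap() yields a shared_ptr<VTask>-compatible task_pointer
    q.InsertTask(tg.wrap(work));  // lands in the next round-robin bin
}
#endif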

//======================================================================================//

void
UserTaskQueue::ExecuteOnAllThreads(ThreadPool* tp, function_type func)
{
    using task_group_type      = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    if(!tp->is_alive())
    {
        func();
        return;
    }

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    thread_execute_map_t                thread_execute_map{};
    std::vector<std::shared_ptr<VTask>> _tasks{};
    _tasks.reserve(m_workers + 1);

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        //--------------------------------------------------------------------//
        auto thread_specific_func = [&]() {
            ScopeDestructor _dtor = tg.get_scope_destructor();
            static Mutex    _mtx;
            _mtx.lock();
            bool& _executed = thread_execute_map[GetThreadBin()];
            _mtx.unlock();
            if(!_executed)
            {
                func();
                _executed = true;
                return 1;
            }
            return 0;
        };
        //--------------------------------------------------------------------//

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }

    tp->notify_all();
    int nexecuted = tg.join();
    if(nexecuted != m_workers)
    {
        std::stringstream msg;
        msg << "Failure executing routine on all threads! Only " << nexecuted
            << " threads executed function out of " << m_workers << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}
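// Illustrative sketch (not from the PTL sources): a typical use of
// ExecuteOnAllThreads() is running a per-thread initialization exactly once on every
// worker, e.g. priming thread-local state. The queue and pool variables below are
// hypothetical.
#if 0
#include <cstdio>

void example_run_everywhere(PTL::UserTaskQueue& q, PTL::ThreadPool& tp)
{
    q.ExecuteOnAllThreads(&tp, [] {
        // executed once per worker thread; the caller's own bin is skipped above
        std::printf("initialized thread %lu\n",
                    (unsigned long) PTL::ThreadPool::get_this_thread_id());
    });
}
#endif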

//======================================================================================//

void
UserTaskQueue::ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool* tp,
                                        function_type func)
{
    using task_group_type      = TaskGroup<int, int>;
    using thread_execute_map_t = std::map<int64_t, bool>;

    task_group_type tg{ [](int& ref, int i) { return (ref += i); }, tp };

    // wait for all threads to finish any work
    // NOTE: will cause deadlock if called from a task
    while(tp->get_active_threads_count() > 0)
        ThisThread::sleep_for(std::chrono::milliseconds(10));

    if(!tp->is_alive())
    {
        func();
        return;
    }

    thread_execute_map_t thread_execute_map{};

    //========================================================================//
    // wrap the function so that it will only be executed if the thread
    // has an ID in the set
    auto thread_specific_func = [&]() {
        ScopeDestructor _dtor = tg.get_scope_destructor();
        static Mutex    _mtx;
        _mtx.lock();
        bool& _executed = thread_execute_map[GetThreadBin()];
        _mtx.unlock();
        if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
        {
            func();
            _executed = true;
            return 1;
        }
        return 0;
    };
    //========================================================================//

    if(tid_set.count(ThisThread::get_id()) > 0)
        func();

    AcquireHold();
    for(int i = 0; i < (m_workers + 1); ++i)
    {
        if(i == GetThreadBin())
            continue;

        InsertTask(tg.wrap(thread_specific_func), ThreadData::GetInstance(), i);
    }
    tp->notify_all();
    decltype(tid_set.size()) nexecuted = tg.join();
    if(nexecuted != tid_set.size())
    {
        std::stringstream msg;
        msg << "Failure executing routine on specific threads! Only " << nexecuted
            << " threads executed function out of " << tid_set.size() << " workers";
        std::cerr << msg.str() << std::endl;
    }
    ReleaseHold();
}
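// Illustrative sketch (not from the PTL sources): ExecuteOnSpecificThreads() takes a
// set of thread ids and only runs the function on threads whose id is in the set.
// The sketch assumes ThreadId is std::thread::id (the PTL Threading alias); the
// names below are hypothetical.
#if 0
void example_run_on_one(PTL::UserTaskQueue& q, PTL::ThreadPool& tp,
                        std::thread::id target)
{
    q.ExecuteOnSpecificThreads({ target }, &tp, [] { /* thread-specific work */ });
}
#endif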

//======================================================================================//

void
UserTaskQueue::AcquireHold()
{
    bool _hold;
    while(!(_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, true, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}

//======================================================================================//

void
UserTaskQueue::ReleaseHold()
{
    bool _hold;
    while((_hold = m_hold->load(std::memory_order_relaxed)))
    {
        m_hold->compare_exchange_strong(_hold, false, std::memory_order_release,
                                        std::memory_order_relaxed);
    }
}
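// Illustrative sketch (not from the PTL sources): AcquireHold()/ReleaseHold() both
// follow the same pattern of setting an atomic flag with compare-and-swap, retrying
// until the flag is observed in the desired state. A generic standalone version:
#if 0
#include <atomic>

void set_flag(std::atomic_bool& flag, bool desired)
{
    bool observed = flag.load(std::memory_order_relaxed);
    // on failure, compare_exchange_strong reloads `observed` and the loop retries
    while(observed != desired &&
          !flag.compare_exchange_strong(observed, desired, std::memory_order_release,
                                        std::memory_order_relaxed))
    {}
}
#endif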

//======================================================================================//