39, m_is_clone((parent) ? true : false)
40, m_thread_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
41, m_insert_bin((parent) ? (
ThreadPool::get_this_thread_id() % (nworkers + 1)) : 0)
42, m_hold((parent) ? parent->m_hold : new
std::atomic_bool(false))
43, m_ntasks((parent) ? parent->m_ntasks : new
std::atomic_uintmax_t(0))
49 for(intmax_t i = 0; i < nworkers + 1; ++i)
54 if(GetEnv<int>(
"PTL_VERBOSE", 0) > 3)
59 << __FUNCTION__ <<
":" << __LINE__ <<
"] "
60 <<
"this = " <<
this <<
", "
61 <<
"clone = " << std::boolalpha <<
m_is_clone <<
", "
64 <<
"hold = " <<
m_hold->load() <<
" @ " <<
m_hold <<
", "
69 std::cout << ss.str() << std::endl;
130 static thread_local intmax_t tl_bin =
149 TaskSubQueue* task_subq = (*m_subqueues)[tbin % (
m_workers + 1)];
153 auto get_task = [&]() {
154 if(task_subq->AcquireClaim())
157 _task = task_subq->PopTask(
true);
159 task_subq->ReleaseClaim();
164 return (_task !=
nullptr);
169 while(!task_subq->empty())
188 intmax_t
n = (subq < 0) ? tbin : subq;
192 if(
m_hold->load(std::memory_order_relaxed))
199 auto get_task = [&](intmax_t _n) {
200 TaskSubQueue* task_subq = (*m_subqueues)[_n % (
m_workers + 1)];
203 if(!task_subq->empty() && task_subq->AcquireClaim())
206 _task = task_subq->PopTask(
n == tbin);
208 task_subq->ReleaseClaim();
213 return (_task !=
nullptr);
221 for(intmax_t i = 0; i < nitr; ++i, ++
n)
243 bool spin =
m_hold->load(std::memory_order_relaxed);
259 auto insert_task = [&](intmax_t _n) {
260 TaskSubQueue* task_subq = (*m_subqueues)[_n];
267 if(task_subq->AcquireClaim())
270 task_subq->PushTask(std::move(task));
272 task_subq->ReleaseClaim();
286 while(!insert_task(
n))
308 using thread_execute_map_t = std::map<int64_t, bool>;
316 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); },
tp };
320 while(
tp->get_active_threads_count() > 0)
321 ThisThread::sleep_for(std::chrono::milliseconds(10));
323 thread_execute_map_t thread_execute_map{};
324 std::vector<std::shared_ptr<VTask>> _tasks{};
334 auto thread_specific_func = [&]() {
354 int nexecuted = tg.join();
357 std::stringstream msg;
358 msg <<
"Failure executing routine on all threads! Only " << nexecuted
359 <<
" threads executed function out of " <<
m_workers <<
" workers";
360 std::cerr << msg.str() << std::endl;
372 using thread_execute_map_t = std::map<int64_t, bool>;
374 task_group_type tg{ [](
int& ref,
int i) {
return (ref += i); },
tp };
378 while(
tp->get_active_threads_count() > 0)
379 ThisThread::sleep_for(std::chrono::milliseconds(10));
387 thread_execute_map_t thread_execute_map{};
392 auto thread_specific_func = [&]() {
398 if(!_executed && tid_set.count(ThisThread::get_id()) > 0)
408 if(tid_set.count(ThisThread::get_id()) > 0)
420 decltype(tid_set.size()) nexecuted = tg.join();
421 if(nexecuted != tid_set.size())
423 std::stringstream msg;
424 msg <<
"Failure executing routine on specific threads! Only " << nexecuted
425 <<
" threads executed function out of " << tid_set.size() <<
" workers";
426 std::cerr << msg.str() << std::endl;
437 while(!(_hold =
m_hold->load(std::memory_order_relaxed)))
439 m_hold->compare_exchange_strong(_hold,
true, std::memory_order_release,
440 std::memory_order_relaxed);
450 while((_hold =
m_hold->load(std::memory_order_relaxed)))
452 m_hold->compare_exchange_strong(_hold,
false, std::memory_order_release,
453 std::memory_order_relaxed);
static ThreadData *& GetInstance()
static uintmax_t get_this_thread_id()
virtual VUserTaskQueue * clone() override
std::atomic_uintmax_t * m_ntasks
std::vector< TaskSubQueue * > TaskSubQueueContainer
virtual void resize(intmax_t) override
virtual task_pointer GetTask(intmax_t subq=-1, intmax_t nitr=-1) override
virtual intmax_t GetThreadBin() const override
virtual void ExecuteOnSpecificThreads(ThreadIdSet tid_set, ThreadPool *tp, function_type f) override
intmax_t GetInsertBin() const
UserTaskQueue(intmax_t nworkers=-1, UserTaskQueue *=nullptr)
std::atomic_bool * m_hold
virtual void ExecuteOnAllThreads(ThreadPool *tp, function_type f) override
TaskSubQueueContainer * m_subqueues
virtual intmax_t InsertTask(task_pointer &&, ThreadData *=nullptr, intmax_t subq=-1) override PTL_NO_SANITIZE_THREAD
bool true_empty() const override
virtual ~UserTaskQueue() override
task_pointer GetThreadBinTask()
std::shared_ptr< VTask > task_pointer
size_type true_size() const override
std::set< ThreadId > ThreadIdSet
std::function< void()> function_type
std::recursive_mutex RecursiveMutex
MutexTp & TypeMutex(const unsigned int &_n=0)