33 #ifndef MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
34 #define MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
49 #ifdef MADNESS_TASK_PROFILING
55 extern char * cplus_demangle (
const char *mangled,
int options);
56 #define DMGL_NO_OPTS 0
61 #endif // MADNESS_TASK_PROFILING
68 #ifndef _SC_NPROCESSORS_CONF
71 #include <sys/types.h>
72 #include <sys/sysctl.h>
82 void error(
const char *msg);
96 static pthread_key_t thread_key;
98 static void*
main(
void*
self);
103 static void init_thread_key() {
104 const int rc = pthread_key_create(&thread_key, NULL);
109 static void delete_thread_key() {
110 pthread_key_delete(thread_key);
113 void set_pool_thread_index(
int i) { pool_num = i; }
115 #if defined(HAVE_IBMBGQ) and defined(HPM)
116 static const int hpm_thread_id_all = -10;
117 static const int hpm_thread_id_main = -2;
118 static bool main_instrumented;
119 static bool all_instrumented;
120 static int hpm_thread_id;
131 virtual void run() = 0;
137 static void exit() { pthread_exit(0); }
140 const pthread_t&
get_id()
const {
return id; }
158 return static_cast<ThreadBase*
>(pthread_getspecific(thread_key));
161 #if defined(HAVE_IBMBGQ) and defined(HPM)
162 static void set_hpm_thread_env(
int hpm_thread_id);
171 void run() { f(args); }
183 void start(
void* (*f)(
void *),
void* args=0) {
237 else flags &= ~STEALABLE;
244 flags &= ~HIGHPRIORITY;
257 MADNESS_ASSERT(nthread>=0 && nthread<256);
258 flags = (flags & (~NTHREAD)) | (nthread &
NTHREAD);
268 template <
typename Archive>
296 : _nthread(nthread), _id(id), _barrier(barrier)
303 : _nthread(nthread), _id(id), _barrier(NULL)
309 int id()
const {
return _id;}
315 MADNESS_ASSERT(_barrier);
316 return _barrier->
enter(_id);
322 #ifdef MADNESS_TASK_PROFILING
324 namespace profiling {
333 std::pair<void*, unsigned short> id_;
334 unsigned short threads_;
344 static void print_demangled(std::ostream& os,
const char* symbol) {
348 #ifndef USE_LIBIBERTY
349 const char* name = abi::__cxa_demangle(symbol, 0, 0, &status);
351 char* name = cplus_demangle(symbol, DMGL_NO_OPTS);
358 os << symbol <<
"\t";
372 void*
const * func_ptr =
const_cast<void*
const *
>(& id_.first);
373 char** bt_sym = backtrace_symbols(func_ptr, 1);
382 std::istringstream iss(bt_sym[0]);
385 iss >> frame >> file >> address >> mangled_name;
386 #else // Assume Linux
389 const char* first = strchr(bt_sym[0],
'(');
392 const char* last = strrchr(first,
'+');
394 mangled_name.assign(first, (last - first) - 1);
415 void start(
const std::pair<void*, unsigned short>&
id,
416 const unsigned short threads,
const double submit_time)
420 times_[0] = submit_time;
435 friend std::ostream&
operator<<(std::ostream& os,
const TaskEvent& te) {
437 os << std::hex << std::showbase << te.id_.first <<
438 std::dec << std::noshowbase <<
"\t";
441 switch(te.id_.second) {
447 if(! mangled_name.empty())
448 print_demangled(os, mangled_name.c_str());
454 print_demangled(os, static_cast<const char*>(te.id_.first));
463 const std::streamsize precision = os.precision();
465 os << std::fixed <<
"\t" << te.times_[0]
466 <<
"\t" << te.times_[1] <<
"\t" << te.times_[2];
467 os.precision(precision);
476 class TaskEventListBase {
478 TaskEventListBase* next_;
480 TaskEventListBase(
const TaskEventListBase&);
481 TaskEventListBase& operator=(
const TaskEventListBase&);
486 TaskEventListBase() : next_(NULL) { }
489 virtual ~TaskEventListBase() { }
492 TaskEventListBase* next()
const {
return next_; }
497 void insert(TaskEventListBase* list) {
508 friend inline std::ostream&
operator<<(std::ostream& os,
const TaskEventListBase& tel) {
509 return tel.print_events(os);
515 virtual std::ostream& print_events(std::ostream&)
const = 0;
523 class TaskEventList :
public TaskEventListBase {
529 TaskEventList(
const TaskEventList&);
530 TaskEventList& operator=(
const TaskEventList&);
535 TaskEventList(
const unsigned int nmax) :
536 TaskEventListBase(), n_(0ul), events_(new TaskEvent[nmax])
540 virtual ~TaskEventList() {
delete [] events_; }
547 TaskEvent* event() {
return events_ + (n_++); }
552 virtual std::ostream& print_events(std::ostream& os)
const {
554 for(std::size_t i = 0; i < n_; ++i)
555 os << thread_id <<
"\t" << events_[i] << std::endl;
568 TaskEventListBase* head_;
569 TaskEventListBase* tail_;
571 static Mutex output_mutex_;
574 TaskProfiler(
const TaskProfiler&);
575 TaskProfiler& operator=(
const TaskProfiler&);
578 static const char* output_file_name_;
584 TaskProfiler() : head_(NULL), tail_(NULL) { }
589 TaskEventListBase* next = NULL;
590 while(head_ != NULL) {
591 next = head_->next();
601 TaskEventList* new_list(
const std::size_t nmax) {
603 TaskEventList* list =
new TaskEventList(nmax);
624 void write_to_file();
629 #endif // MADNESS_TASK_PROFILING
637 #ifdef HAVE_INTEL_TBB
639 #endif // HAVE_INTEL_TBB
648 #ifdef MADNESS_TASK_PROFILING
649 profiling::TaskEvent* task_event_;
651 std::pair<void*, unsigned short> id_;
653 void set_event(profiling::TaskEvent* task_event) {
654 task_event_ = task_event;
662 #endif // MADNESS_TASK_PROFILING
668 template <
typename T>
669 union FunctionPointerGrabber {
677 template <
typename fnT>
678 static typename enable_if_c<detail::function_traits<fnT>::value ||
679 detail::memfunc_traits<fnT>::value>::type
680 make_id(std::pair<void*,unsigned short>&
id, fnT fn) {
681 FunctionPointerGrabber<fnT> poop;
687 template <
typename fnobjT>
689 detail::memfunc_traits<fnobjT>::value>::type
690 make_id(std::pair<void*,unsigned short>&
id,
const fnobjT&) {
691 id.first =
reinterpret_cast<void*
>(
const_cast<char*
>(
typeid(fnobjT).name()));
697 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
703 bool run_multi_threaded() {
704 #ifdef HAVE_INTEL_TBB
705 MADNESS_EXCEPTION(
"run_multi_threaded should not be called when using Intel TBB", 1);
713 #ifdef MADNESS_TASK_PROFILING
714 task_event_->start(id_, nthread, submit_time_);
715 #endif // MADNESS_TASK_PROFILING
716 TAU_START(
"PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
717 run(TaskThreadEnv(1,0,0));
718 TAU_STOP(
"PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
719 #ifdef MADNESS_TASK_PROFILING
721 #endif // MADNESS_TASK_PROFILING
726 volatile bool barrier_flag;
729 #ifdef MADNESS_TASK_PROFILING
731 task_event_->start(id_, nthread, submit_time_);
732 #endif // MADNESS_TASK_PROFILING
734 TAU_START(
"PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
735 run(TaskThreadEnv(nthread,
id, barrier));
736 TAU_STOP(
"PoolTaskInterface::run_multithreaded run(TaskThreadEnv)");
738 #ifdef MADNESS_TASK_PROFILING
739 const bool cleanup = barrier->
enter(
id);
740 if(cleanup) task_event_->stop();
743 return barrier->
enter(
id);
744 #endif // MADNESS_TASK_PROFILING
746 #endif // HAVE_INTEL_TBB
774 barrier =
new Barrier(nthread);
781 tbb::task* execute() {
791 static inline void *
operator new(std::size_t size)
throw(std::bad_alloc);
793 #endif // HAVE_INTEL_TBB
796 static inline void operator delete(
void* p, std::size_t size)
throw() {
798 #ifdef HAVE_INTEL_TBB
799 tbb::task::destroy(*reinterpret_cast<tbb::task*>(p));
801 ::operator
delete(p);
802 #endif // HAVE_INTEL_TBB
827 virtual void get_id(std::pair<void*,unsigned short>&
id)
const {
839 #ifdef MADNESS_TASK_PROFILING
840 profiling::TaskProfiler profiler_;
841 #endif // MADNESS_TASK_PROFILING
850 #ifdef MADNESS_TASK_PROFILING
851 profiling::TaskProfiler& profiler() {
return profiler_; }
853 #endif // MADNESS_TASK_PROFILING
868 volatile bool finish;
874 #warning WE NEED TO TUNE THE nmax PARAMETER
876 static const int nmax=128;
877 static double await_timeout;
879 #if defined(HAVE_IBMBGQ) and defined(HPM)
880 static unsigned int main_hpmctx;
889 int default_nthread();
897 if (!wait && queue.
empty())
return false;
898 std::pair<PoolTaskInterface*,bool> t = queue.
pop_front(wait);
899 #ifdef MADNESS_TASK_PROFILING
900 profiling::TaskEventList* event_list =
901 this_thread->profiler().new_list(1);
902 #endif // MADNESS_TASK_PROFILING
904 if (t.second && t.first) {
905 #ifdef MADNESS_TASK_PROFILING
906 t.first->set_event(event_list->event());
907 #endif // MADNESS_TASK_PROFILING
908 if (t.first->run_multi_threaded())
915 bool run_tasks(
bool wait, ThreadPoolThread* this_thread) {
930 PoolTaskInterface* taskbuf[nmax];
931 int ntask = queue.
pop_front(nmax, taskbuf, wait);
932 #ifdef MADNESS_TASK_PROFILING
933 profiling::TaskEventList* event_list =
934 this_thread->profiler().new_list(ntask);
935 #endif // MADNESS_TASK_PROFILING
936 for (
int i=0; i<ntask; ++i) {
938 #ifdef MADNESS_TASK_PROFILING
939 taskbuf[i]->set_event(event_list->event());
940 #endif // MADNESS_TASK_PROFILING
941 if (taskbuf[i]->run_multi_threaded()) {
950 void thread_main(ThreadPoolThread*
const thread);
953 static void* pool_thread_main(
void *v);
957 static ThreadPool* instance() {
958 #ifndef MADNESS_ASSERTIONS_DISABLE
960 std::cerr <<
"!!! ERROR: The thread pool has not been initialized.\n"
961 <<
"!!! ERROR: Call madness::initialize before submitting tasks to the task queue.\n";
974 static tbb::empty_task* tbb_parent_task;
975 static tbb::task_scheduler_init* tbb_scheduler;
979 static void begin(
int nthread=-1);
985 #ifdef MADNESS_TASK_PROFILING
987 #endif // MADNESS_TASK_PROFILING
989 ThreadPool::tbb_parent_task->increment_ref_count();
991 ThreadPool::tbb_parent_task->spawn(*task);
994 ThreadPool::tbb_parent_task->enqueue(*task);
1002 instance()->queue.push_front(task);
1005 instance()->queue.push_back(task, task_threads);
1007 #endif // HAVE_INTEL_TBB
1010 template <
typename opT>
1017 static void add(
const std::vector<PoolTaskInterface*>& tasks) {
1019 MADNESS_EXCEPTION(
"Do not add tasks to the madness task queue when using Intel TBB.", 1);
1021 typedef std::vector<PoolTaskInterface*>::const_iterator iteratorT;
1022 for (iteratorT it=tasks.begin(); it!=tasks.end(); ++it) {
1032 #ifdef MADNESS_TASK_PROFILING
1035 return instance()->run_tasks(
false, NULL);
1036 #endif // MADNESS_TASK_PROFILING
1041 return instance()->nthreads;
1046 return instance()->queue.size();
1055 template <
typename Probe>
1056 static void await(
const Probe& probe,
bool dowork =
true) {
1057 double start = cpu_time();
1058 const double timeout = await_timeout;
1066 const bool working =
false;
1069 #endif // HAVE_INTEL_TBB
1070 const double current_time = cpu_time();
1075 start = current_time;
1079 if(((current_time - start) > timeout) && (timeout > 1.0)) {
1080 std::cout <<
"!!MADNESS: Hung queue?\n";
1085 0, 1, __LINE__, __FUNCTION__, __FILE__);
1095 tbb_parent_task->decrement_ref_count();
1096 tbb::task::destroy(*tbb_parent_task);
1097 tbb_scheduler->terminate();
1098 delete(tbb_scheduler);
1103 #ifdef HAVE_INTEL_TBB
1104 inline void * PoolTaskInterface::operator
new(std::size_t size)
throw(std::bad_alloc)
1106 if(! ThreadPool::tbb_parent_task) {
1107 std::cerr <<
"!!! Error: Cannot allocate task object because the thread pool has not been initialized.\n";
1108 throw std::bad_alloc();
1110 return ::operator
new(size, ThreadPool::tbb_parent_task->allocate_child());
1112 #endif // HAVE_INTEL_TBB
1116 #endif // MADNESS_WORLD_WORLDTHREAD_H__INCLUDED
bool is_stealable() const
Definition: worldthread.h:224
int id() const
Definition: worldthread.h:309
TaskThreadEnv(int nthread, int id, Barrier *barrier)
Definition: worldthread.h:295
static void set_affinity_pattern(const bool bind[3], const int cpu[3])
Specify the affinity pattern or how to bind threads to cpus.
Definition: worldthread.cc:206
void error(const char *msg)
Definition: world.cc:128
int main(int argc, char **argv)
Definition: DFcode/mcpfit.cc:983
void set_nthread(int nthread)
Are you sure this is what you want to call?
Definition: worldthread.h:256
ThreadBase()
Default constructor ... must invoke start() to actually begin the thread.
Definition: worldthread.h:126
void start(void *(*f)(void *), void *args=0)
Definition: worldthread.h:183
static ThreadBase * this_thread()
Definition: worldthread.h:157
static TaskAttributes hipri()
Definition: worldthread.h:277
PoolTaskInterface()
Definition: worldthread.h:750
A no-op task used for various purposes.
Definition: worldthread.h:822
void start()
Start the thread running.
Definition: worldthread.cc:156
int pop_front(int nmax, T *r, bool wait)
Pop multiple values off the front of queue ... returns number popped ... might be zero...
Definition: dqueue.h:210
~ThreadPool()
Definition: worldthread.h:1093
static std::size_t queue_size()
Returns number of tasks in the queue.
Definition: worldthread.h:1045
PoolTaskInterface(const TaskAttributes &attr)
Definition: worldthread.h:757
virtual ~PoolTaskNull()
Definition: worldthread.h:825
static void begin(int nthread=-1)
Please invoke while in single threaded environment.
Definition: worldthread.cc:443
Used to pass info about thread environment into users task.
Definition: worldthread.h:289
::std::string string
Definition: gtest-port.h:872
const pthread_t & get_id() const
Get the pthread id of this thread (if running)
Definition: worldthread.h:140
TaskAttributes(const TaskAttributes &attr)
Definition: worldthread.h:218
bool barrier() const
Definition: worldthread.h:311
Grossly simplified Boost-like type traits and templates.
Simplified thread wrapper to hide pthread complexity.
Definition: worldthread.h:167
void scan(opT &op)
Definition: worldthread.h:1011
bool enter(const int id)
Each thread calls this with its id (0,..,nthread-1) to enter the barrier.
Definition: worldmutex.h:651
bool empty() const
Definition: dqueue.h:281
int get_nthread() const
Definition: worldthread.h:261
Definition: worldmutex.h:72
void wait()
Definition: worldmutex.cc:43
static void await(const Probe &probe, bool dowork=true)
Gracefully wait for a condition to become true ... executes tasks if any in queue.
Definition: worldthread.h:1056
static const unsigned long STEALABLE
Definition: worldthread.h:213
virtual void run()=0
You implement this to do useful work.
void run(const TaskThreadEnv &)
Override this method to implement a multi-threaded task.
Definition: worldthread.h:824
Thread(void *(*f)(void *), void *args=0)
Create a thread and start it running f(args)
Definition: worldthread.h:178
Multi-threaded queue to manage and run tasks.
Definition: worldtask.h:393
Lowest level task interface.
Definition: worldthread.h:636
void scan(opT &op)
Definition: dqueue.h:188
virtual ~Thread()
Definition: worldthread.h:189
static const unsigned long GENERATOR
Definition: worldthread.h:212
const T1 &f1 return GTEST_2_TUPLE_() T(f0, f1)
ThreadPool thread object.
Definition: worldthread.h:836
virtual ~ThreadPoolThread()
Virtual destructor.
Definition: worldthread.h:848
static disable_if_c< detail::function_traits< fnobjT >::value||detail::memfunc_traits< fnobjT >::value >::type make_id(std::pair< void *, unsigned short > &id, const fnobjT &)
Definition: worldthread.h:690
static void add(PoolTaskInterface *task)
Add a new task to the pool.
Definition: worldthread.h:984
A thread-safe, fast but simple double-ended queue.
Definition: dqueue.h:72
int cancel() const
Cancel this thread.
Definition: worldthread.h:146
static void exit()
A thread can call this to terminate its execution.
Definition: worldthread.h:137
static void add(const std::vector< PoolTaskInterface * > &tasks)
Add a vector of tasks to the pool.
Definition: worldthread.h:1017
A singleton pool of threads for dynamic execution of tasks.
Definition: worldthread.h:859
virtual void run(const TaskThreadEnv &info)=0
Override this method to implement a multi-threaded task.
void serialize(Archive &ar)
Definition: worldthread.h:269
void reset()
Definition: worldmutex.h:87
void set_highpriority(bool hipri)
Definition: worldthread.h:240
bool is_high_priority() const
Definition: worldthread.h:226
static std::size_t size()
Returns number of threads in the pool.
Definition: worldthread.h:1040
An integer with atomic set, get, read+inc, read+dec, dec+test operations.
Definition: atomicint.h:73
static const unsigned long NTHREAD
Definition: worldthread.h:211
static TaskAttributes generator()
Definition: worldthread.h:273
int nthread() const
Definition: worldthread.h:307
Definition: worldmutex.h:622
static bool run_task()
An otherwise idle thread can call this to run a task.
Definition: worldthread.h:1031
void register_thread(int id, volatile bool *pflag)
Each thread calls this once before first use.
Definition: worldmutex.h:640
int get_pool_thread_index() const
Get index of thread in ThreadPool (0,...,nthread-1) or -1 if not in ThreadPool.
Definition: worldthread.h:143
virtual ~PoolTaskInterface()
Definition: worldthread.h:816
void set_stealable(bool stealable)
Definition: worldthread.h:235
TaskAttributes(unsigned long flags=0)
Definition: worldthread.h:216
virtual ~ThreadBase()
Definition: worldthread.h:128
virtual ~TaskAttributes()
Definition: worldthread.h:220
Contains attributes of a task.
Definition: worldthread.h:208
ThreadPoolThread()
Default constructor.
Definition: worldthread.h:845
Thread()
Default constructor ... must invoke start() to actually begin the thread.
Definition: worldthread.h:175
std::ostream & operator<<(std::ostream &s, const ContractedGaussianShell &c)
Definition: chem/molecularbasis.cc:38
static enable_if_c< detail::function_traits< fnT >::value||detail::memfunc_traits< fnT >::value >::type make_id(std::pair< void *, unsigned short > &id, fnT fn)
Definition: worldthread.h:680
static void set_affinity(int logical_id, int ind=-1)
Definition: worldthread.cc:224
static void end()
Definition: worldthread.cc:498
Simplified thread wrapper to hide pthread complexity.
Definition: worldthread.h:91
disable_if from Boost for conditionally instantiating templates based on type
Definition: enable_if.h:68
static const DQStats & get_stats()
Returns queue statistics.
Definition: worldthread.cc:522
Tensor< double > op(const Tensor< double > &x)
Definition: kain.cc:508
void set_nthread(int nthread)
Call this to reset the number of threads before the task is submitted.
Definition: worldthread.h:769
#define MADNESS_EXCEPTION(msg, value)
Definition: worldexc.h:88
#define TAU_STOP(a)
Definition: TAU.h:7
void set_generator(bool generator_hint)
Definition: worldthread.h:228
static int num_hw_processors()
Get no. of actual hardware processors.
Definition: worldthread.cc:172
Holds machinery to set up Functions/FuncImpls using various Factories and Interfaces.
Definition: chem/atomutil.cc:45
bool is_generator() const
Definition: worldthread.h:222
double wall_time()
Returns the wall time in seconds relative to arbitrary origin.
Definition: world.cc:248
static const unsigned long HIGHPRIORITY
Definition: worldthread.h:214
#define TAU_START(a)
Definition: TAU.h:6
Most exceptions thrown in MADNESS should be derived from these.
Definition: worldexc.h:53
static TaskAttributes multi_threaded(int nthread)
Definition: worldthread.h:281