Doxygen
threadpool.h
浏览该文件的文档.
1 /******************************************************************************
2  *
3  * Copyright (C) 1997-2020 by Dimitri van Heesch.
4  *
5  * Permission to use, copy, modify, and distribute this software and its
6  * documentation under the terms of the GNU General Public License is hereby
7  * granted. No representations are made about the suitability of this software
8  * for any purpose. It is provided "as is" without express or implied warranty.
9  * See the GNU General Public License for more details.
10  *
11  * Documents produced by Doxygen are derivative works derived from the
12  * input used in their production; they are not affected by this license.
13  *
14  */
15 
16 #ifndef THREADPOOL_H
17 #define THREADPOOL_H
18 
19 #include <condition_variable>
20 #include <deque>
21 #include <functional>
22 #include <future>
23 #include <mutex>
24 #include <thread>
25 #include <type_traits>
26 #include <utility>
27 #include <vector>
28 
29 /// Class managing a pool of worker threads.
30 /// Work can be queued by passing a function to queue(). A future will be
31 /// returned that can be used to obtain the result of the function after execution.
32 ///
33 /// Usage example:
34 /// @code
35 /// ThreadPool pool(10);
36 /// std::vector< std::future< int > > results;
37 /// for (int i=0;i<10;i++)
38 /// {
39 /// auto run = [](int i) { return i*i; };
40 /// results.emplace_back(pool.queue(std::bind(run,i)));
41 /// }
42 /// for (auto &f : results)
43 /// {
44 /// printf("Result %d:\n", f.get());
45 /// }
46 /// @endcode
class ThreadPool
{
  public:
    /// Start \a N worker threads in the thread pool.
    ///
    /// If starting a worker fails part-way, the workers that were already
    /// started are stopped again before the exception is rethrown.
    /// @param N number of worker threads to start (defaults to 1).
    ThreadPool(std::size_t N=1)
    {
      // Reserve up front so that push_back() below cannot throw after a worker
      // has been launched: a future returned by std::async(std::launch::async)
      // blocks in its destructor until the thread is done, and a worker only
      // terminates after receiving a stop marker.
      m_finished.reserve(N);
      try
      {
        for (std::size_t i = 0; i < N; ++i)
        {
          // each thread is a std::async running threadTask():
          m_finished.push_back(
            std::async(
              std::launch::async,
              [this]{ threadTask(); }
            )
          );
        }
      }
      catch (...)
      {
        // Without this, unwinding would destroy the futures of the workers
        // started so far and wait forever for threads nobody told to stop.
        finish();
        throw;
      }
    }

    /// Deletes the thread pool by finishing all threads.
    ~ThreadPool()
    {
      finish();
    }

    // Worker threads capture `this`, so a pool must never be copied.
    // (Copying was already impossible because of the std::mutex member.)
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(const ThreadPool &) = delete;

    /// Queue the callable function \a f for the threads to execute.
    /// A future of the return type of the function is returned to capture the result.
    ///
    /// @tparam F type of the callable; it is invoked without arguments.
    /// @tparam R result type of invoking \a f (deduced).
    /// @param  f the callable to run on one of the worker threads.
    /// @return a future that receives the value returned (or the exception thrown) by \a f.
    /// @note   work queued after finish() has been called is not executed,
    ///         since no worker thread remains to pick it up.
    template<class F, class R=decltype(std::declval<F&>()()) >
    std::future<R> queue(F&& f)
    {
      // We wrap the function object into a packaged task, splitting
      // execution from the return value.
      // Since the packaged_task object is not copyable, we create it on the heap
      // and capture it via a shared pointer in a lambda and then assign that lambda
      // to a std::function.
      auto ptr = std::make_shared< std::packaged_task<R()> >(std::forward<F>(f));
      auto r = ptr->get_future(); // get the return value before we hand off the task
      auto taskFunc = [ptr]() { if (ptr->valid()) (*ptr)(); };

      {
        std::unique_lock<std::mutex> l(m_mutex);
        // move instead of copy: avoids an extra shared_ptr reference count round trip
        m_work.emplace_back(std::move(taskFunc));
      }
      // wake a thread to work on the task; done after releasing the lock so the
      // woken thread does not immediately block on the mutex again
      m_cond.notify_one();

      return r; // return the future result of the task
    }

    /// finish enqueues a "stop the thread" message for every thread,
    /// then waits for them to finish.
    ///
    /// Work that is already queued is processed first, as the stop messages are
    /// appended at the end of the queue. Calling finish() again afterwards is
    /// harmless: there are no threads left to signal or to wait for.
    void finish()
    {
      {
        std::unique_lock<std::mutex> l(m_mutex);
        for(auto&& u : m_finished)
        {
          unused_variable(u);
          m_work.push_back({}); // insert empty function object to signal abort
        }
      }
      m_cond.notify_all();
      // destroying the futures blocks until the corresponding threads are done
      m_finished.clear();
    }

  private:

    // helper to silence the compiler warning about unused variables
    template <typename ...Args>
    void unused_variable(Args&& ...args) { (void)(sizeof...(args)); }

    // the work that a worker thread does: keep taking tasks from the queue
    // and running them until an empty function object is received.
    void threadTask()
    {
      while(true)
      {
        // pop a task off the queue:
        std::function<void()> f;
        {
          // usual thread-safe queue code; the predicate is checked before
          // blocking, so no separate emptiness test is needed
          std::unique_lock<std::mutex> l(m_mutex);
          m_cond.wait(l,[&]{return !m_work.empty();});
          f = std::move(m_work.front());
          m_work.pop_front();
        }
        // if the function is empty, it means we are asked to abort
        if (!f) return;
        // run the task
        f();
      }
    }

    // the mutex, condition variable and deque form a single
    // thread-safe triggered queue of tasks:
    std::mutex m_mutex;
    std::condition_variable m_cond;

    // hold the queue of work
    std::deque< std::function<void()> > m_work;

    // this holds futures representing the worker threads being done:
    std::vector< std::future<void> > m_finished;
};
149 
150 #endif
151 
ThreadPool::queue
std::future< R > queue(F &&f)
Queue the callable function f for the threads to execute.
Definition: threadpool.h:86
ThreadPool
Class managing a pool of worker threads.
Definition: threadpool.h:47
ThreadPool::m_work
std::deque< std::function< void()> > m_work
Definition: threadpool.h:157
ThreadPool::m_finished
std::vector< std::future< void > > m_finished
Definition: threadpool.h:160
ThreadPool::threadTask
void threadTask()
Definition: threadpool.h:128
ThreadPool::m_mutex
std::mutex m_mutex
Definition: threadpool.h:153
ThreadPool::ThreadPool
ThreadPool(std::size_t N=1)
start N threads in the thread pool.
Definition: threadpool.h:64
ThreadPool::finish
void finish()
finish enqueues a "stop the thread" message for every thread, then waits for them to finish
Definition: threadpool.h:108
ThreadPool::~ThreadPool
~ThreadPool()
deletes the thread pool by finishing all threads
Definition: threadpool.h:78
ThreadPool::m_cond
std::condition_variable m_cond
Definition: threadpool.h:154
ThreadPool::unused_variable
void unused_variable(Args &&...args)
Definition: threadpool.h:125