5#include <condition_variable>
/// Integer type returned by std::thread::hardware_concurrency().
/// The pool uses it for thread counts and for thread-array indices.
using concurrency_t = decltype(std::thread::hardware_concurrency());
29 std::atomic<bool> running =
false;
31 std::condition_variable task_available_cv = {};
33 std::condition_variable task_done_cv = {};
35 std::queue<std::function<void()>> tasks = {};
37 std::atomic<size_t> tasks_total = 0;
39 mutable std::mutex tasks_mutex = {};
41 concurrency_t thread_count = 0;
43 std::unique_ptr<std::thread[]> threads =
nullptr;
45 std::atomic<bool> waiting =
false;
47 void create_threads() {
49 for(concurrency_t i = 0; i < thread_count; ++i) threads[i] = std::thread(&thread_pool::worker,
this);
52 void destroy_threads() {
54 task_available_cv.notify_all();
55 for(concurrency_t i = 0; i < thread_count; ++i) threads[i].join();
60 std::function<void()> task;
61 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
62 task_available_cv.wait(tasks_lock, [&] {
return !tasks.empty() || !running; });
64 task = std::move(tasks.front());
69 if(waiting) task_done_cv.notify_one();
75 explicit thread_pool(
const concurrency_t thread_count_ = std::thread::hardware_concurrency())
76 : thread_count(thread_count_ > 1 ? thread_count_ : 1)
77 , threads(std::make_unique<std::thread[]>(thread_count)) { create_threads(); }
85 const std::scoped_lock tasks_lock(tasks_mutex);
90 const std::scoped_lock tasks_lock(tasks_mutex);
91 return tasks_total - tasks.size();
98 template<
typename F,
typename...
A>
void push_task(
const F& task,
const A&... args) {
100 const std::scoped_lock tasks_lock(tasks_mutex);
101 if constexpr(
sizeof...(args) == 0) tasks.push(std::function<
void()>(task));
102 else tasks.push(std::function<
void()>([task, args...] { task(args...); }));
105 task_available_cv.notify_one();
108 void reset(
const concurrency_t thread_count_ = std::thread::hardware_concurrency()) {
109 const bool was_paused =
paused;
113 thread_count = thread_count_ > 1 ? thread_count_ : 1;
114 threads = std::make_unique<std::thread[]>(thread_count);
119 template<
typename F,
typename...
A,
typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>> std::future<R>
submit(
const F& task,
const A&... args) {
120 std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
122 [task, args..., task_promise] {
124 if constexpr(std::is_void_v<R>) {
126 task_promise->set_value();
128 else { task_promise->set_value(task(args...)); }
131 try { task_promise->set_exception(std::current_exception()); }
135 return task_promise->get_future();
140 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
141 task_done_cv.wait(tasks_lock, [
this] {
return (tasks_total == (paused ? tasks.size() : 0)); });
/// Pause flag, initially false. wait_for_tasks() reads it: while it is true
/// the wait completes once tasks_total equals the number of queued tasks,
/// otherwise once tasks_total reaches zero. reset() takes a snapshot of it
/// before rebuilding the thread array.
std::atomic<bool> paused{false};
thread_pool
Definition: thread_pool.hpp:26
size_t get_tasks_running() const
Definition: thread_pool.hpp:89
std::atomic< bool > paused
Definition: thread_pool.hpp:145
void wait_for_tasks()
Definition: thread_pool.hpp:138
size_t get_tasks_queued() const
Definition: thread_pool.hpp:84
size_t get_tasks_total() const
Definition: thread_pool.hpp:94
thread_pool(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:75
void push_task(const F &task, const A &... args)
Definition: thread_pool.hpp:98
void reset(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:108
~thread_pool()
Definition: thread_pool.hpp:79
concurrency_t get_thread_count() const
Definition: thread_pool.hpp:96
std::future< R > submit(const F &task, const A &... args)
Definition: thread_pool.hpp:119