suanPan
Loading...
Searching...
No Matches
thread_pool.hpp
Go to the documentation of this file.
1#pragma once
2
3#include <atomic>
4#include <chrono>
5#include <condition_variable>
6#include <exception>
7#include <functional>
8#include <future>
9#include <iostream>
10#include <memory>
11#include <mutex>
12#include <queue>
13#include <thread>
14#include <type_traits>
15#include <utility>
16#include <vector>
17
27 using concurrency_t = std::invoke_result_t<decltype(std::thread::hardware_concurrency)>;
28
29 std::atomic<bool> running = false;
30
31 std::condition_variable task_available_cv = {};
32
33 std::condition_variable task_done_cv = {};
34
35 std::queue<std::function<void()>> tasks = {};
36
37 std::atomic<size_t> tasks_total = 0;
38
39 mutable std::mutex tasks_mutex = {};
40
41 concurrency_t thread_count = 0;
42
43 std::unique_ptr<std::thread[]> threads = nullptr;
44
45 std::atomic<bool> waiting = false;
46
47 void create_threads() {
48 running = true;
49 for(concurrency_t i = 0; i < thread_count; ++i) threads[i] = std::thread(&thread_pool::worker, this);
50 }
51
52 void destroy_threads() {
53 running = false;
54 task_available_cv.notify_all();
55 for(concurrency_t i = 0; i < thread_count; ++i) threads[i].join();
56 }
57
58 void worker() {
59 while(running) {
60 std::function<void()> task;
61 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
62 task_available_cv.wait(tasks_lock, [&] { return !tasks.empty() || !running; });
63 if(running && !paused) {
64 task = std::move(tasks.front());
65 tasks.pop();
66 tasks_lock.unlock();
67 task();
68 --tasks_total;
69 if(waiting) task_done_cv.notify_one();
70 }
71 }
72 }
73
74public:
75 explicit thread_pool(const concurrency_t thread_count_ = std::thread::hardware_concurrency())
76 : thread_count(thread_count_ > 1 ? thread_count_ : 1)
77 , threads(std::make_unique<std::thread[]>(thread_count)) { create_threads(); }
78
81 destroy_threads();
82 }
83
84 size_t get_tasks_queued() const {
85 const std::scoped_lock tasks_lock(tasks_mutex);
86 return tasks.size();
87 }
88
89 size_t get_tasks_running() const {
90 const std::scoped_lock tasks_lock(tasks_mutex);
91 return tasks_total - tasks.size();
92 }
93
94 size_t get_tasks_total() const { return tasks_total; }
95
96 concurrency_t get_thread_count() const { return thread_count; }
97
98 template<typename F, typename... A> void push_task(const F& task, const A&... args) {
99 {
100 const std::scoped_lock tasks_lock(tasks_mutex);
101 if constexpr(sizeof...(args) == 0) tasks.push(std::function<void()>(task));
102 else tasks.push(std::function<void()>([task, args...] { task(args...); }));
103 }
104 ++tasks_total;
105 task_available_cv.notify_one();
106 }
107
108 void reset(const concurrency_t thread_count_ = std::thread::hardware_concurrency()) {
109 const bool was_paused = paused;
110 paused = true;
112 destroy_threads();
113 thread_count = thread_count_ > 1 ? thread_count_ : 1;
114 threads = std::make_unique<std::thread[]>(thread_count);
115 paused = was_paused;
116 create_threads();
117 }
118
119 template<typename F, typename... A, typename R = std::invoke_result_t<std::decay_t<F>, std::decay_t<A>...>> std::future<R> submit(const F& task, const A&... args) {
120 std::shared_ptr<std::promise<R>> task_promise = std::make_shared<std::promise<R>>();
121 push_task(
122 [task, args..., task_promise] {
123 try {
124 if constexpr(std::is_void_v<R>) {
125 task(args...);
126 task_promise->set_value();
127 }
128 else { task_promise->set_value(task(args...)); }
129 }
130 catch(...) {
131 try { task_promise->set_exception(std::current_exception()); }
132 catch(...) { }
133 }
134 });
135 return task_promise->get_future();
136 }
137
139 waiting = true;
140 std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
141 task_done_cv.wait(tasks_lock, [this] { return (tasks_total == (paused ? tasks.size() : 0)); });
142 waiting = false;
143 }
144
145 std::atomic<bool> paused = false;
146};
Definition: thread_pool.hpp:26
size_t get_tasks_running() const
Definition: thread_pool.hpp:89
std::atomic< bool > paused
Definition: thread_pool.hpp:145
void wait_for_tasks()
Definition: thread_pool.hpp:138
size_t get_tasks_queued() const
Definition: thread_pool.hpp:84
size_t get_tasks_total() const
Definition: thread_pool.hpp:94
thread_pool(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:75
void push_task(const F &task, const A &... args)
Definition: thread_pool.hpp:98
void reset(const concurrency_t thread_count_=std::thread::hardware_concurrency())
Definition: thread_pool.hpp:108
~thread_pool()
Definition: thread_pool.hpp:79
concurrency_t get_thread_count() const
Definition: thread_pool.hpp:96
std::future< R > submit(const F &task, const A &... args)
Definition: thread_pool.hpp:119