src/ex/thread_pool.cpp

100.0% Lines (139/139) 100.0% List of functions (29/29)
thread_pool.cpp
f(x) Functions (29)
Function Calls Lines Blocks
boost::capy::thread_pool::impl::push(boost::capy::continuation*) :66 19206x 100.0% 100.0% boost::capy::thread_pool::impl::pop() :76 19373x 100.0% 100.0% boost::capy::thread_pool::impl::empty() const :87 36106x 100.0% 100.0% boost::capy::thread_pool::impl::~impl() :104 167x 100.0% 100.0% boost::capy::thread_pool::impl::running_in_this_thread() const :107 355x 100.0% 100.0% boost::capy::thread_pool::impl::drain_abandoned() :118 167x 100.0% 100.0% boost::capy::thread_pool::impl::impl(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :128 167x 100.0% 72.0% boost::capy::thread_pool::impl::post(boost::capy::continuation&) :141 19206x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_started() :152 347x 100.0% 100.0% boost::capy::thread_pool::impl::on_work_finished() :158 347x 100.0% 100.0% boost::capy::thread_pool::impl::join() :172 180x 100.0% 85.0% boost::capy::thread_pool::impl::join()::{lambda()#1}::operator()() const :188 62x 100.0% 100.0% boost::capy::thread_pool::impl::stop() :200 169x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started() :212 19206x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const :214 110x 100.0% 100.0% boost::capy::thread_pool::impl::ensure_started()::{lambda()#1}::operator()() const::{lambda()#1}::operator()() const :217 197x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long) :222 197x 100.0% 78.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::scoped_pool(boost::capy::thread_pool::impl const*) :233 197x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::scoped_pool::~scoped_pool() :234 197x 100.0% 100.0% boost::capy::thread_pool::impl::run(unsigned long)::{lambda()#1}::operator()() const :242 36106x 100.0% 100.0% boost::capy::thread_pool::~thread_pool() :258 167x 100.0% 100.0% boost::capy::thread_pool::thread_pool(unsigned long, std::basic_string_view<char, std::char_traits<char> >) :269 167x 100.0% 55.0% 
boost::capy::thread_pool::join() :277 13x 100.0% 100.0% boost::capy::thread_pool::stop() :284 2x 100.0% 100.0% boost::capy::thread_pool::get_executor() const :293 11582x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_started() const :301 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::on_work_finished() const :308 347x 100.0% 100.0% boost::capy::thread_pool::executor_type::post(boost::capy::continuation&) const :315 18864x 100.0% 100.0% boost::capy::thread_pool::executor_type::dispatch(boost::capy::continuation&) const :322 355x 100.0% 100.0%
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 // Copyright (c) 2026 Michael Vandeberg
4 //
5 // Distributed under the Boost Software License, Version 1.0. (See accompanying
6 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
7 //
8 // Official repository: https://github.com/boostorg/capy
9 //
10
11 #include <boost/capy/ex/thread_pool.hpp>
12 #include <boost/capy/continuation.hpp>
13 #include <boost/capy/detail/thread_local_ptr.hpp>
14 #include <boost/capy/ex/frame_allocator.hpp>
15 #include <boost/capy/test/thread_name.hpp>
16 #include <algorithm>
17 #include <atomic>
18 #include <condition_variable>
19 #include <cstdio>
20 #include <mutex>
21 #include <thread>
22 #include <vector>
23
24 /*
25 Thread pool implementation using a shared work queue.
26
27 Work items are continuations linked via their intrusive next pointer,
28 stored in a single queue protected by a mutex. No per-post heap
29 allocation: the continuation is owned by the caller and linked
30 directly. Worker threads wait on a condition_variable until work
31 is available or stop is requested.
32
33 Threads are started lazily on first post() via std::call_once to avoid
34 spawning threads for pools that are constructed but never used. Each
35 thread is named with a configurable prefix plus index for debugger
36 visibility.
37
38 Work tracking: on_work_started/on_work_finished maintain an atomic
39 outstanding_work_ counter. join() blocks until this counter reaches
40 zero, then signals workers to stop and joins threads.
41
42 Two shutdown paths, plus a destructor that combines them:
43 - join(): waits for the outstanding_work_ counter to reach zero, then stops workers.
44 - stop(): immediately signals workers to exit; queued work is abandoned.
45 - Destructor: stop() then join() (abandon + wait for threads).
46 */
47
48 namespace boost {
49 namespace capy {
50
51 //------------------------------------------------------------------------------
52
53 class thread_pool::impl
54 {
55 // Identifies the pool owning the current worker thread, or
56 // nullptr if the calling thread is not a pool worker. Checked
57 // by dispatch() to decide between symmetric transfer (inline
58 // resume) and post.
59 static inline detail::thread_local_ptr<impl const> current_;
60
61 // Intrusive queue of continuations via continuation::next.
62 // No per-post allocation: the continuation is owned by the caller.
63 continuation* head_ = nullptr;
64 continuation* tail_ = nullptr;
65
66 19206x void push(continuation* c) noexcept
67 {
68 19206x c->next = nullptr;
69 19206x if(tail_)
70 3178x tail_->next = c;
71 else
72 16028x head_ = c;
73 19206x tail_ = c;
74 19206x }
75
76 19373x continuation* pop() noexcept
77 {
78 19373x if(!head_)
79 167x return nullptr;
80 19206x continuation* c = head_;
81 19206x head_ = head_->next;
82 19206x if(!head_)
83 16028x tail_ = nullptr;
84 19206x return c;
85 }
86
87 36106x bool empty() const noexcept
88 {
89 36106x return head_ == nullptr;
90 }
91
92 std::mutex mutex_;
93 std::condition_variable work_cv_;
94 std::condition_variable done_cv_;
95 std::vector<std::thread> threads_;
96 std::atomic<std::size_t> outstanding_work_{0};
97 bool stop_{false};
98 bool joined_{false};
99 std::size_t num_threads_;
100 char thread_name_prefix_[13]{}; // 12 chars max + null terminator
101 std::once_flag start_flag_;
102
103 public:
104 167x ~impl() = default;
105
106 bool
107 355x running_in_this_thread() const noexcept
108 {
109 355x return current_.get() == this;
110 }
111
112 // Destroy abandoned coroutine frames. Must be called
113 // before execution_context::shutdown()/destroy() so
114 // that suspended-frame destructors (e.g. delay_awaitable
115 // calling timer_service::cancel()) run while services
116 // are still valid.
117 void
118 167x drain_abandoned() noexcept
119 {
120 364x while(auto* c = pop())
121 {
122 197x auto h = c->h;
123 197x if(h && h != std::noop_coroutine())
124 146x h.destroy();
125 197x }
126 167x }
127
128 167x impl(std::size_t num_threads, std::string_view thread_name_prefix)
129 167x : num_threads_(num_threads)
130 {
131 167x if(num_threads_ == 0)
132 6x num_threads_ = std::max(
133 3x std::thread::hardware_concurrency(), 1u);
134
135 // Truncate prefix to 12 chars, leaving room for up to 3-digit index.
136 167x auto n = thread_name_prefix.copy(thread_name_prefix_, 12);
137 167x thread_name_prefix_[n] = '\0';
138 167x }
139
140 void
141 19206x post(continuation& c)
142 {
143 19206x ensure_started();
144 {
145 19206x std::lock_guard<std::mutex> lock(mutex_);
146 19206x push(&c);
147 19206x }
148 19206x work_cv_.notify_one();
149 19206x }
150
151 void
152 347x on_work_started() noexcept
153 {
154 347x outstanding_work_.fetch_add(1, std::memory_order_acq_rel);
155 347x }
156
157 void
158 347x on_work_finished() noexcept
159 {
160 347x if(outstanding_work_.fetch_sub(
161 347x 1, std::memory_order_acq_rel) == 1)
162 {
163 87x std::lock_guard<std::mutex> lock(mutex_);
164 87x if(joined_ && !stop_)
165 4x stop_ = true;
166 87x done_cv_.notify_all();
167 87x work_cv_.notify_all();
168 87x }
169 347x }
170
171 void
172 180x join() noexcept
173 {
174 {
175 180x std::unique_lock<std::mutex> lock(mutex_);
176 180x if(joined_)
177 13x return;
178 167x joined_ = true;
179
180 167x if(outstanding_work_.load(
181 167x std::memory_order_acquire) == 0)
182 {
183 110x stop_ = true;
184 110x work_cv_.notify_all();
185 }
186 else
187 {
188 57x done_cv_.wait(lock, [this]{
189 62x return stop_;
190 });
191 }
192 180x }
193
194 364x for(auto& t : threads_)
195 197x if(t.joinable())
196 197x t.join();
197 }
198
199 void
200 169x stop() noexcept
201 {
202 {
203 169x std::lock_guard<std::mutex> lock(mutex_);
204 169x stop_ = true;
205 169x }
206 169x work_cv_.notify_all();
207 169x done_cv_.notify_all();
208 169x }
209
210 private:
211 void
212 19206x ensure_started()
213 {
214 19206x std::call_once(start_flag_, [this]{
215 110x threads_.reserve(num_threads_);
216 307x for(std::size_t i = 0; i < num_threads_; ++i)
217 394x threads_.emplace_back([this, i]{ run(i); });
218 110x });
219 19206x }
220
221 void
222 197x run(std::size_t index)
223 {
224 // Build name; set_current_thread_name truncates to platform limits.
225 char name[16];
226 197x std::snprintf(name, sizeof(name), "%s%zu", thread_name_prefix_, index);
227 197x set_current_thread_name(name);
228
229 // Mark this thread as a worker of this pool so dispatch()
230 // can symmetric-transfer when called from within pool work.
231 struct scoped_pool
232 {
233 197x scoped_pool(impl const* p) noexcept { current_.set(p); }
234 197x ~scoped_pool() noexcept { current_.set(nullptr); }
235 197x } guard(this);
236
237 for(;;)
238 {
239 19206x continuation* c = nullptr;
240 {
241 19206x std::unique_lock<std::mutex> lock(mutex_);
242 19206x work_cv_.wait(lock, [this]{
243 53108x return !empty() ||
244 53108x stop_;
245 });
246 19206x if(stop_)
247 394x return;
248 19009x c = pop();
249 19206x }
250 19009x if(c)
251 19009x safe_resume(c->h);
252 19009x }
253 197x }
254 };
255
256 //------------------------------------------------------------------------------
257
258 167x thread_pool::
259 ~thread_pool()
260 {
261 167x impl_->stop();
262 167x impl_->join();
263 167x impl_->drain_abandoned();
264 167x shutdown();
265 167x destroy();
266 167x delete impl_;
267 167x }
268
269 167x thread_pool::
270 167x thread_pool(std::size_t num_threads, std::string_view thread_name_prefix)
271 167x : impl_(new impl(num_threads, thread_name_prefix))
272 {
273 167x this->set_frame_allocator(std::allocator<void>{});
274 167x }
275
276 void
277 13x thread_pool::
278 join() noexcept
279 {
280 13x impl_->join();
281 13x }
282
283 void
284 2x thread_pool::
285 stop() noexcept
286 {
287 2x impl_->stop();
288 2x }
289
290 //------------------------------------------------------------------------------
291
292 thread_pool::executor_type
293 11582x thread_pool::
294 get_executor() const noexcept
295 {
296 11582x return executor_type(
297 11582x const_cast<thread_pool&>(*this));
298 }
299
300 void
301 347x thread_pool::executor_type::
302 on_work_started() const noexcept
303 {
304 347x pool_->impl_->on_work_started();
305 347x }
306
307 void
308 347x thread_pool::executor_type::
309 on_work_finished() const noexcept
310 {
311 347x pool_->impl_->on_work_finished();
312 347x }
313
314 void
315 18864x thread_pool::executor_type::
316 post(continuation& c) const
317 {
318 18864x pool_->impl_->post(c);
319 18864x }
320
321 std::coroutine_handle<>
322 355x thread_pool::executor_type::
323 dispatch(continuation& c) const
324 {
325 355x if(pool_->impl_->running_in_this_thread())
326 13x return c.h;
327 342x pool_->impl_->post(c);
328 342x return std::noop_coroutine();
329 }
330
331 } // capy
332 } // boost
333