src/ex/detail/strand_queue.hpp

76.6% Lines (49/64) 92.3% List of functions (12/13)
strand_queue.hpp
f(x) Functions (13)
Line TLA Hits Source Code
1 //
2 // Copyright (c) 2025 Vinnie Falco (vinnie.falco@gmail.com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
11 #define BOOST_CAPY_SRC_EX_DETAIL_STRAND_QUEUE_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/ex/frame_allocator.hpp>
15
16 #include <coroutine>
17 #include <cstddef>
18 #include <exception>
19
20 namespace boost {
21 namespace capy {
22 namespace detail {
23
class strand_queue;

//----------------------------------------------------------

/** Metadata stored immediately before each wrapper coroutine frame.

    The promise's operator new allocates one block holding a
    frame_prefix followed by the coroutine frame, and hands
    `prefix + 1` to the coroutine machinery as the frame address.
    The frame is located again later with `frame_address - 1`.

    The struct is aligned (and therefore padded) to the default
    new alignment so that `prefix + 1` is as strictly aligned as
    the pointer returned by `::operator new`. Without this, the
    three pointer-sized members give a size that is not a multiple
    of that alignment, and the frame would start at an under-aligned
    address.
*/
struct alignas(__STDCPP_DEFAULT_NEW_ALIGNMENT__) frame_prefix
{
    // Next block while this block sits on a queue's free list
    frame_prefix* next;

    // Queue whose free list receives this block on deallocation
    strand_queue* queue;

    // Total bytes obtained from ::operator new (prefix + frame)
    std::size_t alloc_size;
};
35
36 //----------------------------------------------------------
37
38 /** Wrapper coroutine for strand queue dispatch operations.
39
40 This coroutine wraps a target coroutine handle and resumes
41 it when dispatched. The wrapper ensures control returns to
42 the dispatch loop after the target suspends or completes.
43
44 The promise contains an intrusive list node for queue
45 storage and supports a custom allocator that recycles
46 coroutine frames via a free list.
47 */
48 struct strand_op
49 {
50 struct promise_type
51 {
52 promise_type* next = nullptr;
53
54 void*
55 operator new(
56 std::size_t size,
57 strand_queue& q,
58 std::coroutine_handle<void>);
59
60 void
61 operator delete(void* p, std::size_t);
62
63 strand_op
64 30340x get_return_object() noexcept
65 {
66 30340x return {std::coroutine_handle<promise_type>::from_promise(*this)};
67 }
68
69 std::suspend_always
70 30340x initial_suspend() noexcept
71 {
72 30340x return {};
73 }
74
75 std::suspend_always
76 30340x final_suspend() noexcept
77 {
78 30340x return {};
79 }
80
81 void
82 30340x return_void() noexcept
83 {
84 30340x }
85
86 void
87 unhandled_exception()
88 {
89 std::terminate();
90 }
91 };
92
93 std::coroutine_handle<promise_type> h_;
94 };
95
96 //----------------------------------------------------------
97
98 /** Single-threaded dispatch queue for coroutine handles.
99
100 This queue stores coroutine handles and resumes them
101 sequentially when dispatch() is called. Each pushed
102 handle is wrapped in a strand_op coroutine that ensures
103 control returns to the dispatch loop after the target
104 suspends or completes.
105
106 The queue uses an intrusive singly-linked list through
107 the promise type to avoid separate node allocations.
108 A free list recycles wrapper coroutine frames to reduce
109 allocation overhead during repeated push/dispatch cycles.
110
111 @par Thread Safety
112 This class is not thread-safe. All operations must be
113 called from a single thread.
114 */
115 class strand_queue
116 {
117 using promise_type = strand_op::promise_type;
118
119 promise_type* head_ = nullptr;
120 promise_type* tail_ = nullptr;
121 frame_prefix* free_list_ = nullptr;
122
123 friend struct strand_op::promise_type;
124
125 static
126 strand_op
127 30340x make_strand_op(
128 strand_queue& q,
129 std::coroutine_handle<void> target)
130 {
131 (void)q;
132 safe_resume(target);
133 co_return;
134 60680x }
135
136 public:
137 11442x strand_queue() = default;
138
139 strand_queue(strand_queue const&) = delete;
140 strand_queue& operator=(strand_queue const&) = delete;
141
142 /** Destructor.
143
144 Destroys any pending wrappers without resuming them,
145 then frees all memory in the free list.
146 */
147 11442x ~strand_queue()
148 {
149 // Destroy pending wrappers
150 11442x while(head_)
151 {
152 promise_type* p = head_;
153 head_ = p->next;
154
155 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
156 h.destroy();
157 }
158
159 // Free the free list memory
160 11442x while(free_list_)
161 {
162 frame_prefix* prefix = free_list_;
163 free_list_ = prefix->next;
164 ::operator delete(prefix);
165 }
166 11442x }
167
168 /** Returns true if there are no pending operations.
169 */
170 bool
171 19815x empty() const noexcept
172 {
173 19815x return head_ == nullptr;
174 }
175
176 /** Push a coroutine handle to the queue.
177
178 Creates a wrapper coroutine and appends it to the
179 queue. The wrapper will resume the target handle
180 when dispatch() processes it.
181
182 @param h The coroutine handle to dispatch.
183 */
184 void
185 30340x push(std::coroutine_handle<void> h)
186 {
187 30340x strand_op op = make_strand_op(*this, h);
188
189 30340x promise_type* p = &op.h_.promise();
190 30340x p->next = nullptr;
191
192 30340x if(tail_)
193 10525x tail_->next = p;
194 else
195 19815x head_ = p;
196 30340x tail_ = p;
197 30340x }
198
199 /** Resume all queued coroutines in sequence.
200
201 Processes each wrapper in FIFO order, resuming its
202 target coroutine. After each target suspends or
203 completes, the wrapper is destroyed and its frame
204 is added to the free list for reuse.
205
206 Coroutines resumed during dispatch may push new
207 handles, which will also be processed in the same
208 dispatch call.
209
210 @warning Not thread-safe. Do not call while another
211 thread may be calling push().
212 */
213 void
214 dispatch()
215 {
216 while(head_)
217 {
218 promise_type* p = head_;
219 head_ = p->next;
220 if(!head_)
221 tail_ = nullptr;
222
223 auto h = std::coroutine_handle<promise_type>::from_promise(*p);
224 safe_resume(h);
225 h.destroy();
226 }
227 }
228
229 /** Batch of taken items for thread-safe dispatch. */
230 struct taken_batch
231 {
232 promise_type* head = nullptr;
233 promise_type* tail = nullptr;
234 };
235
236 /** Take all pending items atomically.
237
238 Removes all items from the queue and returns them
239 as a batch. The queue is left empty.
240
241 @return The batch of taken items.
242 */
243 taken_batch
244 19815x take_all() noexcept
245 {
246 19815x taken_batch batch{head_, tail_};
247 19815x head_ = tail_ = nullptr;
248 19815x return batch;
249 }
250
251 /** Dispatch a batch of taken items.
252
253 @param batch The batch to dispatch.
254
255 @note This is thread-safe w.r.t. push() because it doesn't
256 access the queue's free_list_. Frames are deleted directly
257 rather than recycled.
258 */
259 static
260 void
261 19815x dispatch_batch(taken_batch& batch)
262 {
263 50155x while(batch.head)
264 {
265 30340x promise_type* p = batch.head;
266 30340x batch.head = p->next;
267
268 30340x auto h = std::coroutine_handle<promise_type>::from_promise(*p);
269 30340x safe_resume(h);
270 // Don't use h.destroy() - it would call operator delete which
271 // accesses the queue's free_list_ (race with push).
272 // Instead, manually free the frame without recycling.
273 // h.address() returns the frame base (what operator new returned).
274 30340x frame_prefix* prefix = static_cast<frame_prefix*>(h.address()) - 1;
275 30340x ::operator delete(prefix);
276 }
277 19815x batch.tail = nullptr;
278 19815x }
279 };
280
281 //----------------------------------------------------------
282
283 inline
284 void*
285 30340x strand_op::promise_type::operator new(
286 std::size_t size,
287 strand_queue& q,
288 std::coroutine_handle<void>)
289 {
290 // Total size includes prefix
291 30340x std::size_t alloc_size = size + sizeof(frame_prefix);
292 void* raw;
293
294 // Try to reuse from free list
295 30340x if(q.free_list_)
296 {
297 frame_prefix* prefix = q.free_list_;
298 q.free_list_ = prefix->next;
299 raw = prefix;
300 }
301 else
302 {
303 30340x raw = ::operator new(alloc_size);
304 }
305
306 // Initialize prefix
307 30340x frame_prefix* prefix = static_cast<frame_prefix*>(raw);
308 30340x prefix->next = nullptr;
309 30340x prefix->queue = &q;
310 30340x prefix->alloc_size = alloc_size;
311
312 // Return pointer AFTER the prefix (this is where coroutine frame goes)
313 30340x return prefix + 1;
314 }
315
316 inline
317 void
318 strand_op::promise_type::operator delete(void* p, std::size_t)
319 {
320 // Calculate back to get the prefix
321 frame_prefix* prefix = static_cast<frame_prefix*>(p) - 1;
322
323 // Add to free list
324 prefix->next = prefix->queue->free_list_;
325 prefix->queue->free_list_ = prefix;
326 }
327
328 } // namespace detail
329 } // namespace capy
330 } // namespace boost
331
332 #endif
333