GCC Code Coverage Report


Directory: ./
File: libs/capy/include/boost/capy/when_all.hpp
Date: 2026-01-19 05:31:50
Exec Total Coverage
Lines: 98 100 98.0%
Functions: 298 326 91.4%
Branches: 77 80 96.2%

Line Branch Exec Source
1 //
2 // Copyright (c) 2026 Steve Gerbino
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/cppalliance/capy
8 //
9
10 #ifndef BOOST_CAPY_WHEN_ALL_HPP
11 #define BOOST_CAPY_WHEN_ALL_HPP
12
13 #include <boost/capy/detail/config.hpp>
14 #include <boost/capy/concept/executor.hpp>
15 #include <boost/capy/concept/io_awaitable.hpp>
16 #include <boost/capy/ex/any_coro.hpp>
17 #include <boost/capy/ex/any_executor_ref.hpp>
18 #include <boost/capy/ex/frame_allocator.hpp>
19 #include <boost/capy/task.hpp>
20
21 #include <array>
22 #include <atomic>
23 #include <exception>
24 #include <optional>
25 #include <stop_token>
26 #include <tuple>
27 #include <type_traits>
28 #include <utility>
29
30 namespace boost {
31 namespace capy {
32
33 namespace detail {
34
35 /** Type trait to filter void types from a tuple.
36
37 Void-returning tasks do not contribute a value to the result tuple.
38 This trait computes the filtered result type.
39
40 Example: filter_void_tuple_t<int, void, string> = tuple<int, string>
41 */
42 template<typename T>
43 using wrap_non_void_t = std::conditional_t<std::is_void_v<T>, std::tuple<>, std::tuple<T>>;
44
45 template<typename... Ts>
46 using filter_void_tuple_t = decltype(std::tuple_cat(std::declval<wrap_non_void_t<Ts>>()...));
47
48 /** Holds the result of a single task within when_all.
49 */
50 template<typename T>
51 struct result_holder
52 {
53 std::optional<T> value_;
54
55 90 void set(T v)
56 {
57 90 value_ = std::move(v);
58 90 }
59
60 76 T get() &&
61 {
62 76 return std::move(*value_);
63 }
64 };
65
66 /** Specialization for void tasks - no value storage needed.
67 */
68 template<>
69 struct result_holder<void>
70 {
71 };
72
73 /** Shared state for when_all operation.
74
75 @tparam Ts The result types of the tasks.
76 */
77 template<typename... Ts>
78 struct when_all_state
79 {
80 static constexpr std::size_t task_count = sizeof...(Ts);
81
82 // Completion tracking - when_all waits for all children
83 std::atomic<std::size_t> remaining_count_;
84
85 // Result storage in input order
86 std::tuple<result_holder<Ts>...> results_;
87
88 // Runner handles - destroyed in await_resume while allocator is valid
89 std::array<any_coro, task_count> runner_handles_{};
90
91 // Exception storage - first error wins, others discarded
92 std::atomic<bool> has_exception_{false};
93 std::exception_ptr first_exception_;
94
95 // Stop propagation - on error, request stop for siblings
96 std::stop_source stop_source_;
97
98 // Connects parent's stop_token to our stop_source
99 struct stop_callback_fn
100 {
101 std::stop_source* source_;
102 2 void operator()() const { source_->request_stop(); }
103 };
104 using stop_callback_t = std::stop_callback<stop_callback_fn>;
105 std::optional<stop_callback_t> parent_stop_callback_;
106
107 // Parent resumption
108 any_coro continuation_;
109 any_executor_ref caller_ex_;
110
111 48 when_all_state()
112
1/1
✓ Branch 5 taken 24 times.
48 : remaining_count_(task_count)
113 {
114 48 }
115
116 48 ~when_all_state()
117 {
118
2/2
✓ Branch 0 taken 61 times.
✓ Branch 1 taken 24 times.
170 for(auto h : runner_handles_)
119
1/2
✓ Branch 1 taken 61 times.
✗ Branch 2 not taken.
122 if(h)
120 122 h.destroy();
121 48 }
122
123 /** Capture an exception (first one wins).
124 */
125 22 void capture_exception(std::exception_ptr ep)
126 {
127 22 bool expected = false;
128
2/2
✓ Branch 1 taken 8 times.
✓ Branch 2 taken 3 times.
22 if(has_exception_.compare_exchange_strong(
129 expected, true, std::memory_order_relaxed))
130 16 first_exception_ = ep;
131 22 }
132
133 /** Signal that a task has completed.
134
135 The last child to complete triggers resumption of the parent.
136 */
137 122 any_coro signal_completion()
138 {
139 122 auto remaining = remaining_count_.fetch_sub(1, std::memory_order_acq_rel);
140
2/2
✓ Branch 0 taken 24 times.
✓ Branch 1 taken 37 times.
122 if(remaining == 1)
141 48 return caller_ex_.dispatch(continuation_);
142 74 return std::noop_coroutine();
143 }
144
145 };
146
147 /** Wrapper coroutine that intercepts task completion.
148
149 This runner awaits its assigned task and stores the result in
150 the shared state, or captures the exception and requests stop.
151 */
152 template<typename T, typename... Ts>
153 struct when_all_runner
154 {
155 struct promise_type : frame_allocating_base
156 {
157 when_all_state<Ts...>* state_ = nullptr;
158 any_executor_ref ex_;
159 std::stop_token stop_token_;
160
161 122 when_all_runner get_return_object()
162 {
163 122 return when_all_runner(std::coroutine_handle<promise_type>::from_promise(*this));
164 }
165
166 122 std::suspend_always initial_suspend() noexcept
167 {
168 122 return {};
169 }
170
171 122 auto final_suspend() noexcept
172 {
173 struct awaiter
174 {
175 promise_type* p_;
176
177 61 bool await_ready() const noexcept
178 {
179 61 return false;
180 }
181
182 61 any_coro await_suspend(any_coro) noexcept
183 {
184 // Signal completion; last task resumes parent
185 61 return p_->state_->signal_completion();
186 }
187
188 void await_resume() const noexcept
189 {
190 }
191 };
192 122 return awaiter{this};
193 }
194
195 100 void return_void()
196 {
197 100 }
198
199 22 void unhandled_exception()
200 {
201 22 state_->capture_exception(std::current_exception());
202 // Request stop for sibling tasks
203 22 state_->stop_source_.request_stop();
204 22 }
205
206 template<class Awaitable>
207 struct transform_awaiter
208 {
209 std::decay_t<Awaitable> a_;
210 promise_type* p_;
211
212 122 bool await_ready()
213 {
214 122 return a_.await_ready();
215 }
216
217 122 auto await_resume()
218 {
219 122 return a_.await_resume();
220 }
221
222 template<class Promise>
223 122 auto await_suspend(std::coroutine_handle<Promise> h)
224 {
225
1/1
✓ Branch 3 taken 55 times.
122 return a_.await_suspend(h, p_->ex_, p_->stop_token_);
226 }
227 };
228
229 template<class Awaitable>
230 122 auto await_transform(Awaitable&& a)
231 {
232 using A = std::decay_t<Awaitable>;
233 if constexpr (IoAwaitable<A, any_executor_ref>)
234 {
235 return transform_awaiter<Awaitable>{
236 244 std::forward<Awaitable>(a), this};
237 }
238 else
239 {
240 return make_affine(std::forward<Awaitable>(a), ex_);
241 }
242 122 }
243 };
244
245 std::coroutine_handle<promise_type> h_;
246
247 122 explicit when_all_runner(std::coroutine_handle<promise_type> h)
248 122 : h_(h)
249 {
250 122 }
251
252 #if defined(__clang__) && __clang_major__ == 14 && !defined(__apple_build_version__)
253 // Clang 14 has a bug where it calls the move constructor for coroutine
254 // return objects even though they should be constructed in-place via RVO.
255 // This happens when returning a non-movable type from a coroutine.
256 when_all_runner(when_all_runner&& other) noexcept : h_(std::exchange(other.h_, nullptr)) {}
257 #endif
258
259 // Non-copyable, non-movable - release() is always called immediately
260 when_all_runner(when_all_runner const&) = delete;
261 when_all_runner& operator=(when_all_runner const&) = delete;
262
263 #if !defined(__clang__) || __clang_major__ != 14 || defined(__apple_build_version__)
264 when_all_runner(when_all_runner&&) = delete;
265 #endif
266
267 when_all_runner& operator=(when_all_runner&&) = delete;
268
269 122 auto release() noexcept
270 {
271 122 return std::exchange(h_, nullptr);
272 }
273 };
274
275 /** Create a runner coroutine for a single task.
276
277 Task is passed directly to ensure proper coroutine frame storage.
278 */
279 template<std::size_t Index, typename T, typename... Ts>
280 when_all_runner<T, Ts...>
281
1/1
✓ Branch 1 taken 61 times.
122 make_when_all_runner(task<T> inner, when_all_state<Ts...>* state)
282 {
283 if constexpr (std::is_void_v<T>)
284 co_await std::move(inner);
285 else
286 std::get<Index>(state->results_).set(co_await std::move(inner));
287 244 }
288
289 /** Internal awaitable that launches all runner coroutines and waits.
290
291 This awaitable is used inside the when_all coroutine to handle
292 the concurrent execution of child tasks.
293 */
294 template<typename... Ts>
295 class when_all_launcher
296 {
297 std::tuple<task<Ts>...>* tasks_;
298 when_all_state<Ts...>* state_;
299
300 public:
301 48 when_all_launcher(
302 std::tuple<task<Ts>...>* tasks,
303 when_all_state<Ts...>* state)
304 48 : tasks_(tasks)
305 48 , state_(state)
306 {
307 48 }
308
309 48 bool await_ready() const noexcept
310 {
311 48 return sizeof...(Ts) == 0;
312 }
313
314 template<typename Ex>
315 48 any_coro await_suspend(any_coro continuation, Ex const& caller_ex, std::stop_token parent_token = {})
316 {
317 48 state_->continuation_ = continuation;
318 48 state_->caller_ex_ = caller_ex;
319
320 // Forward parent's stop requests to children
321
2/2
✓ Branch 1 taken 4 times.
✓ Branch 2 taken 20 times.
48 if(parent_token.stop_possible())
322 {
323 16 state_->parent_stop_callback_.emplace(
324 parent_token,
325 8 typename when_all_state<Ts...>::stop_callback_fn{&state_->stop_source_});
326
327
2/2
✓ Branch 1 taken 1 times.
✓ Branch 2 taken 3 times.
8 if(parent_token.stop_requested())
328 2 state_->stop_source_.request_stop();
329 }
330
331 // Launch all tasks concurrently
332 48 auto token = state_->stop_source_.get_token();
333 48 [&]<std::size_t... Is>(std::index_sequence<Is...>) {
334
28/28
✓ Branch 2 taken 1 times.
✓ Branch 6 taken 1 times.
✓ Branch 10 taken 1 times.
✓ Branch 20 taken 1 times.
✓ Branch 24 taken 1 times.
✓ Branch 28 taken 1 times.
✓ Branch 38 taken 1 times.
✓ Branch 42 taken 1 times.
✓ Branch 46 taken 1 times.
✓ Branch 50 taken 1 times.
✓ Branch 54 taken 1 times.
✓ Branch 58 taken 1 times.
✓ Branch 62 taken 1 times.
✓ Branch 66 taken 1 times.
✓ Branch 86 taken 1 times.
✓ Branch 90 taken 1 times.
✓ Branch 94 taken 1 times.
✓ Branch 104 taken 1 times.
✓ Branch 108 taken 1 times.
✓ Branch 116 taken 1 times.
✓ Branch 122 taken 1 times.
✓ Branch 126 taken 1 times.
✓ Branch 130 taken 1 times.
✓ Branch 140 taken 4 times.
✓ Branch 144 taken 4 times.
✓ Branch 148 taken 4 times.
✓ Branch 158 taken 13 times.
✓ Branch 162 taken 13 times.
24 (..., launch_one<Is>(caller_ex, token));
335
1/1
✓ Branch 1 taken 24 times.
48 }(std::index_sequence_for<Ts...>{});
336
337 // Let signal_completion() handle resumption
338 96 return std::noop_coroutine();
339 48 }
340
341 48 void await_resume() const noexcept
342 {
343 // Results are extracted by the when_all coroutine from state
344 48 }
345
346 private:
347 template<std::size_t I, typename Ex>
348 122 void launch_one(Ex const& caller_ex, std::stop_token token)
349 {
350
1/1
✓ Branch 2 taken 61 times.
122 auto runner = make_when_all_runner<I>(
351 122 std::move(std::get<I>(*tasks_)), state_);
352
353 122 auto h = runner.release();
354 122 h.promise().state_ = state_;
355 122 h.promise().ex_ = caller_ex;
356 122 h.promise().stop_token_ = token;
357
358 122 any_coro ch{h};
359 122 state_->runner_handles_[I] = ch;
360
2/2
✓ Branch 1 taken 61 times.
✓ Branch 4 taken 61 times.
122 caller_ex.dispatch(ch).resume();
361 122 }
362 };
363
364 /** Compute the result type for when_all.
365
366 Returns void when all tasks are void (P2300 aligned),
367 otherwise returns a tuple with void types filtered out.
368 */
369 template<typename... Ts>
370 using when_all_result_t = std::conditional_t<
371 std::is_same_v<filter_void_tuple_t<Ts...>, std::tuple<>>,
372 void,
373 filter_void_tuple_t<Ts...>>;
374
375 /** Helper to extract a single result, returning empty tuple for void.
376 This is a separate function to work around a GCC-11 ICE that occurs
377 when using nested immediately-invoked lambdas with pack expansion.
378 */
379 template<std::size_t I, typename... Ts>
380 80 auto extract_single_result(when_all_state<Ts...>& state)
381 {
382 using T = std::tuple_element_t<I, std::tuple<Ts...>>;
383 if constexpr (std::is_void_v<T>)
384 4 return std::tuple<>();
385 else
386
1/1
✓ Branch 4 taken 38 times.
76 return std::make_tuple(std::move(std::get<I>(state.results_)).get());
387 }
388
389 /** Extract results from state, filtering void types.
390 */
391 template<typename... Ts>
392 30 auto extract_results(when_all_state<Ts...>& state)
393 {
394 45 return [&]<std::size_t... Is>(std::index_sequence<Is...>) {
395
28/30
✓ Branch 1 taken 1 times.
✓ Branch 5 taken 1 times.
✓ Branch 8 taken 1 times.
✓ Branch 11 taken 1 times.
✓ Branch 14 taken 1 times.
✓ Branch 17 taken 1 times.
✓ Branch 20 taken 1 times.
✓ Branch 29 taken 1 times.
✓ Branch 32 taken 1 times.
✓ Branch 35 taken 1 times.
✓ Branch 38 taken 1 times.
✓ Branch 41 taken 1 times.
✓ Branch 44 taken 1 times.
✓ Branch 47 taken 1 times.
✓ Branch 50 taken 1 times.
✓ Branch 53 taken 1 times.
✗ Branch 57 not taken.
✗ Branch 60 not taken.
✓ Branch 63 taken 1 times.
✓ Branch 66 taken 1 times.
✓ Branch 70 taken 1 times.
✓ Branch 73 taken 1 times.
✓ Branch 76 taken 1 times.
✓ Branch 81 taken 2 times.
✓ Branch 84 taken 2 times.
✓ Branch 87 taken 2 times.
✓ Branch 90 taken 2 times.
✓ Branch 93 taken 8 times.
✓ Branch 96 taken 8 times.
✓ Branch 99 taken 8 times.
15 return std::tuple_cat(extract_single_result<Is>(state)...);
396
1/1
✓ Branch 1 taken 15 times.
60 }(std::index_sequence_for<Ts...>{});
397 }
398
399 } // namespace detail
400
401 /** Wait for all tasks to complete concurrently.
402
403 @par Example
404 @code
405 task<void> example() {
406 auto [a, b] = co_await when_all(
407 fetch_int(), // task<int>
408 fetch_string() // task<std::string>
409 );
410 }
411 @endcode
412
413 @param tasks The tasks to execute concurrently.
414 @return A task yielding a tuple of results (void types filtered out).
415
416 Key features:
417 @li All child tasks are launched concurrently
418 @li Results are collected in input order
419 @li First error is captured; subsequent errors are discarded
420 @li On error, stop is requested for all siblings
421 @li Completes only after all children have completed
422 @li Void tasks do not contribute to the result tuple
423 @li Properly propagates frame allocators to all child coroutines
424 */
425 template<typename... Ts>
426 [[nodiscard]] task<detail::when_all_result_t<Ts...>>
427
1/1
✓ Branch 1 taken 24 times.
48 when_all(task<Ts>... tasks)
428 {
429 using result_type = detail::when_all_result_t<Ts...>;
430
431 // State is stored in the coroutine frame, using the frame allocator
432 detail::when_all_state<Ts...> state;
433
434 // Store tasks in the frame
435 std::tuple<task<Ts>...> task_tuple(std::move(tasks)...);
436
437 // Launch all tasks and wait for completion
438 co_await detail::when_all_launcher<Ts...>(&task_tuple, &state);
439
440 // Propagate first exception if any.
441 // Safe without explicit acquire: capture_exception() is sequenced-before
442 // signal_completion()'s acq_rel fetch_sub, which synchronizes-with the
443 // last task's decrement that resumes this coroutine.
444 if(state.first_exception_)
445 std::rethrow_exception(state.first_exception_);
446
447 // Extract and return results
448 if constexpr (std::is_void_v<result_type>)
449 co_return;
450 else
451 co_return detail::extract_results(state);
452 96 }
453
454 // For backwards compatibility and type queries, expose result type computation
455 template<typename... Ts>
456 using when_all_result_type = detail::when_all_result_t<Ts...>;
457
458 } // namespace capy
459 } // namespace boost
460
461 #endif
462