Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
launch.h
1#pragma once
2#include <asyncpp/detail/promise_allocator_base.h>
3#include <asyncpp/scope_guard.h>
4#include <atomic>
5#include <cassert>
6#include <exception>
7#include <future>
8#include <stdexcept>
9#include <tuple>
10#include <utility>
11
12namespace asyncpp {
13 namespace detail {
20 template<ByteAllocator Allocator = default_allocator_type>
21 struct launch_task {
22 // Promise type of this task
23 struct promise_type : promise_allocator_base<Allocator> {
24 constexpr launch_task get_return_object() noexcept { return {}; }
25 constexpr suspend_never initial_suspend() noexcept { return {}; }
26 constexpr suspend_never final_suspend() noexcept { return {}; }
27 constexpr void return_void() noexcept {}
28 void unhandled_exception() noexcept { std::terminate(); }
29 };
30 };
31 } // namespace detail
32
41 template<typename Awaitable, ByteAllocator Allocator = default_allocator_type>
42 void launch(Awaitable&& awaitable, const Allocator& allocator = {}) {
43 [](std::decay_t<Awaitable> awaitable, const Allocator&) -> detail::launch_task<Allocator> {
44 co_await std::move(awaitable);
45 }(std::forward<decltype(awaitable)>(awaitable), allocator);
46 }
47
52 std::atomic<size_t> m_count{0U};
53 std::atomic<void*> m_continuation{nullptr};
54
55 public:
56 constexpr async_launch_scope() noexcept = default;
58 async_launch_scope& operator=(const async_launch_scope&) = delete;
59 ~async_launch_scope() { assert(m_count.load() == 0); }
60
66 template<typename Awaitable, ByteAllocator Allocator = default_allocator_type>
67 void launch(Awaitable&& awaitable, const Allocator& allocator = {}) {
68 [](async_launch_scope* scope, std::decay_t<Awaitable> awaitable,
69 const Allocator&) -> detail::launch_task<Allocator> {
70 scope->m_count.fetch_add(1);
71 scope_guard guard{[scope]() noexcept {
72 // If this is the last task
73 if (scope->m_count.fetch_sub(1) == 1) {
74 // And we are being awaited
75 auto hdl = scope->m_continuation.exchange(nullptr);
76 // Resume the awaiter
77 if (hdl != nullptr) coroutine_handle<>::from_address(hdl).resume();
78 }
79 }};
80 co_await awaitable;
81 }(this, std::forward<decltype(awaitable)>(awaitable), allocator);
82 }
83
91 template<typename Callable, typename... Args, ByteAllocator Allocator = default_allocator_type>
92 requires(std::is_invocable_v<Callable, Args...>)
93 void invoke_tuple(Callable&& callable, std::tuple<Args...>&& args, const Allocator& allocator = {}) {
94 [](async_launch_scope* scope, Callable callable, std::tuple<Args...>&& args,
95 const Allocator&) -> detail::launch_task<Allocator> {
96 scope->m_count.fetch_add(1);
97 scope_guard guard{[scope]() noexcept {
98 // If this is the last task
99 if (scope->m_count.fetch_sub(1) == 1) {
100 // And we are being awaited
101 auto hdl = scope->m_continuation.exchange(nullptr);
102 // Resume the awaiter
103 if (hdl != nullptr) coroutine_handle<>::from_address(hdl).resume();
104 }
105 }};
106 co_await std::apply(callable, std::move(args));
107 }(this, std::forward<Callable>(callable), std::move(args), allocator);
108 }
109
117 template<typename Callable, typename... Args>
118 requires(std::is_invocable_v<Callable, Args...>)
119 void invoke(Callable&& callable, Args&&... args) {
120 [](async_launch_scope* scope, Callable callable, Args&&... args) -> detail::launch_task<> {
121 scope->m_count.fetch_add(1);
122 scope_guard guard{[scope]() noexcept {
123 // If this is the last task
124 if (scope->m_count.fetch_sub(1) == 1) {
125 // And we are being awaited
126 auto hdl = scope->m_continuation.exchange(nullptr);
127 // Resume the awaiter
128 if (hdl != nullptr) coroutine_handle<>::from_address(hdl).resume();
129 }
130 }};
131 co_await std::invoke(callable, std::forward<Args>(args)...);
132 }(this, std::forward<Callable>(callable), std::forward<Args>(args)...);
133 }
134
139 [[nodiscard]] auto join() noexcept {
140 struct awaiter {
141 async_launch_scope* m_scope;
142 [[nodiscard]] bool await_ready() const noexcept {
143 // Dont wait if theres nothing to await
144 return m_scope->m_count.load() == 0;
145 }
146 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) const {
147 // Set our coroutine if there is noone waiting
148 void* expected = nullptr;
149 if (!m_scope->m_continuation.compare_exchange_strong(expected, hdl.address()))
150 throw std::logic_error("duplicate join");
151 // We might have nothing left to wait on if the last coroutine finished in the meantime
152 return m_scope->m_count.load() != 0;
153 }
154 constexpr void await_resume() const noexcept {}
155 };
156 return awaiter{this};
157 }
158
164 template<ByteAllocator Allocator = default_allocator_type>
165 std::future<void> join_future(const Allocator& allocator = {}) noexcept {
166 std::promise<void> done;
167 auto res = done.get_future();
168 [](async_launch_scope* that, std::promise<void> done, const Allocator&) -> detail::launch_task<Allocator> {
169 try {
170 co_await that->join();
171 done.set_value();
172 } catch (...) { done.set_exception(std::current_exception()); }
173 }(this, std::move(done), allocator);
174 return res;
175 }
176
181 [[nodiscard]] size_t inflight_coroutines() const noexcept { return m_count.load(std::memory_order::relaxed); }
182
186 [[nodiscard]] bool all_done() const noexcept { return inflight_coroutines() == 0; }
187 };
188} // namespace asyncpp
Holder class for spawning child tasks. Allows waiting for all of them to finish.
Definition launch.h:51
void invoke(Callable &&callable, Args &&... args)
Invoke the provided callable in a new task. The callable is copied into the task, making it safe for ...
Definition launch.h:119
void launch(Awaitable &&awaitable, const Allocator &allocator={})
Spawn a new task for the given awaitable.
Definition launch.h:67
size_t inflight_coroutines() const noexcept
Returns the number of active tasks on this scope.
Definition launch.h:181
bool all_done() const noexcept
Returns true if there is no active task currently running on this scope.
Definition launch.h:186
auto join() noexcept
Wait for all active tasks to finish.
Definition launch.h:139
void invoke_tuple(Callable &&callable, std::tuple< Args... > &&args, const Allocator &allocator={})
Invoke the provided callable in a new task. The callable is copied into the task, making it safe for ...
Definition launch.h:93
std::future< void > join_future(const Allocator &allocator={}) noexcept
Create a future that finishes once all tasks are done. This is equivalent to awaiting join() in a pro...
Definition launch.h:165
Execute a callback function when this scope gets destroyed.
Definition scope_guard.h:12