3#include <asyncpp/ref.h>
6#include <condition_variable>
16 template<
typename TResult>
20 template<
typename T,
typename TError>
21 struct promise_state : intrusive_refcount<promise_state<T, TError>> {
22 inline constexpr static size_t pending_index = 0;
23 inline constexpr static size_t fulfilled_index = 1;
24 inline constexpr static size_t rejected_index = 2;
27 std::condition_variable m_cv{};
28 std::variant<std::monostate, T, TError> m_value{};
29 std::vector<std::function<void()>> m_on_settle{};
30 std::vector<std::function<void(
const T&)>> m_on_fulfill{};
31 std::vector<std::function<void(
const TError&)>> m_on_reject{};
33 bool try_fulfill(
auto&& value) {
34 std::unique_lock lck{m_mtx};
35 if (m_value.index() != pending_index)
return false;
36 m_value.template emplace<fulfilled_index>(std::forward<
decltype(value)>(value));
38 auto settle_cbs = std::move(m_on_settle);
39 auto fulfill_cbs = std::move(m_on_fulfill);
43 auto& res = std::get<fulfilled_index>(m_value);
45 for (
auto& cbfn : settle_cbs) {
48 }
catch (...) { std::terminate(); }
50 for (
auto& cbfn : fulfill_cbs) {
53 }
catch (...) { std::terminate(); }
58 bool try_reject(
auto&& value) {
59 std::unique_lock lck{m_mtx};
60 if (m_value.index() != pending_index)
return false;
61 m_value.template emplace<rejected_index>(std::forward<
decltype(value)>(value));
63 auto settle_cbs = std::move(m_on_settle);
64 auto reject_cbs = std::move(m_on_reject);
68 auto& error = std::get<rejected_index>(m_value);
70 for (
auto& cbfn : settle_cbs) {
73 }
catch (...) { std::terminate(); }
75 for (
auto& cbfn : reject_cbs) {
78 }
catch (...) { std::terminate(); }
83 void then(std::function<
void(
const T&)> then_cb, std::function<
void(
const TError&)> catch_cb) {
84 std::unique_lock lck{m_mtx};
85 switch (m_value.index()) {
87 if (then_cb) m_on_fulfill.emplace_back(std::move(then_cb));
88 if (catch_cb) m_on_reject.emplace_back(std::move(catch_cb));
92 if (then_cb) then_cb(std::get<fulfilled_index>(m_value));
96 if (catch_cb) catch_cb(std::get<rejected_index>(m_value));
101 void on_settle(std::function<
void()> settle_cb) {
102 if (!settle_cb)
return;
103 std::unique_lock lck{m_mtx};
104 if (m_value.index() == pending_index)
105 m_on_settle.emplace_back(std::move(settle_cb));
114 struct is_promise : std::false_type {};
116 struct is_promise<promise<T>> : std::true_type {};
123 template<
typename TResult>
126 using state = detail::promise_state<TResult, std::exception_ptr>;
130 using result_type = TResult;
138 if (&other !=
this) m_state = other.m_state;
147 std::unique_lock lck{m_state->m_mtx};
148 return m_state->m_value.index() == state::pending_index;
157 std::unique_lock lck{m_state->m_mtx};
158 return m_state->m_value.index() == state::fulfilled_index;
167 std::unique_lock lck{m_state->m_mtx};
168 return m_state->m_value.index() == state::rejected_index;
178 if (!m_state->try_fulfill(std::forward<
decltype(value)>(value)))
179 throw std::logic_error(
"promise is not pending");
188 bool try_fulfill(
auto&& value) {
return m_state->try_fulfill(std::forward<
decltype(value)>(value)); }
197 if (!m_state->try_reject(std::forward<
decltype(error)>(error)))
198 throw std::logic_error(
"promise is not pending");
207 bool try_reject(std::exception_ptr error) {
return m_state->try_reject(std::forward<
decltype(error)>(error)); }
216 template<
typename TException,
typename... Args>
218 reject(std::make_exception_ptr(TException{std::forward<Args>(args)...}));
228 template<
typename TException,
typename... Args>
230 return try_reject(std::make_exception_ptr(TException{std::forward<Args>(args)...}));
237 void then(std::function<
void(
const TResult&)> then_cb,
238 std::function<
void(
const std::exception_ptr&)> catch_cb) {
239 m_state->then(std::move(then_cb), std::move(catch_cb));
246 void on_settle(std::function<
void()> settle_cb) { m_state->on_settle(std::move(settle_cb)); }
252 [[nodiscard]] TResult&
get()
const {
253 state& info = *m_state;
254 std::unique_lock lck{info.m_mtx};
255 while (info.m_value.index() == state::pending_index) {
258 if (info.m_value.index() == state::fulfilled_index)
return std::get<state::fulfilled_index>(info.m_value);
259 std::rethrow_exception(std::get<state::rejected_index>(info.m_value));
266 template<
class Rep,
class Period>
267 [[nodiscard]] TResult*
get(std::chrono::duration<Rep, Period> timeout)
const {
268 auto until = std::chrono::steady_clock::now() + timeout;
269 state& info = *m_state;
270 std::unique_lock lck{info.m_mtx};
271 if (!info.m_cv.wait_until(lck, until, [&info]() { return info.m_value.index() != state::pending_index; }))
273 if (info.m_value.index() == state::fulfilled_index)
return &std::get<state::fulfilled_index>(info.m_value);
274 std::rethrow_exception(std::get<state::rejected_index>(info.m_value));
281 [[nodiscard]] std::pair<TResult*, std::exception_ptr>
try_get(std::nothrow_t)
const noexcept {
282 state& info = *m_state;
283 std::unique_lock lck{info.m_mtx};
284 if (info.m_value.index() == state::pending_index)
return {
nullptr,
nullptr};
285 if (info.m_value.index() == state::fulfilled_index)
286 return {&std::get<state::fulfilled_index>(info.m_value),
nullptr};
287 return {
nullptr, std::get<state::rejected_index>(info.m_value)};
295 auto res =
try_get(std::nothrow);
296 if (res.second) std::rethrow_exception(std::move(res.second));
304 [[nodiscard]]
auto operator co_await()
const noexcept {
306 constexpr explicit awaiter(
ref<state> state) : m_state(std::move(state)) {}
307 [[nodiscard]]
constexpr bool await_ready()
noexcept {
309 std::unique_lock lck{m_state->m_mtx};
310 return m_state->m_value.index() != state::pending_index;
312 [[nodiscard]]
bool await_suspend(coroutine_handle<> hndl)
noexcept {
315 std::unique_lock lck{m_state->m_mtx};
316 if (m_state->m_value.index() == state::pending_index) {
317 m_state->m_on_settle.emplace_back([hndl]()
mutable { hndl.resume(); });
322 [[nodiscard]] TResult& await_resume() {
324 std::unique_lock lck{m_state->m_mtx};
325 assert(m_state->m_value.index() != state::pending_index);
326 if (m_state->m_value.index() == state::fulfilled_index)
327 return std::get<state::fulfilled_index>(m_state->m_value);
328 std::rethrow_exception(std::get<state::rejected_index>(m_state->m_value));
334 assert(this->m_state);
335 return awaiter{m_state};
345 res.
fulfill(std::forward<
decltype(value)>(value));
356 res.
reject(std::move(exception));
366 template<
typename TException,
typename... Args>
369 res.
reject<TException, Args...>(std::forward<Args>(args)...);
381 using result_type = void;
400 void then(std::function<
void()> then_cb, std::function<
void(
const std::exception_ptr&)> catch_cb) {
401 std::function<void(
const std::monostate&)> cbfn{};
402 if (then_cb) cbfn = [cbfn = std::move(then_cb)](
const std::monostate&) { cbfn(); };
407 template<
class Rep,
class Period>
408 [[nodiscard]]
bool get(std::chrono::duration<Rep, Period> timeout)
const {
412 [[nodiscard]] std::pair<bool, std::exception_ptr>
try_get(std::nothrow_t)
const noexcept {
414 if (res.second)
return {
true, res.second};
415 return {res.first !=
nullptr,
nullptr};
417 [[nodiscard]]
bool try_get()
const {
419 if (res.second) std::rethrow_exception(std::move(res.second));
420 return res.first !=
nullptr;
423 [[nodiscard]]
auto operator co_await()
const noexcept {
426 decltype(std::declval<promise<std::monostate>>().
operator co_await()) m_awaiter;
429 constexpr explicit awaiter(
decltype(m_awaiter)&& awaiter)
430 : m_awaiter(std::forward<decltype(awaiter)>(awaiter)) {}
431 [[nodiscard]]
constexpr bool await_ready() noexcept {
return m_awaiter.await_ready(); }
432 [[nodiscard]]
bool await_suspend(coroutine_handle<> hndl)
noexcept {
433 return m_awaiter.await_suspend(hndl);
435 void await_resume() {
static_cast<void>(m_awaiter.await_resume()); }
437 return awaiter{promise<std::monostate>::operator
co_await()};
447 res.
reject(std::move(exception));
450 template<
typename TException,
typename... Args>
453 res.
reject<TException, Args...>(std::forward<Args>(args)...);
469 template<
typename T,
typename... TArgs>
470 inline promise<T> promise_first(promise<TArgs>... args) {
471 promise<T> result_promise;
473 [result_promise](
const typename promise<TArgs>::result_type& res)
mutable {
474 result_promise.try_fulfill(res);
476 [result_promise](
const std::exception_ptr& exception)
mutable { result_promise.try_reject(exception); }),
478 return result_promise;
492 template<
typename T,
typename... TArgs>
493 static promise<T> promise_first_successful(promise<TArgs>... args) {
494 promise<T> result_promise;
495 auto finished = std::make_shared<std::atomic<size_t>>(0);
497 [result_promise, finished](
const typename promise<TArgs>::result_type& res)
mutable {
498 finished->fetch_add(1);
499 result_promise.try_fulfill(res);
501 [result_promise, finished](
const std::exception_ptr& exception)
mutable {
502 if (finished->fetch_add(1) + 1 ==
sizeof...(args)) result_promise.try_reject(exception);
505 return result_promise;
515 template<
typename... TArgs>
516 static promise<void> promise_all(promise<TArgs>... args) {
518 std::atomic<size_t> count{};
519 promise<void> result;
521 auto shared = std::make_shared<state>();
522 (args.on_settle([shared]()
mutable {
523 auto curid = shared->count.fetch_add(1);
524 if (curid + 1 ==
sizeof...(args)) { shared->result.fulfill(); }
527 return shared->result;
promise & operator=(const promise &other)=default
Copy assignment.
promise()=default
Construct a new promise object in its pending state.
promise(const promise &other)=default
Copy constructor.
Promise type that allows waiting for a result in both synchronous and asynchronous code.
Definition promise.h:124
promise(const promise &other)
Copy constructor.
Definition promise.h:135
TResult * get(std::chrono::duration< Rep, Period > timeout) const
Synchronously get the result with a timeout. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:267
void fulfill(auto &&value)
Fulfill the promise with a value.
Definition promise.h:177
static promise make_rejected(Args &&... args)
Get a rejected promise with the specified exception.
Definition promise.h:367
bool try_reject(Args &&... args)
Try to reject the promise with an exception.
Definition promise.h:229
std::pair< TResult *, std::exception_ptr > try_get(std::nothrow_t) const noexcept
Synchronously try to get the result without throwing. If the promise is rejected the rejecting exception is returned as the second element of the pair instead of being thrown.
Definition promise.h:281
void reject(Args &&... args)
Reject the promise with an exception.
Definition promise.h:217
static promise make_fulfilled(TResult &&value)
Get a fulfilled promise with the specified value.
Definition promise.h:343
promise()
Construct a new promise object in its pending state.
Definition promise.h:133
void on_settle(std::function< void()> settle_cb)
Register a callback to be executed once a result is available. If the promise is not pending the call...
Definition promise.h:246
static promise make_rejected(std::exception_ptr exception)
Get a rejected promise with the specified exception.
Definition promise.h:354
bool is_pending() const noexcept
Check if the promise is pending.
Definition promise.h:146
bool is_rejected() const noexcept
Check if the promise is rejected.
Definition promise.h:166
bool try_reject(std::exception_ptr error)
Try to reject the promise with an exception.
Definition promise.h:207
TResult & get() const
Synchronously get the result. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:252
bool try_fulfill(auto &&value)
Try to fulfill the promise with a value.
Definition promise.h:188
void reject(std::exception_ptr error)
Reject the promise with an exception.
Definition promise.h:196
promise & operator=(const promise &other)
Copy assignment.
Definition promise.h:137
TResult * try_get() const
Synchronously try to get the result. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:294
bool is_fulfilled() const noexcept
Check if the promise is fulfilled.
Definition promise.h:156
void then(std::function< void(const TResult &)> then_cb, std::function< void(const std::exception_ptr &)> catch_cb)
Register a callback to be executed once a result is available. If the promise is not pending the callback matching the stored outcome is invoked immediately.
Definition promise.h:237
Reference count handle.
Definition ref.h:126
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...