Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
promise.h
1#pragma once
3#include <asyncpp/ref.h>
4
5#include <cassert>
6#include <condition_variable>
7#include <cstddef>
8#include <functional>
9#include <memory>
10#include <mutex>
11#include <variant>
12#include <vector>
13
14namespace asyncpp {
15
16 template<typename TResult>
17 class promise;
18
19 namespace detail {
20 template<typename T, typename TError>
21 struct promise_state : intrusive_refcount<promise_state<T, TError>> {
22 inline constexpr static size_t pending_index = 0;
23 inline constexpr static size_t fulfilled_index = 1;
24 inline constexpr static size_t rejected_index = 2;
25
26 std::mutex m_mtx{};
27 std::condition_variable m_cv{};
28 std::variant<std::monostate, T, TError> m_value{};
29 std::vector<std::function<void()>> m_on_settle{};
30 std::vector<std::function<void(const T&)>> m_on_fulfill{};
31 std::vector<std::function<void(const TError&)>> m_on_reject{};
32
33 bool try_fulfill(auto&& value) {
34 std::unique_lock lck{m_mtx};
35 if (m_value.index() != pending_index) return false;
36 m_value.template emplace<fulfilled_index>(std::forward<decltype(value)>(value));
37 m_cv.notify_all();
38 auto settle_cbs = std::move(m_on_settle);
39 auto fulfill_cbs = std::move(m_on_fulfill);
40 m_on_settle.clear();
41 m_on_fulfill.clear();
42 m_on_reject.clear();
43 auto& res = std::get<fulfilled_index>(m_value);
44 lck.unlock();
45 for (auto& cbfn : settle_cbs) {
46 try {
47 cbfn();
48 } catch (...) { std::terminate(); }
49 }
50 for (auto& cbfn : fulfill_cbs) {
51 try {
52 cbfn(res);
53 } catch (...) { std::terminate(); }
54 }
55 return true;
56 }
57
58 bool try_reject(auto&& value) {
59 std::unique_lock lck{m_mtx};
60 if (m_value.index() != pending_index) return false;
61 m_value.template emplace<rejected_index>(std::forward<decltype(value)>(value));
62 m_cv.notify_all();
63 auto settle_cbs = std::move(m_on_settle);
64 auto reject_cbs = std::move(m_on_reject);
65 m_on_settle.clear();
66 m_on_fulfill.clear();
67 m_on_reject.clear();
68 auto& error = std::get<rejected_index>(m_value);
69 lck.unlock();
70 for (auto& cbfn : settle_cbs) {
71 try {
72 cbfn();
73 } catch (...) { std::terminate(); }
74 }
75 for (auto& cbfn : reject_cbs) {
76 try {
77 cbfn(error);
78 } catch (...) { std::terminate(); }
79 }
80 return true;
81 }
82
83 void then(std::function<void(const T&)> then_cb, std::function<void(const TError&)> catch_cb) {
84 std::unique_lock lck{m_mtx};
85 switch (m_value.index()) {
86 case pending_index:
87 if (then_cb) m_on_fulfill.emplace_back(std::move(then_cb));
88 if (catch_cb) m_on_reject.emplace_back(std::move(catch_cb));
89 break;
90 case fulfilled_index:
91 lck.unlock();
92 if (then_cb) then_cb(std::get<fulfilled_index>(m_value));
93 break;
94 case rejected_index:
95 lck.unlock();
96 if (catch_cb) catch_cb(std::get<rejected_index>(m_value));
97 break;
98 }
99 }
100
101 void on_settle(std::function<void()> settle_cb) {
102 if (!settle_cb) return;
103 std::unique_lock lck{m_mtx};
104 if (m_value.index() == pending_index)
105 m_on_settle.emplace_back(std::move(settle_cb));
106 else {
107 lck.unlock();
108 settle_cb();
109 }
110 }
111 };
112
113 template<typename T>
114 struct is_promise : std::false_type {};
115 template<typename T>
116 struct is_promise<promise<T>> : std::true_type {};
117 } // namespace detail
118
123 template<typename TResult>
124 class promise {
125 protected:
126 using state = detail::promise_state<TResult, std::exception_ptr>;
127 ref<state> m_state{};
128
129 public:
130 using result_type = TResult;
131
133 promise() : m_state{make_ref<state>()} {}
135 promise(const promise& other) : m_state{other.m_state} {}
137 promise& operator=(const promise& other) {
138 if (&other != this) m_state = other.m_state;
139 return *this;
140 }
141
146 bool is_pending() const noexcept {
147 std::unique_lock lck{m_state->m_mtx};
148 return m_state->m_value.index() == state::pending_index;
149 }
150
156 bool is_fulfilled() const noexcept {
157 std::unique_lock lck{m_state->m_mtx};
158 return m_state->m_value.index() == state::fulfilled_index;
159 }
160
166 bool is_rejected() const noexcept {
167 std::unique_lock lck{m_state->m_mtx};
168 return m_state->m_value.index() == state::rejected_index;
169 }
170
177 void fulfill(auto&& value) {
178 if (!m_state->try_fulfill(std::forward<decltype(value)>(value)))
179 throw std::logic_error("promise is not pending");
180 }
181
188 bool try_fulfill(auto&& value) { return m_state->try_fulfill(std::forward<decltype(value)>(value)); }
189
196 void reject(std::exception_ptr error) {
197 if (!m_state->try_reject(std::forward<decltype(error)>(error)))
198 throw std::logic_error("promise is not pending");
199 }
200
207 bool try_reject(std::exception_ptr error) { return m_state->try_reject(std::forward<decltype(error)>(error)); }
208
216 template<typename TException, typename... Args>
217 void reject(Args&&... args) {
218 reject(std::make_exception_ptr(TException{std::forward<Args>(args)...}));
219 }
220
228 template<typename TException, typename... Args>
229 bool try_reject(Args&&... args) {
230 return try_reject(std::make_exception_ptr(TException{std::forward<Args>(args)...}));
231 }
232
237 void then(std::function<void(const TResult&)> then_cb,
238 std::function<void(const std::exception_ptr&)> catch_cb) {
239 m_state->then(std::move(then_cb), std::move(catch_cb));
240 }
241
246 void on_settle(std::function<void()> settle_cb) { m_state->on_settle(std::move(settle_cb)); }
247
252 [[nodiscard]] TResult& get() const {
253 state& info = *m_state;
254 std::unique_lock lck{info.m_mtx};
255 while (info.m_value.index() == state::pending_index) {
256 info.m_cv.wait(lck);
257 }
258 if (info.m_value.index() == state::fulfilled_index) return std::get<state::fulfilled_index>(info.m_value);
259 std::rethrow_exception(std::get<state::rejected_index>(info.m_value));
260 }
261
266 template<class Rep, class Period>
267 [[nodiscard]] TResult* get(std::chrono::duration<Rep, Period> timeout) const {
268 auto until = std::chrono::steady_clock::now() + timeout;
269 state& info = *m_state;
270 std::unique_lock lck{info.m_mtx};
271 if (!info.m_cv.wait_until(lck, until, [&info]() { return info.m_value.index() != state::pending_index; }))
272 return nullptr;
273 if (info.m_value.index() == state::fulfilled_index) return &std::get<state::fulfilled_index>(info.m_value);
274 std::rethrow_exception(std::get<state::rejected_index>(info.m_value));
275 }
276
281 [[nodiscard]] std::pair<TResult*, std::exception_ptr> try_get(std::nothrow_t) const noexcept {
282 state& info = *m_state;
283 std::unique_lock lck{info.m_mtx};
284 if (info.m_value.index() == state::pending_index) return {nullptr, nullptr};
285 if (info.m_value.index() == state::fulfilled_index)
286 return {&std::get<state::fulfilled_index>(info.m_value), nullptr};
287 return {nullptr, std::get<state::rejected_index>(info.m_value)};
288 }
289
294 [[nodiscard]] TResult* try_get() const {
295 auto res = try_get(std::nothrow);
296 if (res.second) std::rethrow_exception(std::move(res.second));
297 return res.first;
298 }
299
304 [[nodiscard]] auto operator co_await() const noexcept {
305 struct awaiter {
306 constexpr explicit awaiter(ref<state> state) : m_state(std::move(state)) {}
307 [[nodiscard]] constexpr bool await_ready() noexcept {
308 assert(m_state);
309 std::unique_lock lck{m_state->m_mtx};
310 return m_state->m_value.index() != state::pending_index;
311 }
312 [[nodiscard]] bool await_suspend(coroutine_handle<> hndl) noexcept {
313 assert(m_state);
314 assert(hndl);
315 std::unique_lock lck{m_state->m_mtx};
316 if (m_state->m_value.index() == state::pending_index) {
317 m_state->m_on_settle.emplace_back([hndl]() mutable { hndl.resume(); });
318 return true;
319 }
320 return false;
321 }
322 [[nodiscard]] TResult& await_resume() {
323 assert(m_state);
324 std::unique_lock lck{m_state->m_mtx};
325 assert(m_state->m_value.index() != state::pending_index);
326 if (m_state->m_value.index() == state::fulfilled_index)
327 return std::get<state::fulfilled_index>(m_state->m_value);
328 std::rethrow_exception(std::get<state::rejected_index>(m_state->m_value));
329 }
330
331 private:
332 ref<state> m_state;
333 };
334 assert(this->m_state);
335 return awaiter{m_state};
336 }
337
343 [[nodiscard]] static promise make_fulfilled(TResult&& value) {
344 promise res;
345 res.fulfill(std::forward<decltype(value)>(value));
346 return res;
347 }
348
354 [[nodiscard]] static promise make_rejected(std::exception_ptr exception) {
355 promise res;
356 res.reject(std::move(exception));
357 return res;
358 }
359
366 template<typename TException, typename... Args>
367 [[nodiscard]] static promise make_rejected(Args&&... args) {
368 promise res;
369 res.reject<TException, Args...>(std::forward<Args>(args)...);
370 return res;
371 }
372 };
373
378 template<>
379 class promise<void> : private promise<std::monostate> {
380 public:
381 using result_type = void;
382
384 promise() = default;
386 promise(const promise& other) = default;
388 promise& operator=(const promise& other) = default;
389
390 using promise<std::monostate>::is_pending;
391 using promise<std::monostate>::is_fulfilled;
392 using promise<std::monostate>::is_rejected;
393 using promise<std::monostate>::reject;
394 using promise<std::monostate>::try_reject;
395 using promise<std::monostate>::on_settle;
396
397 void fulfill() { promise<std::monostate>::fulfill(std::monostate{}); }
398 [[nodiscard]] bool try_fulfill() { return promise<std::monostate>::try_fulfill(std::monostate{}); }
399
400 void then(std::function<void()> then_cb, std::function<void(const std::exception_ptr&)> catch_cb) {
401 std::function<void(const std::monostate&)> cbfn{};
402 if (then_cb) cbfn = [cbfn = std::move(then_cb)](const std::monostate&) { cbfn(); };
403 promise<std::monostate>::then(cbfn, std::move(catch_cb));
404 }
405
406 void get() const { static_cast<void>(promise<std::monostate>::get()); }
407 template<class Rep, class Period>
408 [[nodiscard]] bool get(std::chrono::duration<Rep, Period> timeout) const {
409 return promise<std::monostate>::get(timeout) != nullptr;
410 }
411
412 [[nodiscard]] std::pair<bool, std::exception_ptr> try_get(std::nothrow_t) const noexcept {
413 auto res = promise<std::monostate>::try_get(std::nothrow);
414 if (res.second) return {true, res.second};
415 return {res.first != nullptr, nullptr};
416 }
417 [[nodiscard]] bool try_get() const {
418 auto res = promise<std::monostate>::try_get(std::nothrow);
419 if (res.second) std::rethrow_exception(std::move(res.second));
420 return res.first != nullptr;
421 }
422
423 [[nodiscard]] auto operator co_await() const noexcept {
424 struct awaiter {
425 private:
426 decltype(std::declval<promise<std::monostate>>().operator co_await()) m_awaiter;
427
428 public:
429 constexpr explicit awaiter(decltype(m_awaiter)&& awaiter)
430 : m_awaiter(std::forward<decltype(awaiter)>(awaiter)) {}
431 [[nodiscard]] constexpr bool await_ready() noexcept { return m_awaiter.await_ready(); }
432 [[nodiscard]] bool await_suspend(coroutine_handle<> hndl) noexcept {
433 return m_awaiter.await_suspend(hndl);
434 }
435 void await_resume() { static_cast<void>(m_awaiter.await_resume()); }
436 };
437 return awaiter{promise<std::monostate>::operator co_await()};
438 }
439
440 [[nodiscard]] static promise make_fulfilled() {
441 promise res;
442 res.fulfill();
443 return res;
444 }
445 [[nodiscard]] static promise make_rejected(std::exception_ptr exception) {
446 promise res;
447 res.reject(std::move(exception));
448 return res;
449 }
450 template<typename TException, typename... Args>
451 [[nodiscard]] static promise make_rejected(Args&&... args) {
452 promise res;
453 res.reject<TException, Args...>(std::forward<Args>(args)...);
454 return res;
455 }
456 };
457
469 template<typename T, typename... TArgs>
470 inline promise<T> promise_first(promise<TArgs>... args) {
471 promise<T> result_promise;
472 (args.then(
473 [result_promise](const typename promise<TArgs>::result_type& res) mutable {
474 result_promise.try_fulfill(res);
475 },
476 [result_promise](const std::exception_ptr& exception) mutable { result_promise.try_reject(exception); }),
477 ...);
478 return result_promise;
479 }
480
492 template<typename T, typename... TArgs>
493 static promise<T> promise_first_successful(promise<TArgs>... args) {
494 promise<T> result_promise;
495 auto finished = std::make_shared<std::atomic<size_t>>(0);
496 (args.then(
497 [result_promise, finished](const typename promise<TArgs>::result_type& res) mutable {
498 finished->fetch_add(1);
499 result_promise.try_fulfill(res);
500 },
501 [result_promise, finished](const std::exception_ptr& exception) mutable {
502 if (finished->fetch_add(1) + 1 == sizeof...(args)) result_promise.try_reject(exception);
503 }),
504 ...);
505 return result_promise;
506 }
507
515 template<typename... TArgs>
516 static promise<void> promise_all(promise<TArgs>... args) {
517 struct state {
518 std::atomic<size_t> count{};
519 promise<void> result;
520 };
521 auto shared = std::make_shared<state>();
522 (args.on_settle([shared]() mutable {
523 auto curid = shared->count.fetch_add(1);
524 if (curid + 1 == sizeof...(args)) { shared->result.fulfill(); }
525 }),
526 ...);
527 return shared->result;
528 }
529} // namespace asyncpp
promise & operator=(const promise &other)=default
Copy assignment.
promise()=default
Construct a new promise object in its pending state.
promise(const promise &other)=default
Copy constructor.
Promise type that allows waiting for a result in both synchronous and asynchronous code.
Definition promise.h:124
promise(const promise &other)
Copy constructor.
Definition promise.h:135
TResult * get(std::chrono::duration< Rep, Period > timeout) const
Synchronously get the result with a timeout. If the promise is rejected the rejecting exception gets ...
Definition promise.h:267
void fulfill(auto &&value)
Fulfill the promise with a value.
Definition promise.h:177
static promise make_rejected(Args &&... args)
Get a rejected promise with the specified exception.
Definition promise.h:367
bool try_reject(Args &&... args)
Try to reject the promise with an exception.
Definition promise.h:229
std::pair< TResult *, std::exception_ptr > try_get(std::nothrow_t) const noexcept
Synchronously try get the result. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:281
void reject(Args &&... args)
Reject the promise with an exception.
Definition promise.h:217
static promise make_fulfilled(TResult &&value)
Get a fufilled promise with the specified value.
Definition promise.h:343
promise()
Construct a new promise object in its pending state.
Definition promise.h:133
void on_settle(std::function< void()> settle_cb)
Register a callback to be executed once a result is available. If the promise is not pending the call...
Definition promise.h:246
static promise make_rejected(std::exception_ptr exception)
Get a rejected promise with the specified exception.
Definition promise.h:354
bool is_pending() const noexcept
Check if the promise is pending.
Definition promise.h:146
bool is_rejected() const noexcept
Check if the promise is rejected.
Definition promise.h:166
bool try_reject(std::exception_ptr error)
Try to reject the promise with an exception.
Definition promise.h:207
TResult & get() const
Synchronously get the result. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:252
bool try_fulfill(auto &&value)
Try to fulfill the promise with a value.
Definition promise.h:188
void reject(std::exception_ptr error)
Reject the promise with an exception.
Definition promise.h:196
promise & operator=(const promise &other)
Copy assignment.
Definition promise.h:137
TResult * try_get() const
Synchronously try get the result. If the promise is rejected the rejecting exception gets thrown.
Definition promise.h:294
bool is_fulfilled() const noexcept
Check if the promise is fulfilled.
Definition promise.h:156
void then(std::function< void(const TResult &)> then_cb, std::function< void(const std::exception_ptr &)> catch_cb)
Register a callback to be executed once a result is available. If the promise is not pending the call...
Definition promise.h:237
Reference count handle.
Definition ref.h:126
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...