Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
event.h
1#pragma once
3#include <asyncpp/dispatcher.h>
4
5#include <atomic>
6#include <cassert>
7
8namespace asyncpp {
32 public:
37 explicit constexpr single_consumer_event(bool set_initially = false) noexcept
38 : m_state(set_initially ? this : nullptr) {}
39#ifndef NDEBUG
40 ~single_consumer_event() noexcept { assert(!is_awaited()); }
41#endif
46 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
47
52 [[nodiscard]] bool is_awaited() const noexcept {
53 auto val = m_state.load(std::memory_order::acquire);
54 return val != nullptr && val != this;
55 }
56
63 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
64 auto state = m_state.exchange(this, std::memory_order::acq_rel);
65 if (state != nullptr && state != this) {
66 auto await = static_cast<awaiter*>(state);
67 assert(await->m_parent == this);
68 assert(await->m_handle);
69 if (await->m_dispatcher != nullptr) {
70 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
71 } else if (resume_dispatcher != nullptr) {
72 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
73 } else {
74 await->m_handle.resume();
75 }
76 return true;
77 }
78 return false;
79 }
80
84 void reset() noexcept {
85 void* old_state = this;
86 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
87 }
88
97 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
98
109 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
110 return awaiter{this, resume_dispatcher};
111 }
112
113 private:
114 /* nullptr => unset
115 * this => set
116 * x => awaiter*
117 */
118 std::atomic<void*> m_state;
119
120 struct [[nodiscard]] awaiter {
121 explicit constexpr awaiter(single_consumer_event* parent, dispatcher* dispatcher) noexcept
122 : m_parent(parent), m_dispatcher(dispatcher) {}
123 [[nodiscard]] bool await_ready() const noexcept { return m_parent->is_set(); }
124 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
125 m_handle = hdl;
126 void* old_state = nullptr;
127 // If the current state is unset set it to this
128 bool was_equal = m_parent->m_state.compare_exchange_strong(old_state, this, std::memory_order::release,
129 std::memory_order::acquire);
130 // If the state was not unset it has to be set,
131 // otherwise we have a concurrent await, which is not supported
132 assert(was_equal || old_state == m_parent);
133 return was_equal;
134 }
135 constexpr void await_resume() const noexcept {}
136
137 single_consumer_event* m_parent;
138 dispatcher* m_dispatcher;
139 coroutine_handle<> m_handle{};
140 };
141 };
142
168 public:
173 explicit constexpr single_consumer_auto_reset_event(bool set_initially = false) noexcept
174 : m_state(set_initially ? this : nullptr) {}
175#ifndef NDEBUG
176 ~single_consumer_auto_reset_event() noexcept { assert(!is_awaited()); }
177#endif
182 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
183
188 [[nodiscard]] bool is_awaited() const noexcept {
189 auto ptr = m_state.load(std::memory_order::acquire);
190 return ptr != nullptr && ptr != this;
191 }
192
199 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
200 auto state = m_state.exchange(this, std::memory_order::release);
201 if (state != nullptr && state != this) {
202 auto await = static_cast<awaiter*>(state);
203
204 // Only modify the state if it has not been changed in between
205 state = this;
206 m_state.compare_exchange_strong(state, nullptr, std::memory_order::acq_rel);
207
208 assert(await->m_parent == this);
209 assert(await->m_handle);
210 if (await->m_dispatcher != nullptr) {
211 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
212 } else if (resume_dispatcher != nullptr) {
213 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
214 } else {
215 await->m_handle.resume();
216 }
217 return true;
218 }
219 return false;
220 }
221
225 void reset() noexcept {
226 void* old_state = this;
227 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
228 }
229
238 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
239
250 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
251 return awaiter{this, resume_dispatcher};
252 }
253
254 private:
255 /* nullptr => unset
256 * this => set
257 * x => awaiter*
258 */
259 std::atomic<void*> m_state;
260
261 struct [[nodiscard]] awaiter {
262 explicit constexpr awaiter(single_consumer_auto_reset_event* parent, dispatcher* dispatcher) noexcept
263 : m_parent(parent), m_dispatcher(dispatcher) {}
264 [[nodiscard]] constexpr bool await_ready() const noexcept { return false; }
265 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
266 m_handle = hdl;
267 void* old_state = nullptr;
268 if (!m_parent->m_state.compare_exchange_strong(old_state, this, std::memory_order::release,
269 std::memory_order::relaxed)) {
270 // No duplicate awaiters allowed, so the only valid values are m_parent and nullptr
271 assert(m_parent == old_state);
272 m_parent->m_state.exchange(nullptr, std::memory_order::acquire);
273 return false;
274 }
275 return true;
276 }
277 constexpr void await_resume() const noexcept {}
278
279 single_consumer_auto_reset_event* m_parent;
280 dispatcher* m_dispatcher;
281 coroutine_handle<> m_handle{};
282 };
283 };
284
308 public:
313 explicit constexpr multi_consumer_event(bool set_initially = false) noexcept
314 : m_state(set_initially ? this : nullptr) {}
315#ifndef NDEBUG
316 ~multi_consumer_event() noexcept { assert(!is_awaited()); }
317#endif
318
323 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
324
329 [[nodiscard]] bool is_awaited() const noexcept {
330 auto ptr = m_state.load(std::memory_order::acquire);
331 return ptr != nullptr && ptr != this;
332 }
333
340 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
341 auto state = m_state.exchange(this, std::memory_order::acq_rel);
342 if (state == this) return false;
343 auto await = static_cast<awaiter*>(state);
344 while (await != nullptr) {
345 auto next = await->m_next;
346 assert(await->m_parent == this);
347 assert(await->m_handle);
348 if (await->m_dispatcher != nullptr) {
349 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
350 } else if (resume_dispatcher != nullptr) {
351 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
352 } else {
353 await->m_handle.resume();
354 }
355 await = next;
356 }
357 return true;
358 }
359
363 void reset() noexcept {
364 void* old_state = this;
365 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
366 }
367
376 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
377
388 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
389 return awaiter{this, resume_dispatcher};
390 }
391
392 private:
393 /* nullptr => unset
394 * this => set
395 * x => head of awaiter* list
396 */
397 std::atomic<void*> m_state;
398
399 struct [[nodiscard]] awaiter {
400 explicit constexpr awaiter(multi_consumer_event* parent, dispatcher* dispatcher) noexcept
401 : m_parent(parent), m_dispatcher(dispatcher) {}
402 [[nodiscard]] bool await_ready() const noexcept { return m_parent->is_set(); }
403 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
404 m_handle = hdl;
405 void* old_state = m_parent->m_state.load(std::memory_order::acquire);
406 do {
407 // event became set
408 if (old_state == m_parent) return false;
409 m_next = static_cast<awaiter*>(old_state);
410 } while (!m_parent->m_state.compare_exchange_weak( //
411 old_state, this, std::memory_order::release, std::memory_order::acquire));
412 return true;
413 }
414 constexpr void await_resume() const noexcept {}
415
416 multi_consumer_event* m_parent;
417 dispatcher* m_dispatcher;
418 awaiter* m_next{nullptr};
419 coroutine_handle<> m_handle{};
420 };
421 };
422
448 public:
453 explicit constexpr multi_consumer_auto_reset_event(bool set_initially = false) noexcept
454 : m_state(set_initially ? this : nullptr) {}
455#ifndef NDEBUG
456 ~multi_consumer_auto_reset_event() noexcept { assert(!is_awaited()); }
457#endif
458
463 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
464
469 [[nodiscard]] bool is_awaited() const noexcept {
470 auto ptr = m_state.load(std::memory_order::acquire);
471 return ptr != nullptr && ptr != this;
472 }
473
480 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
481 // We assume the event is
482 awaiter* await = nullptr;
483 void* old_state = m_state.load(std::memory_order::acquire);
484 void* new_state;
485 do {
486 // If the state is already set we can just return
487 if (old_state == this) return false;
488 // Otherwise store the state as an awaiter
489 await = static_cast<awaiter*>(old_state);
490 // if the event had awaiters we update the state to "unset", if it was previously unset we transition to set
491 new_state = (old_state == nullptr) ? this : nullptr;
492 } while (!m_state.compare_exchange_weak(old_state, new_state, std::memory_order::release,
493 std::memory_order::acquire));
494
495 // Execute the awaiters (if any)
496 if (await == nullptr) return false;
497 do {
498 auto next = await->m_next;
499 assert(await->m_parent == this);
500 assert(await->m_handle);
501 if (await->m_dispatcher != nullptr) {
502 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
503 } else if (resume_dispatcher != nullptr) {
504 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
505 } else {
506 await->m_handle.resume();
507 }
508 await = next;
509 } while (await != nullptr);
510 return true;
511 }
512
516 void reset() noexcept {
517 void* old_state = this;
518 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
519 }
520
529 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
530
541 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
542 return awaiter{this, resume_dispatcher};
543 }
544
545 private:
546 /* nullptr => unset
547 * this => set
548 * x => head of awaiter* list
549 */
550 std::atomic<void*> m_state;
551
552 struct [[nodiscard]] awaiter {
553 explicit constexpr awaiter(multi_consumer_auto_reset_event* parent, dispatcher* dispatcher) noexcept
554 : m_parent(parent), m_dispatcher(dispatcher) {}
555 [[nodiscard]] bool await_ready() const noexcept {
556 // We assume the event is set
557 void* old_state = m_parent;
558 // And try to replace it with unset
559 // NOTE: compare_exchange_weak can fail spuriously, but we don't care
560 // because in that case we just enter the main loop in await_suspend
561 // Returns true if the value was updated (i.e. old_state equalled m_parent and cas succeeded)
562 return m_parent->m_state.compare_exchange_weak( //
563 old_state, nullptr, std::memory_order::release, std::memory_order::acquire);
564 }
565 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
566 m_handle = hdl;
567 void* old_state = m_parent->m_state.load(std::memory_order::acquire);
568 do {
569 // If the event is set, reset it and resume the coroutine
570 if (old_state == m_parent) {
571 if (m_parent->m_state.compare_exchange_weak( //
572 old_state, nullptr, std::memory_order::release, std::memory_order::acquire))
573 return false;
574 // the state changed between load and compare (either because another coroutine was faster or the event was reset).
575 // Retry from the start
576 continue;
577 }
578 // The event is unset, add the existing awaiter (if any) to the list and suspend the coroutine
579 m_next = static_cast<awaiter*>(old_state);
580 } while (!m_parent->m_state.compare_exchange_weak( //
581 old_state, this, std::memory_order::release, std::memory_order::acquire));
582 return true;
583 }
584 constexpr void await_resume() const noexcept {}
585
586 multi_consumer_auto_reset_event* m_parent;
587 dispatcher* m_dispatcher{};
588 awaiter* m_next{nullptr};
589 coroutine_handle<> m_handle{};
590 };
591 };
592} // namespace asyncpp
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
virtual void push(std::function< void()> cbfn)=0
Simple auto reset event supporting multiple consumers.
Definition event.h:447
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:541
constexpr multi_consumer_auto_reset_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:453
void reset() noexcept
Reset the event back to unset.
Definition event.h:516
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:480
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:463
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:469
Simple manual reset event supporting multiple consumers.
Definition event.h:307
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:329
void reset() noexcept
Reset the event back to unset.
Definition event.h:363
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:340
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:323
constexpr multi_consumer_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:313
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:388
Simple auto reset event supporting a single consumer.
Definition event.h:167
void reset() noexcept
Reset the event back to unset.
Definition event.h:225
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:182
constexpr single_consumer_auto_reset_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:173
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:250
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:188
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:199
Simple manual reset event supporting a single consumer.
Definition event.h:31
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:46
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:63
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:52
void reset() noexcept
Reset the event back to unset.
Definition event.h:84
constexpr single_consumer_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:37
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:109
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...