Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
event.h
1#pragma once
3#include <asyncpp/dispatcher.h>
4#include <atomic>
5#include <cassert>
6
7namespace asyncpp {
31 public:
36 explicit constexpr single_consumer_event(bool set_initially = false) noexcept
37 : m_state(set_initially ? this : nullptr) {}
38#ifndef NDEBUG
39 ~single_consumer_event() noexcept { assert(!is_awaited()); }
40#endif
45 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
46
51 [[nodiscard]] bool is_awaited() const noexcept {
52 auto val = m_state.load(std::memory_order::acquire);
53 return val != nullptr && val != this;
54 }
55
62 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
63 auto state = m_state.exchange(this, std::memory_order::acq_rel);
64 if (state != nullptr && state != this) {
65 auto await = static_cast<awaiter*>(state);
66 assert(await->m_parent == this);
67 assert(await->m_handle);
68 if (await->m_dispatcher != nullptr) {
69 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
70 } else if (resume_dispatcher != nullptr) {
71 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
72 } else {
73 await->m_handle.resume();
74 }
75 return true;
76 }
77 return false;
78 }
79
83 void reset() noexcept {
84 void* old_state = this;
85 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
86 }
87
96 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
97
108 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
109 return awaiter{this, resume_dispatcher};
110 }
111
112 private:
113 /* nullptr => unset
114 * this => set
115 * x => awaiter*
116 */
117 std::atomic<void*> m_state;
118
119 struct [[nodiscard]] awaiter {
120 explicit constexpr awaiter(single_consumer_event* parent, dispatcher* dispatcher) noexcept
121 : m_parent(parent), m_dispatcher(dispatcher) {}
122 [[nodiscard]] bool await_ready() const noexcept { return m_parent->is_set(); }
123 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
124 m_handle = hdl;
125 void* old_state = nullptr;
126 // If the current state is unset set it to this
127 bool was_equal = m_parent->m_state.compare_exchange_strong(old_state, this, std::memory_order::release,
128 std::memory_order::acquire);
129 // If the state was not unset it has to be set,
130 // otherwise we have a concurrent await, which is not supported
131 assert(was_equal || old_state == m_parent);
132 return was_equal;
133 }
134 constexpr void await_resume() const noexcept {}
135
136 single_consumer_event* m_parent;
137 dispatcher* m_dispatcher;
138 coroutine_handle<> m_handle{};
139 };
140 };
141
167 public:
172 explicit constexpr single_consumer_auto_reset_event(bool set_initially = false) noexcept
173 : m_state(set_initially ? this : nullptr) {}
174#ifndef NDEBUG
175 ~single_consumer_auto_reset_event() noexcept { assert(!is_awaited()); }
176#endif
181 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
182
187 [[nodiscard]] bool is_awaited() const noexcept {
188 auto ptr = m_state.load(std::memory_order::acquire);
189 return ptr != nullptr && ptr != this;
190 }
191
198 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
199 auto state = m_state.exchange(this, std::memory_order::release);
200 if (state != nullptr && state != this) {
201 auto await = static_cast<awaiter*>(state);
202
203 // Only modify the state if it has not been changed in between
204 state = this;
205 m_state.compare_exchange_strong(state, nullptr, std::memory_order::acq_rel);
206
207 assert(await->m_parent == this);
208 assert(await->m_handle);
209 if (await->m_dispatcher != nullptr) {
210 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
211 } else if (resume_dispatcher != nullptr) {
212 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
213 } else {
214 await->m_handle.resume();
215 }
216 return true;
217 }
218 return false;
219 }
220
224 void reset() noexcept {
225 void* old_state = this;
226 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
227 }
228
237 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
238
249 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
250 return awaiter{this, resume_dispatcher};
251 }
252
253 private:
254 /* nullptr => unset
255 * this => set
256 * x => awaiter*
257 */
258 std::atomic<void*> m_state;
259
260 struct [[nodiscard]] awaiter {
261 explicit constexpr awaiter(single_consumer_auto_reset_event* parent, dispatcher* dispatcher) noexcept
262 : m_parent(parent), m_dispatcher(dispatcher) {}
263 [[nodiscard]] constexpr bool await_ready() const noexcept { return false; }
264 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
265 m_handle = hdl;
266 void* old_state = nullptr;
267 if (!m_parent->m_state.compare_exchange_strong(old_state, this, std::memory_order::release,
268 std::memory_order::relaxed)) {
269 // No duplicate awaiters allowed, so the only valid values are m_parent and nullptr
270 assert(m_parent == old_state);
271 m_parent->m_state.exchange(nullptr, std::memory_order::acquire);
272 return false;
273 }
274 return true;
275 }
276 constexpr void await_resume() const noexcept {}
277
278 single_consumer_auto_reset_event* m_parent;
279 dispatcher* m_dispatcher;
280 coroutine_handle<> m_handle{};
281 };
282 };
283
307 public:
312 explicit constexpr multi_consumer_event(bool set_initially = false) noexcept
313 : m_state(set_initially ? this : nullptr) {}
314#ifndef NDEBUG
315 ~multi_consumer_event() noexcept { assert(!is_awaited()); }
316#endif
317
322 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
323
328 [[nodiscard]] bool is_awaited() const noexcept {
329 auto ptr = m_state.load(std::memory_order::acquire);
330 return ptr != nullptr && ptr != this;
331 }
332
339 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
340 auto state = m_state.exchange(this, std::memory_order::acq_rel);
341 if (state == this) return false;
342 auto await = static_cast<awaiter*>(state);
343 while (await != nullptr) {
344 auto next = await->m_next;
345 assert(await->m_parent == this);
346 assert(await->m_handle);
347 if (await->m_dispatcher != nullptr) {
348 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
349 } else if (resume_dispatcher != nullptr) {
350 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
351 } else {
352 await->m_handle.resume();
353 }
354 await = next;
355 }
356 return true;
357 }
358
362 void reset() noexcept {
363 void* old_state = this;
364 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
365 }
366
375 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
376
387 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
388 return awaiter{this, resume_dispatcher};
389 }
390
391 private:
392 /* nullptr => unset
393 * this => set
394 * x => head of awaiter* list
395 */
396 std::atomic<void*> m_state;
397
398 struct [[nodiscard]] awaiter {
399 explicit constexpr awaiter(multi_consumer_event* parent, dispatcher* dispatcher) noexcept
400 : m_parent(parent), m_dispatcher(dispatcher) {}
401 [[nodiscard]] bool await_ready() const noexcept { return m_parent->is_set(); }
402 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
403 m_handle = hdl;
404 void* old_state = m_parent->m_state.load(std::memory_order::acquire);
405 do {
406 // event became set
407 if (old_state == m_parent) return false;
408 m_next = static_cast<awaiter*>(old_state);
409 } while (!m_parent->m_state.compare_exchange_weak( //
410 old_state, this, std::memory_order::release, std::memory_order::acquire));
411 return true;
412 }
413 constexpr void await_resume() const noexcept {}
414
415 multi_consumer_event* m_parent;
416 dispatcher* m_dispatcher;
417 awaiter* m_next{nullptr};
418 coroutine_handle<> m_handle{};
419 };
420 };
421
451 explicit constexpr multi_consumer_auto_reset_event(bool set_initially = false) noexcept
452 : m_state(set_initially ? this : nullptr) {}
453#ifndef NDEBUG
454 ~multi_consumer_auto_reset_event() noexcept { assert(!is_awaited()); }
455#endif
456
461 [[nodiscard]] bool is_set() const noexcept { return m_state.load(std::memory_order::acquire) == this; }
462
467 [[nodiscard]] bool is_awaited() const noexcept {
468 auto ptr = m_state.load(std::memory_order::acquire);
469 return ptr != nullptr && ptr != this;
470 }
471
478 bool set(dispatcher* resume_dispatcher = nullptr) noexcept {
479 auto state = m_state.exchange(this, std::memory_order::acq_rel);
480 if (state == this || state == nullptr) return false;
481 auto await = static_cast<awaiter*>(state);
482
483 // Only modify the state if it has not been changed in between
484 state = this;
485 m_state.compare_exchange_strong(state, nullptr, std::memory_order::acq_rel);
486
487 while (await != nullptr) {
488 auto next = await->m_next;
489 assert(await->m_parent == this);
490 assert(await->m_handle);
491 if (await->m_dispatcher != nullptr) {
492 await->m_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
493 } else if (resume_dispatcher != nullptr) {
494 resume_dispatcher->push([hdl = await->m_handle]() mutable { hdl.resume(); });
495 } else {
496 await->m_handle.resume();
497 }
498 await = next;
499 }
500 return true;
501 }
502
506 void reset() noexcept {
507 void* old_state = this;
508 m_state.compare_exchange_strong(old_state, nullptr, std::memory_order::relaxed);
509 }
510
519 [[nodiscard]] auto operator co_await() noexcept { return awaiter{this, dispatcher::current()}; }
520
531 [[nodiscard]] constexpr auto wait(dispatcher* resume_dispatcher = nullptr) noexcept {
532 return awaiter{this, resume_dispatcher};
533 }
534
535 private:
536 /* nullptr => unset
537 * this => set
538 * x => head of awaiter* list
539 */
540 std::atomic<void*> m_state;
541
542 struct [[nodiscard]] awaiter {
543 explicit constexpr awaiter(multi_consumer_auto_reset_event* parent, dispatcher* dispatcher) noexcept
544 : m_parent(parent), m_dispatcher(dispatcher) {}
545 [[nodiscard]] bool await_ready() const noexcept { return m_parent->is_set(); }
546 [[nodiscard]] bool await_suspend(coroutine_handle<> hdl) noexcept {
547 m_handle = hdl;
548 void* old_state = m_parent->m_state.load(std::memory_order::acquire);
549 do {
550 // event became set
551 if (old_state == m_parent) return false;
552 m_next = static_cast<awaiter*>(old_state);
553 } while (!m_parent->m_state.compare_exchange_weak( //
554 old_state, this, std::memory_order::release, std::memory_order::acquire));
555 return true;
556 }
557 constexpr void await_resume() const noexcept {}
558
560 dispatcher* m_dispatcher{};
561 awaiter* m_next{nullptr};
562 coroutine_handle<> m_handle{};
563 };
564 };
565} // namespace asyncpp
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
Simple auto reset event supporting multiple consumers.
Definition event.h:446
Simple manual reset event supporting multiple consumers.
Definition event.h:306
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:328
void reset() noexcept
Reset the event back to unset.
Definition event.h:362
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:339
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:322
constexpr multi_consumer_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:312
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:387
Simple auto reset event supporting a single consumer.
Definition event.h:166
void reset() noexcept
Reset the event back to unset.
Definition event.h:224
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:181
constexpr single_consumer_auto_reset_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:172
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:249
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:187
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:198
Simple manual reset event supporting a single consumer.
Definition event.h:30
bool is_set() const noexcept
Query if the event is currently set.
Definition event.h:45
bool set(dispatcher *resume_dispatcher=nullptr) noexcept
Set the event.
Definition event.h:62
bool is_awaited() const noexcept
Query if the event is currently being awaited.
Definition event.h:51
void reset() noexcept
Reset the event back to unset.
Definition event.h:83
constexpr single_consumer_event(bool set_initially=false) noexcept
Construct a new event.
Definition event.h:36
constexpr auto wait(dispatcher *resume_dispatcher=nullptr) noexcept
Suspend the current coroutine until the event is set.
Definition event.h:108
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...