2#include <asyncpp/detail/concepts.h>
4#include <asyncpp/dispatcher.h>
26 std::atomic<bool> m_closed{
false};
34 assert(m_reader_list ==
nullptr && m_writer_list ==
nullptr &&
"channel destroyed with waiting coroutines");
49 [[nodiscard]] std::optional<T>
try_read();
/// Check whether the channel has been closed.
/// Reads the atomic closed flag with a relaxed load, so the call by itself
/// imposes no ordering on surrounding memory operations.
/// @return The current value of the closed flag.
78 [[nodiscard]]
bool is_closed() const noexcept {
return m_closed.load(std::memory_order::relaxed); }
86 coroutine_handle<> m_handle;
88 std::optional<T> m_result{std::nullopt};
103 [[nodiscard]]
constexpr bool await_ready() const noexcept;
104 bool await_suspend(coroutine_handle<> hndl);
105 std::optional<T> await_resume();
114 coroutine_handle<> m_handle;
116 bool m_result{
false};
131 [[nodiscard]]
constexpr bool await_ready() const noexcept;
132 bool await_suspend(coroutine_handle<> hndl);
143 if (m_closed.load(std::memory_order::relaxed))
return std::nullopt;
144 std::unique_lock lck{m_mtx};
146 if (
auto wrt = m_writer_list; wrt !=
nullptr && !m_closed.load(std::memory_order::relaxed)) {
148 m_writer_list = wrt->m_next;
151 std::optional<T> res = std::move(wrt->m_value);
152 wrt->m_result =
true;
154 if (wrt->m_dispatcher !=
nullptr)
155 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
157 wrt->m_handle.resume();
170 if (m_closed.load(std::memory_order::relaxed))
return false;
171 std::unique_lock lck{m_mtx};
173 if (
auto rdr = m_reader_list; rdr !=
nullptr && !m_closed.load(std::memory_order::relaxed)) {
175 m_reader_list = rdr->m_next;
178 rdr->m_result = std::move(value);
180 if (rdr->m_dispatcher !=
nullptr)
181 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
183 rdr->m_handle.resume();
191 if (m_closed.load(std::memory_order::relaxed))
return;
192 std::unique_lock lck{m_mtx};
193 if (!m_closed.exchange(
true)) {
195 while (m_reader_list !=
nullptr) {
196 auto rdr = m_reader_list;
197 m_reader_list = rdr->m_next;
200 rdr->m_result.reset();
202 if (rdr->m_dispatcher !=
nullptr)
203 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
205 rdr->m_handle.resume();
207 while (m_writer_list !=
nullptr) {
208 auto wrt = m_writer_list;
209 m_writer_list = wrt->m_next;
212 wrt->m_result =
false;
214 if (wrt->m_dispatcher !=
nullptr)
215 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
217 wrt->m_handle.resume();
224 return m_parent->m_closed.load(std::memory_order::relaxed);
228 inline bool channel<T>::read_awaiter::await_suspend(coroutine_handle<> hndl) {
231 std::unique_lock lck{m_parent->m_mtx};
233 if (
auto wrt = m_parent->m_writer_list; wrt !=
nullptr) {
235 m_parent->m_writer_list = wrt->m_next;
238 m_result = std::move(wrt->m_value);
239 wrt->m_result =
true;
241 if (wrt->m_dispatcher !=
nullptr)
242 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
244 wrt->m_handle.resume();
250 auto last = m_parent->m_reader_list;
251 while (last && last->m_next)
254 m_parent->m_reader_list =
this;
261 inline std::optional<T> channel<T>::read_awaiter::await_resume() {
262 return std::move(m_result);
266 inline constexpr bool channel<T>::write_awaiter::await_ready() const noexcept {
267 return m_parent->m_closed.load(std::memory_order::relaxed);
271 inline bool channel<T>::write_awaiter::await_suspend(coroutine_handle<> hndl) {
274 if (m_parent->m_closed.load(std::memory_order::relaxed))
return false;
275 std::unique_lock lck{m_parent->m_mtx};
276 if (m_parent->m_closed.load(std::memory_order::relaxed))
return false;
278 if (
auto rdr = m_parent->m_reader_list; rdr !=
nullptr) {
280 m_parent->m_reader_list = rdr->m_next;
283 rdr->m_result = std::move(m_value);
285 if (rdr->m_dispatcher !=
nullptr)
286 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
288 rdr->m_handle.resume();
294 auto last = m_parent->m_writer_list;
295 while (last && last->m_next)
298 m_parent->m_writer_list =
this;
305 inline bool channel<T>::write_awaiter::await_resume() {
Channel for communication between coroutines.
Definition channel.h:22
read_awaiter read()
Read from the channel.
Definition channel.h:137
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.h:78
bool try_write(T value)
Attempt to write a value without suspending. If the channel is closed or no reader is suspended this ...
Definition channel.h:169
std::optional< T > try_read()
Attempt to read a value without suspending. If the channel is closed or no writer is suspended this r...
Definition channel.h:142
void close()
Close the channel.
Definition channel.h:190
write_awaiter write(T value)
Write to the channel.
Definition channel.h:164
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...
read_awaiter & resume_on(dispatcher *dsp) noexcept
Specify a dispatcher to resume on after reading.
Definition channel.h:98
write_awaiter & resume_on(dispatcher *dsp) noexcept
Specify a dispatcher to resume on after writing.
Definition channel.h:126