Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
channel.h
1#pragma once
2#include <asyncpp/detail/concepts.h>
4#include <asyncpp/dispatcher.h>
5
6#include <atomic>
7#include <cassert>
8#include <mutex>
9#include <optional>
10
11namespace asyncpp {
12
/**
 * \brief Channel for communication between coroutines.
 *
 * Suspended readers and writers are kept in two intrusive singly linked
 * lists (chained through the awaiters' m_next members). Both list heads are
 * only modified while m_mtx is held.
 * \tparam T Type of the values handed from writers to readers.
 */
template<typename T>
class channel {
    struct read_awaiter;
    struct write_awaiter;

    /// Becomes true on the first call to close().
    std::atomic<bool> m_closed{false};
    /// Guards m_reader_list and m_writer_list.
    std::mutex m_mtx;
    /// Head of the list of suspended readers (nullptr if none).
    read_awaiter* m_reader_list{nullptr};
    /// Head of the list of suspended writers (nullptr if none).
    write_awaiter* m_writer_list{nullptr};

public:
#ifndef NDEBUG
    /// Debug builds verify that no coroutine is still waiting on the channel.
    ~channel() {
        assert(m_reader_list == nullptr && m_writer_list == nullptr && "channel destroyed with waiting coroutines");
    }
#endif

    /**
     * \brief Read from the channel.
     * \return An awaiter; awaiting it yields a std::optional<T>.
     */
    [[nodiscard]] read_awaiter read();

    /**
     * \brief Attempt to read a value without suspending.
     * \return The value of a suspended writer, or std::nullopt if the channel
     *         is closed or no writer is suspended.
     */
    [[nodiscard]] std::optional<T> try_read();

    /**
     * \brief Write to the channel.
     * \param value The value to hand to a reader.
     * \return An awaiter; awaiting it yields a bool.
     */
    [[nodiscard]] write_awaiter write(T value);

    /**
     * \brief Attempt to write a value without suspending.
     * \param value The value to hand to a suspended reader.
     * \return true if a suspended reader received the value, false if the
     *         channel is closed or no reader is suspended.
     */
    [[nodiscard]] bool try_write(T value);

    /**
     * \brief Close the channel.
     */
    void close();

    /**
     * \brief Check if the channel is closed.
     * \return true once close() has been called.
     */
    [[nodiscard]] bool is_closed() const noexcept {
        return m_closed.load(std::memory_order_relaxed);
    }
};
80
81 template<typename T>
82 struct channel<T>::read_awaiter {
83 channel* m_parent;
84
85 read_awaiter* m_next{};
86 coroutine_handle<> m_handle;
87 dispatcher* m_dispatcher = dispatcher::current();
88 std::optional<T> m_result{std::nullopt};
89
99 m_dispatcher = dsp;
100 return *this;
101 }
102
103 [[nodiscard]] constexpr bool await_ready() const noexcept;
104 bool await_suspend(coroutine_handle<> hndl);
105 std::optional<T> await_resume();
106 };
107
108 template<typename T>
110 channel* m_parent;
111 T m_value;
112
113 write_awaiter* m_next{};
114 coroutine_handle<> m_handle;
115 dispatcher* m_dispatcher = dispatcher::current();
116 bool m_result{false};
117
127 m_dispatcher = dsp;
128 return *this;
129 }
130
131 [[nodiscard]] constexpr bool await_ready() const noexcept;
132 bool await_suspend(coroutine_handle<> hndl);
133 bool await_resume();
134 };
135
136 template<typename T>
137 inline typename channel<T>::read_awaiter channel<T>::read() {
138 return read_awaiter{this};
139 }
140
141 template<typename T>
142 inline std::optional<T> channel<T>::try_read() {
143 if (m_closed.load(std::memory_order::relaxed)) return std::nullopt;
144 std::unique_lock lck{m_mtx};
145 // Check if there is a writer waiting
146 if (auto wrt = m_writer_list; wrt != nullptr && !m_closed.load(std::memory_order::relaxed)) {
147 // Unhook writer
148 m_writer_list = wrt->m_next;
149 lck.unlock();
150 // Take the value out
151 std::optional<T> res = std::move(wrt->m_value);
152 wrt->m_result = true;
153 // Resume writer
154 if (wrt->m_dispatcher != nullptr)
155 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
156 else
157 wrt->m_handle.resume();
158 return res;
159 }
160 return std::nullopt;
161 }
162
163 template<typename T>
165 return write_awaiter{this, std::move(value)};
166 }
167
168 template<typename T>
169 inline bool channel<T>::try_write(T value) {
170 if (m_closed.load(std::memory_order::relaxed)) return false;
171 std::unique_lock lck{m_mtx};
172 // Check if there is a reader waiting
173 if (auto rdr = m_reader_list; rdr != nullptr && !m_closed.load(std::memory_order::relaxed)) {
174 // Unhook reader
175 m_reader_list = rdr->m_next;
176 lck.unlock();
177 // Take the value out
178 rdr->m_result = std::move(value);
179 // Resume writer
180 if (rdr->m_dispatcher != nullptr)
181 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
182 else
183 rdr->m_handle.resume();
184 return true;
185 }
186 return false;
187 }
188
189 template<typename T>
190 inline void channel<T>::close() {
191 if (m_closed.load(std::memory_order::relaxed)) return;
192 std::unique_lock lck{m_mtx};
193 if (!m_closed.exchange(true)) {
194 // This is the first close, so cancel all waiting awaiters
195 while (m_reader_list != nullptr) {
196 auto rdr = m_reader_list;
197 m_reader_list = rdr->m_next;
198 // We do not need to unlock, because all further attempts to access this
199 // cannel will get refused.
200 rdr->m_result.reset();
201 // Resume reader
202 if (rdr->m_dispatcher != nullptr)
203 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
204 else
205 rdr->m_handle.resume();
206 }
207 while (m_writer_list != nullptr) {
208 auto wrt = m_writer_list;
209 m_writer_list = wrt->m_next;
210 // We do not need to unlock, because all further attempts to access this
211 // cannel will get refused.
212 wrt->m_result = false;
213 // Resume reader
214 if (wrt->m_dispatcher != nullptr)
215 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
216 else
217 wrt->m_handle.resume();
218 }
219 }
220 }
221
222 template<typename T>
223 inline constexpr bool channel<T>::read_awaiter::await_ready() const noexcept {
224 return m_parent->m_closed.load(std::memory_order::relaxed);
225 }
226
227 template<typename T>
228 inline bool channel<T>::read_awaiter::await_suspend(coroutine_handle<> hndl) {
229 m_handle = hndl;
230 m_next = nullptr;
231 std::unique_lock lck{m_parent->m_mtx};
232 // Check if there is a writer waiting
233 if (auto wrt = m_parent->m_writer_list; wrt != nullptr) {
234 // Unhook writer
235 m_parent->m_writer_list = wrt->m_next;
236 lck.unlock();
237 // Take the value out
238 m_result = std::move(wrt->m_value);
239 wrt->m_result = true;
240 // Resume writer
241 if (wrt->m_dispatcher != nullptr)
242 wrt->m_dispatcher->push([wrt]() { wrt->m_handle.resume(); });
243 else
244 wrt->m_handle.resume();
245 // Do not suspend ourself
246 return false;
247 }
248
249 // No writer available, we have to wait...
250 auto last = m_parent->m_reader_list;
251 while (last && last->m_next)
252 last = last->m_next;
253 if (last == nullptr)
254 m_parent->m_reader_list = this;
255 else
256 last->m_next = this;
257 return true;
258 }
259
260 template<typename T>
261 inline std::optional<T> channel<T>::read_awaiter::await_resume() {
262 return std::move(m_result);
263 }
264
265 template<typename T>
266 inline constexpr bool channel<T>::write_awaiter::await_ready() const noexcept {
267 return m_parent->m_closed.load(std::memory_order::relaxed);
268 }
269
270 template<typename T>
271 inline bool channel<T>::write_awaiter::await_suspend(coroutine_handle<> hndl) {
272 m_handle = hndl;
273 m_next = nullptr;
274 if (m_parent->m_closed.load(std::memory_order::relaxed)) return false;
275 std::unique_lock lck{m_parent->m_mtx};
276 if (m_parent->m_closed.load(std::memory_order::relaxed)) return false;
277 // Check if there is a reader waiting
278 if (auto rdr = m_parent->m_reader_list; rdr != nullptr) {
279 // Unhook reader
280 m_parent->m_reader_list = rdr->m_next;
281 lck.unlock();
282 // Copy the value over
283 rdr->m_result = std::move(m_value);
284 // Resume reader
285 if (rdr->m_dispatcher != nullptr)
286 rdr->m_dispatcher->push([rdr]() { rdr->m_handle.resume(); });
287 else
288 rdr->m_handle.resume();
289 // Do not suspend ourself
290 return false;
291 }
292
293 // No reader available, we have to wait...
294 auto last = m_parent->m_writer_list;
295 while (last && last->m_next)
296 last = last->m_next;
297 if (last == nullptr)
298 m_parent->m_writer_list = this;
299 else
300 last->m_next = this;
301 return true;
302 }
303
304 template<typename T>
305 inline bool channel<T>::write_awaiter::await_resume() {
306 return m_result;
307 }
308} // namespace asyncpp
Channel for communication between coroutines.
Definition channel.h:22
read_awaiter read()
Read from the channel.
Definition channel.h:137
bool is_closed() const noexcept
Check if the channel is closed.
Definition channel.h:78
bool try_write(T value)
Attempt to write a value without suspending. If the channel is closed or no reader is suspended this returns false.
Definition channel.h:169
std::optional< T > try_read()
Attempt to read a value without suspending. If the channel is closed or no writer is suspended this returns std::nullopt.
Definition channel.h:142
void close()
Close the channel.
Definition channel.h:190
write_awaiter write(T value)
Write to the channel.
Definition channel.h:164
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...
Definition channel.h:82
read_awaiter & resume_on(dispatcher *dsp) noexcept
Specify a dispatcher to resume on after reading.
Definition channel.h:98
Definition channel.h:109
write_awaiter & resume_on(dispatcher *dsp) noexcept
Specify a dispatcher to resume on after writing.
Definition channel.h:126