Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
queue.h
1#pragma once
2#include <asyncpp/detail/concepts.h>
4#include <asyncpp/dispatcher.h>
5
6#include <cassert>
7#include <cstdio>
8#include <mutex>
9#include <optional>
10#include <queue>
11
12namespace asyncpp {
13
/**
 * \brief Queue for sharing items between multiple producers/consumers.
 *
 * Items are kept in a std::queue on top of TContainer. pop() and push() return
 * awaitables that suspend the calling coroutine while the queue is empty or has
 * reached max_size; the try_* functions never suspend.
 * \tparam T Type of the stored items
 * \tparam TContainer Container used by the underlying std::queue
 */
17 template<typename T, typename TContainer = std::deque<T>>
18 class queue {
19 class pop_awaiter;
20 class push_awaiter;
21
// Locked by every member function before the members below are touched.
22 mutable std::mutex m_mtx;
// Items that were pushed but not popped yet.
23 std::queue<T, TContainer> m_queue;
// Once m_queue holds this many items, try_push()/try_emplace() fail and push() suspends.
24 size_t m_max_size;
// Dispatcher handed to awaiters that have none of their own when they suspend.
25 dispatcher* m_dispatcher;
// Head of the singly linked list of suspended pop() operations (new entries are appended at the tail).
26 pop_awaiter* m_pop_list{};
// Head of the singly linked list of suspended push() operations (new entries are appended at the tail).
27 push_awaiter* m_push_list{};
28
29 public:
/**
 * \brief Construct a new queue.
 * \param max_size Number of items m_queue may hold before pushes are rejected or suspended
 * \param disp Dispatcher stored in m_dispatcher (may be nullptr)
 * \param args Arguments forwarded to the constructor of the underlying std::queue
 */
36 template<typename... Args>
37 explicit queue(size_t max_size = std::numeric_limits<size_t>::max(), dispatcher* disp = nullptr, Args&&... args)
38 : m_queue(std::forward<Args>(args)...), m_max_size(max_size), m_dispatcher{disp} {}
39
// Queues can be moved but not copied.
40 queue(const queue&) = delete;
41 queue(queue&&) noexcept;
42 queue& operator=(const queue&) = delete;
43 queue& operator=(queue&&) noexcept;
44
/// \brief Get the number of items available without suspending (stored items plus suspended pushes).
51 [[nodiscard]] size_t size() const noexcept;
/// \brief True if the queue is empty and calling pop() would suspend.
55 [[nodiscard]] bool empty() const noexcept;
56
/// \brief Try pushing a new item to the queue; returns false if the queue already holds max_size items.
62 [[nodiscard]] bool try_push(T&& value);
63
/// \brief Try pushing a new item built from \p args; returns false if the queue already holds max_size items.
69 template<typename... Args>
70 [[nodiscard]] bool try_emplace(Args&&... args);
71
/// \brief Try popping an item from the queue; returns std::nullopt if no item is stored.
76 [[nodiscard]] std::optional<T> try_pop();
77
/// \brief Pop a value from the queue and suspend if the queue is empty.
81 [[nodiscard]] pop_awaiter pop();
82
/// \brief Push a value to the queue and suspend if the queue is full.
87 [[nodiscard]] push_awaiter push(T&& value);
88
/// \brief Clear the queue.
92 void clear();
93 };
94
/**
 * \brief Awaitable returned by queue::pop().
 *
 * Awaiting it yields a std::optional<T>: the popped item, or an empty optional
 * if the operation was failed (the optional is reset when the queue is
 * overwritten by a move assignment while this awaiter is suspended).
 */
95 template<typename T, typename TContainer>
96 class queue<T, TContainer>::pop_awaiter {
// Queue this operation was created for; only used in await_ready/await_suspend.
97 queue* m_parent;
98
// Next suspended pop operation in the queue's m_pop_list.
99 pop_awaiter* m_next{};
// Coroutine to resume once a value is available.
100 coroutine_handle<> m_handle;
// Dispatcher used for resuming; defaults to the dispatcher current at construction.
101 dispatcher* m_dispatcher = dispatcher::current();
// Value handed out by await_resume.
102 std::optional<T> m_result{std::nullopt};
103
104 friend class queue<T, TContainer>::push_awaiter;
105 friend class queue<T, TContainer>;
106
// Stores value in m_result and resumes m_handle (through m_dispatcher if one is set).
107 void resume(T&& value);
108
109 public:
110 explicit pop_awaiter(queue* parent) noexcept : m_parent(parent) {}
111
// NOTE(review): the signature line of this member is missing from the listing; judging by the
// body and the page's member index it is `pop_awaiter& resume_on(dispatcher* dsp) noexcept`,
// which sets a different dispatcher to resume this operation on - confirm against the real header.
117 m_dispatcher = dsp;
118 return *this;
119 }
120
// Takes an item right away if one is stored; otherwise returns false with the queue mutex still held.
121 [[nodiscard]] bool await_ready();
// Appends this awaiter to the parent's pop list, releases the mutex and always returns true.
122 [[nodiscard]] bool await_suspend(coroutine_handle<> hndl);
// Returns the stored result by moving it out.
123 [[nodiscard]] std::optional<T> await_resume();
124 };
125
/**
 * \brief Awaitable returned by queue::push().
 *
 * Awaiting it yields a bool: true if the value was handed to the queue or a
 * waiting consumer, false if the operation was failed while suspended
 * (by clear() or by a move assignment to the queue).
 */
126 template<typename T, typename TContainer>
127 class queue<T, TContainer>::push_awaiter {
// Queue this operation was created for; only used in await_ready/await_suspend.
128 queue* m_parent;
// Value waiting to be pushed; moved out once it is accepted.
129 T m_value;
130
// Next suspended push operation in the queue's m_push_list.
131 push_awaiter* m_next{};
// Coroutine to resume once the push completed or failed.
132 coroutine_handle<> m_handle;
// Dispatcher used for resuming; defaults to the dispatcher current at construction.
133 dispatcher* m_dispatcher = dispatcher::current();
// Result returned by await_resume; set to false when the operation is failed.
134 bool m_result{true};
135
136 friend class queue<T, TContainer>::pop_awaiter;
137 friend class queue<T, TContainer>;
138
// Resumes m_handle (through m_dispatcher if one is set).
139 void resume();
140
141 public:
142 explicit push_awaiter(queue* parent, T&& value) noexcept : m_parent(parent), m_value(std::forward<T>(value)) {}
143
// NOTE(review): the signature line of this member is missing from the listing; judging by the
// body and the page's member index it is `push_awaiter& resume_on(dispatcher* dsp) noexcept`,
// which sets a different dispatcher to resume this operation on - confirm against the real header.
149 m_dispatcher = dsp;
150 return *this;
151 }
152
// Completes the push right away if possible; otherwise returns false with the queue mutex still held.
153 [[nodiscard]] bool await_ready();
// Appends this awaiter to the parent's push list and releases the mutex.
154 void await_suspend(coroutine_handle<> hndl);
// Returns m_result.
155 bool await_resume();
156 };
157
158 template<typename T, typename TContainer>
159 inline queue<T, TContainer>::queue(queue&& other) noexcept {
160 std::scoped_lock lck(other.m_mtx);
161 m_queue = std::move(other.m_queue);
162 m_max_size = other.m_max_size;
163 m_dispatcher = other.m_dispatcher;
164 // NOTE: We do not bother to update the m_parent pointer inside the awaitables,
165 // because its not used after await_suspend returned.
166 m_pop_list = other.m_pop_list;
167 other.m_pop_list = nullptr;
168 m_push_list = other.m_push_list;
169 other.m_push_list = nullptr;
170 }
171
172 template<typename T, typename TContainer>
173 inline queue<T, TContainer>& queue<T, TContainer>::operator=(queue&& other) noexcept {
174 pop_awaiter* old_pop = nullptr;
175 push_awaiter* old_push = nullptr;
176 {
177 std::scoped_lock lck(m_mtx, other.m_mtx);
178 m_queue = std::move(other.m_queue);
179 m_max_size = other.m_max_size;
180 m_dispatcher = other.m_dispatcher;
181 // NOTE: We do not bother to update the m_parent pointer inside the awaitables,
182 // because its not used after await_suspend returned.
183 old_pop = m_pop_list;
184 m_pop_list = other.m_pop_list;
185 other.m_pop_list = nullptr;
186 old_push = m_push_list;
187 m_push_list = other.m_push_list;
188 other.m_push_list = nullptr;
189 }
190 // Fail all pop operations on the moved to queue
191 while (old_pop != nullptr) {
192 auto await = old_pop;
193 old_pop = old_pop->m_next;
194 await->m_result.reset();
195 if (await->m_dispatcher != nullptr)
196 await->m_dispatcher->push([await]() { await->m_handle.resume(); });
197 else
198 await->m_handle.resume();
199 }
200 // Fail all push operations on the moved to queue
201 while (old_push != nullptr) {
202 auto await = old_push;
203 old_push = old_push->m_next;
204 await->m_result = false;
205 await->resume();
206 }
207 return *this;
208 }
209
210 template<typename T, typename TContainer>
211 inline size_t queue<T, TContainer>::size() const noexcept {
212 std::unique_lock lck{m_mtx};
213 size_t cnt = m_queue.size();
214 for (auto pa = m_push_list; pa != nullptr; pa = pa->m_next)
215 cnt++;
216 return cnt;
217 }
218
219 template<typename T, typename TContainer>
220 inline bool queue<T, TContainer>::empty() const noexcept {
221 std::unique_lock lck{m_mtx};
222 return m_queue.empty() && m_push_list == nullptr;
223 }
224
225 template<typename T, typename TContainer>
226 inline bool queue<T, TContainer>::try_push(T&& value) {
227 std::unique_lock lck{m_mtx};
228 // Early exit if we are oversized
229 if (m_queue.size() >= m_max_size) return false;
230 // Check if there is an task waiting in pop
231 if (auto awaiter = m_pop_list; awaiter != nullptr) {
232 m_pop_list = awaiter->m_next;
233 // There should never be a task waiting in pop with a non empty queue
234 assert(m_queue.empty());
235 lck.unlock();
236 awaiter->resume(std::forward<T>(value));
237 } else {
238 // There is noone waiting for the data (yet), so we just queue it
239 m_queue.push(std::forward<T>(value));
240 }
241 return true;
242 }
243
244 template<typename T, typename TContainer>
245 template<typename... Args>
246 inline bool queue<T, TContainer>::try_emplace(Args&&... args) {
247 std::unique_lock lck{m_mtx};
248 // Early exit if we are oversized
249 if (m_queue.size() >= m_max_size) return false;
250 // Check if there is an task waiting in pop
251 if (auto awaiter = m_pop_list; awaiter != nullptr) {
252 m_pop_list = awaiter->m_next;
253 // There should never be a task waiting in pop with a non empty queue
254 assert(m_queue.empty());
255 lck.unlock();
256 awaiter->resume(T{std::forward<Args>(args)...});
257 } else {
258 // There is noone waiting for the data (yet), so we just queue it
259 m_queue.emplace(std::forward<Args>(args)...);
260 }
261 return true;
262 }
263
264 template<typename T, typename TContainer>
265 inline std::optional<T> queue<T, TContainer>::try_pop() {
266 std::unique_lock lck{m_mtx};
267 // Early out if the queue is empty
268 if (m_queue.empty()) return std::nullopt;
269 // Pop the first value
270 std::optional<T> res{std::move(m_queue.front())};
271 m_queue.pop();
272 // Check if there is someone waiting in push
273 if (auto awaiter = m_push_list; awaiter != nullptr) {
274 m_push_list = awaiter->m_next;
275 // This can only happen if the queue used to be full
276 assert(m_queue.size() == m_max_size - 1);
277 m_queue.push(std::move(awaiter->m_value));
278 lck.unlock();
279 // Resume the first task waiting to push a value
280 awaiter->resume();
281 }
282 return res;
283 }
284
285 template<typename T, typename TContainer>
289
290 template<typename T, typename TContainer>
292 return push_awaiter{this, std::forward<T>(value)};
293 }
294
// Clear the queue: drops every stored item and fails every suspended push operation.
// NOTE(review): the signature line (listing line 296) is missing from this listing; the page's
// member index gives it as `void clear()` - confirm against the real header.
295 template<typename T, typename TContainer>
297 std::unique_lock lck{m_mtx};
// Drop all stored items one by one.
298 while (!m_queue.empty())
299 m_queue.pop();
300 // Check if there is someone waiting in push
// Detach the whole list while the lock is held, so the waiters can be resumed after unlocking.
301 auto push_list = m_push_list;
302 m_push_list = nullptr;
303 lck.unlock();
304
305 while (push_list != nullptr) {
306 const auto await = push_list;
// The link is read before resume() is called on this awaiter.
307 push_list = push_list->m_next;
// The value of a suspended push is discarded; its await_resume() returns false.
308 await->m_result = false;
309 await->resume();
310 }
311 }
312
313 template<typename T, typename TContainer>
314 inline void queue<T, TContainer>::pop_awaiter::resume(T&& value) {
315 // NOTE: Because we do not update the m_parent variable in move constructors we can not use it here
316 // Move the value right to this awaiter
317 m_result.emplace(std::forward<T>(value));
318 // And resume
319 if (m_dispatcher != nullptr)
320 m_dispatcher->push([this]() { m_handle.resume(); });
321 else
322 m_handle.resume();
323 }
324
325 template<typename T, typename TContainer>
326 inline bool queue<T, TContainer>::pop_awaiter::await_ready() {
327 std::unique_lock lck{m_parent->m_mtx};
328 if (m_parent->m_queue.empty()) {
329 lck.release(); // We unlock inside suspend
330 return false;
331 }
332 m_result.emplace(std::move(m_parent->m_queue.front()));
333 m_parent->m_queue.pop();
334 // Check if there is someone waiting in push
335 auto awaiter = m_parent->m_push_list;
336 if (awaiter != nullptr) {
337 m_parent->m_push_list = awaiter->m_next;
338 // This can only happen if the queue used to be full
339 assert(m_parent->m_queue.size() == m_parent->m_max_size - 1);
340 m_parent->m_queue.push(std::move(awaiter->m_value));
341 lck.unlock();
342 // Resume the first task waiting to push a value
343 awaiter->resume();
344 }
345 return true;
346 }
347
348 template<typename T, typename TContainer>
349 inline bool queue<T, TContainer>::pop_awaiter::await_suspend(coroutine_handle<> hndl) {
350 m_handle = hndl;
351 auto val = m_parent->m_pop_list;
352 while (val != nullptr && val->m_next != nullptr)
353 val = val->m_next;
354 if (val == nullptr)
355 m_parent->m_pop_list = this;
356 else
357 val->m_next = this;
358 // Copy dispatcher from parent if needed to allow for easier move constructor
359 if (m_dispatcher == nullptr) m_dispatcher = m_parent->m_dispatcher;
360 // Unlock the mutex locked in await_ready
361 m_parent->m_mtx.unlock();
362 return true;
363 }
364
365 template<typename T, typename TContainer>
366 inline std::optional<T> queue<T, TContainer>::pop_awaiter::await_resume() {
367 // NOTE: Because we do not update the m_parent variable in move constructors we can not use it here
368 return std::move(m_result);
369 }
370
371 template<typename T, typename TContainer>
372 inline void queue<T, TContainer>::push_awaiter::resume() {
373 // NOTE: Because we do not update the m_parent variable in move constructors we can not use it here
374 if (m_dispatcher != nullptr)
375 m_dispatcher->push([this]() { m_handle.resume(); });
376 else
377 m_handle.resume();
378 }
379
380 template<typename T, typename TContainer>
381 inline bool queue<T, TContainer>::push_awaiter::await_ready() {
382 std::unique_lock lck{m_parent->m_mtx};
383 // If there is a task waiting to pop
384 if (m_parent->m_pop_list != nullptr) {
385 assert(m_parent->m_queue.empty());
386 auto awaiter = m_parent->m_pop_list;
387 m_parent->m_pop_list = awaiter->m_next;
388 lck.unlock();
389 awaiter->resume(std::move(m_value));
390 return true;
391 }
392 // If there is space in the queue push to it
393 if (m_parent->m_queue.size() < m_parent->m_max_size) {
394 m_parent->m_queue.push(std::move(m_value));
395 return true;
396 }
397 // Otherwise we suspend
398 lck.release();
399 return false;
400 }
401
402 template<typename T, typename TContainer>
403 inline void queue<T, TContainer>::push_awaiter::await_suspend(coroutine_handle<> hndl) {
404 m_handle = hndl;
405 auto val = m_parent->m_push_list;
406 while (val != nullptr && val->m_next != nullptr)
407 val = val->m_next;
408 if (val == nullptr)
409 m_parent->m_push_list = this;
410 else
411 val->m_next = this;
412 // Copy dispatcher from parent if needed to allow for easier move constructor
413 if (m_dispatcher == nullptr) m_dispatcher = m_parent->m_dispatcher;
414 // Unlock the mutex locked in await_ready
415 m_parent->m_mtx.unlock();
416 }
417
418 template<typename T, typename TContainer>
419 inline bool queue<T, TContainer>::push_awaiter::await_resume() {
420 // NOTE: Because we do not update the m_parent variable in move constructors we can not use it here
421 return m_result;
422 }
423} // namespace asyncpp
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
virtual void push(std::function< void()> cbfn)=0
Definition queue.h:96
pop_awaiter & resume_on(dispatcher *dsp) noexcept
Set a different dispatcher to resume this operation on.
Definition queue.h:116
Definition queue.h:127
push_awaiter & resume_on(dispatcher *dsp) noexcept
Set a different dispatcher to resume this operation on.
Definition queue.h:148
Queue for sharing items between multiple producers/consumers.
Definition queue.h:18
bool try_emplace(Args &&... args)
Try pushing a new item to the queue.
Definition queue.h:246
queue(size_t max_size=std::numeric_limits< size_t >::max(), dispatcher *disp=nullptr, Args &&... args)
Construct a new queue.
Definition queue.h:37
void clear()
Clear the queue.
Definition queue.h:296
pop_awaiter pop()
Pop a value from the queue and suspend if the queue is empty.
Definition queue.h:286
std::optional< T > try_pop()
Try popping an item from the queue.
Definition queue.h:265
push_awaiter push(T &&value)
Push a value to the queue and suspend if the queue is full.
Definition queue.h:291
bool empty() const noexcept
True if the queue is empty and calling pop() would suspend.
Definition queue.h:220
bool try_push(T &&value)
Try pushing a new item to the queue.
Definition queue.h:226
size_t size() const noexcept
Get the number of items available without suspending.
Definition queue.h:211
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...