2#include <asyncpp/detail/concepts.h>
4#include <asyncpp/dispatcher.h>
// Template parameters of the queue: T is the element type, TContainer is the
// container backing the std::queue member below (defaults to std::deque<T>).
17 template<
typename T,
typename TContainer = std::deque<T>>
// Mutex taken by the member functions before touching the queue state;
// mutable so that const members such as size()/empty() can lock it.
22 mutable std::mutex m_mtx;
// Underlying storage for the values currently held by the queue.
23 std::queue<T, TContainer> m_queue;
// Construct a new queue.
//   max_size - stored in m_max_size; defaults to the largest size_t value.
//   disp     - stored in m_dispatcher; defaults to nullptr.
//   args     - perfectly forwarded to the constructor of the underlying std::queue.
36 template<
typename... Args>
37 explicit queue(
size_t max_size = std::numeric_limits<size_t>::max(),
dispatcher* disp =
nullptr, Args&&... args)
38 : m_queue(std::forward<Args>(args)...), m_max_size(max_size), m_dispatcher{disp} {}
// Get the number of items available without suspending.
51 [[nodiscard]]
size_t size() const noexcept;
// True if the queue is empty and calling pop() would suspend.
55 [[nodiscard]]
bool empty() const noexcept;
// Try pushing a new item to the queue; the bool result reports whether it was accepted.
62 [[nodiscard]]
bool try_push(T&& value);
// Template header of a variadic member whose declaration is not shown here
// (presumably try_emplace(Args&&...) - confirm against the full header).
69 template<typename... Args>
// Try popping an item from the queue; std::nullopt is returned when nothing is stored.
76 [[nodiscard]] std::optional<T>
try_pop();
// Pop a value from the queue and suspend if the queue is empty.
81 [[nodiscard]] pop_awaiter
pop();
// Push a value to the queue and suspend if the queue is full.
87 [[nodiscard]] push_awaiter
push(T&& value);
// Awaiter for a pending pop operation (the class line itself is not shown here).
95 template<typename T, typename TContainer>
// Handle of the coroutine that is resumed once the pop completes or is cancelled.
100 coroutine_handle<> m_handle;
// Value handed to the awaiting coroutine; stays empty when no value was delivered.
102 std::optional<T> m_result{std::nullopt};
// The owning queue manipulates the awaiter's members directly.
105 friend class
queue<T, TContainer>;
// Stores `value` as the result and resumes the waiting coroutine.
107 void resume(T&& value);
// Binds the awaiter to the queue it pops from.
110 explicit
pop_awaiter(queue* parent) noexcept : m_parent(parent) {}
// Coroutine awaiter interface; await_resume yields the popped value, if any.
121 [[nodiscard]]
bool await_ready();
122 [[nodiscard]]
bool await_suspend(coroutine_handle<> hndl);
123 [[nodiscard]] std::optional<T> await_resume();
// Awaiter for a pending push operation (the class line itself is not shown here).
126 template<
typename T,
typename TContainer>
// Handle of the coroutine that is resumed once the push completes or is cancelled.
132 coroutine_handle<> m_handle;
// The owning queue manipulates the awaiter's members directly.
137 friend class
queue<T, TContainer>;
// Binds the awaiter to its queue and takes ownership of the value to push.
// T is the class template parameter, so `T&& value` is an rvalue reference rather
// than a forwarding reference; for a non-reference T, std::forward<T> acts like std::move.
142 explicit
push_awaiter(queue* parent, T&& value) noexcept : m_parent(parent), m_value(std::forward<T>(value)) {}
// Coroutine awaiter interface.
153 [[nodiscard]]
bool await_ready();
154 void await_suspend(coroutine_handle<> hndl);
// NOTE(review): the signature line is not shown; only `other` is locked and every
// member is taken from it, so this is presumably the move constructor - confirm.
158 template<
typename T,
typename TContainer>
// Hold other's mutex while its state is taken over.
160 std::scoped_lock lck(other.m_mtx);
161 m_queue = std::move(other.m_queue);
// Size limit and dispatcher are copied; `other` keeps its own values.
162 m_max_size = other.m_max_size;
163 m_dispatcher = other.m_dispatcher;
// Take over the lists of pending pop/push awaiters and detach them from `other`.
166 m_pop_list = other.m_pop_list;
167 other.m_pop_list =
nullptr;
168 m_push_list = other.m_push_list;
169 other.m_push_list =
nullptr;
// Move assignment: takes over other's stored items, size limit, dispatcher and
// pending-operation lists, then cancels the operations that were pending on *this.
172 template<
typename T,
typename TContainer>
173 inline queue<T, TContainer>& queue<T, TContainer>::operator=(
queue&& other)
noexcept {
// Heads of the awaiter lists previously owned by *this; they are cancelled below.
174 pop_awaiter* old_pop =
nullptr;
175 push_awaiter* old_push =
nullptr;
// Lock both queues together.
// NOTE(review): if `this == &other` the same mutex is handed to scoped_lock twice -
// confirm whether self-assignment is guarded or needs handling.
177 std::scoped_lock lck(m_mtx, other.m_mtx);
178 m_queue = std::move(other.m_queue);
179 m_max_size = other.m_max_size;
180 m_dispatcher = other.m_dispatcher;
// Remember our own waiters, adopt other's, and leave other without any waiters.
183 old_pop = m_pop_list;
184 m_pop_list = other.m_pop_list;
185 other.m_pop_list =
nullptr;
186 old_push = m_push_list;
187 m_push_list = other.m_push_list;
188 other.m_push_list =
nullptr;
// Cancel the pops that were pending on the old state: each one gets an empty result.
191 while (old_pop !=
nullptr) {
// Advance first; the awaiter is not touched again after it has been resumed.
192 auto await = old_pop;
193 old_pop = old_pop->m_next;
194 await->m_result.reset();
// Resume through the awaiter's dispatcher when one is set.
195 if (await->m_dispatcher !=
nullptr)
196 await->m_dispatcher->push([await]() { await->m_handle.resume(); });
// Direct resumption (presumably the else branch of the check above - confirm).
198 await->m_handle.resume();
// Cancel the pushes that were pending on the old state: each one reports false.
201 while (old_push !=
nullptr) {
202 auto await = old_push;
203 old_push = old_push->m_next;
204 await->m_result =
false;
// Body fragment of size(): get the number of items available without suspending.
210 template<
typename T,
typename TContainer>
212 std::unique_lock lck{m_mtx};
// Start with the stored elements ...
213 size_t cnt = m_queue.size();
// ... then walk the list of pending push awaiters (loop body not shown;
// presumably each pending push is added to the count - confirm).
214 for (
auto pa = m_push_list; pa !=
nullptr; pa = pa->m_next)
// Body fragment of empty(): true if the queue is empty and calling pop() would suspend.
219 template<
typename T,
typename TContainer>
221 std::unique_lock lck{m_mtx};
// Empty only when nothing is stored and no push is waiting to deliver a value.
222 return m_queue.empty() && m_push_list ==
nullptr;
// Body fragment of try_push(): try pushing a new item to the queue without suspending.
225 template<
typename T,
typename TContainer>
227 std::unique_lock lck{m_mtx};
// Refuse the value when the queue already holds m_max_size items.
229 if (m_queue.size() >= m_max_size)
return false;
// A pop is waiting: unlink the first pending pop awaiter and hand it the value directly.
231 if (
auto awaiter = m_pop_list; awaiter !=
nullptr) {
232 m_pop_list = awaiter->m_next;
// Expected invariant: a pop can only be pending while no items are stored.
234 assert(m_queue.empty());
236 awaiter->resume(std::forward<T>(value));
// Store the value in the queue (presumably only reached when no pop was pending - confirm).
239 m_queue.push(std::forward<T>(value));
// Body fragment of try_emplace(): like try_push, but the item is built from `args`.
244 template<
typename T,
typename TContainer>
245 template<
typename... Args>
247 std::unique_lock lck{m_mtx};
// Refuse the item when the queue already holds m_max_size items.
249 if (m_queue.size() >= m_max_size)
return false;
// A pop is waiting: unlink the first pending pop awaiter and give it a freshly built T.
251 if (
auto awaiter = m_pop_list; awaiter !=
nullptr) {
252 m_pop_list = awaiter->m_next;
// Expected invariant: a pop can only be pending while no items are stored.
254 assert(m_queue.empty());
256 awaiter->resume(T{std::forward<Args>(args)...});
// Construct the item in place in the queue (presumably only reached when no pop was pending - confirm).
259 m_queue.emplace(std::forward<Args>(args)...);
// Body fragment of try_pop(): try popping an item from the queue without suspending.
264 template<
typename T,
typename TContainer>
266 std::unique_lock lck{m_mtx};
// Nothing stored: report that with an empty optional.
268 if (m_queue.empty())
return std::nullopt;
// Move the front element into the result.
270 std::optional<T> res{std::move(m_queue.front())};
// A push is waiting for space: unlink the first pending push awaiter and store its value.
273 if (
auto awaiter = m_push_list; awaiter !=
nullptr) {
274 m_push_list = awaiter->m_next;
// Expects exactly one free slot at this point (presumably the front element was
// removed on a line not shown here - confirm).
276 assert(m_queue.size() == m_max_size - 1);
277 m_queue.push(std::move(awaiter->m_value));
285 template<
typename T,
typename TContainer>
290 template<
typename T,
typename TContainer>
// Body fragment of clear(): clear the queue.
295 template<
typename T,
typename TContainer>
297 std::unique_lock lck{m_mtx};
// Loop until no stored items remain (loop body not shown; presumably pops one item per pass).
298 while (!m_queue.empty())
// Detach the whole list of pending pushes from the queue before processing it.
301 auto push_list = m_push_list;
302 m_push_list =
nullptr;
// Every detached push is marked as failed (result false).
305 while (push_list !=
nullptr) {
// Advance before touching the awaiter's result.
306 const auto await = push_list;
307 push_list = push_list->m_next;
308 await->m_result =
false;
// NOTE(review): signature line not shown; the body consumes `value` and fills m_result,
// so this is presumably pop_awaiter::resume(T&& value) - confirm.
313 template<
typename T,
typename TContainer>
// Store the delivered value as the result of the pop.
317 m_result.emplace(std::forward<T>(value));
// With a dispatcher set, resumption of the coroutine is scheduled through it.
319 if (m_dispatcher !=
nullptr)
320 m_dispatcher->
push([
this]() { m_handle.resume(); });
// Tries to complete the pop immediately from the items already stored in the queue.
325 template<
typename T,
typename TContainer>
326 inline bool queue<T, TContainer>::pop_awaiter::await_ready() {
327 std::unique_lock lck{m_parent->m_mtx};
// Empty-queue case (its body is not shown here).
328 if (m_parent->m_queue.empty()) {
// Take the front element as the result of this pop.
332 m_result.emplace(std::move(m_parent->m_queue.front()));
333 m_parent->m_queue.pop();
// A slot was freed: if a push is waiting, unlink it and move its value into the queue.
335 auto awaiter = m_parent->m_push_list;
336 if (awaiter !=
nullptr) {
337 m_parent->m_push_list = awaiter->m_next;
// Expected invariant: pushes only wait on a full queue, so exactly one slot is free now.
339 assert(m_parent->m_queue.size() == m_parent->m_max_size - 1);
340 m_parent->m_queue.push(std::move(awaiter->m_value));
// Registers this awaiter in the parent's list of pending pops.
348 template<
typename T,
typename TContainer>
349 inline bool queue<T, TContainer>::pop_awaiter::await_suspend(coroutine_handle<> hndl) {
// Walk towards the last entry of the pending-pop list (loop body not shown).
351 auto val = m_parent->m_pop_list;
352 while (val !=
nullptr && val->m_next !=
nullptr)
// Make this awaiter the list head (presumably only when the list was empty - confirm).
355 m_parent->m_pop_list =
this;
// Fall back to the queue's dispatcher when none was chosen for this operation.
359 if (m_dispatcher ==
nullptr) m_dispatcher = m_parent->m_dispatcher;
// Manual unlock with no lock taken in the visible lines of this function.
// NOTE(review): presumably await_ready() leaves the mutex locked on the suspend path - confirm.
361 m_parent->m_mtx.unlock();
// Hands the stored result to the awaiting coroutine by moving m_result out.
365 template<
typename T,
typename TContainer>
366 inline std::optional<T> queue<T, TContainer>::pop_awaiter::await_resume() {
368 return std::move(m_result);
// Resumes the coroutine waiting on this push.
371 template<
typename T,
typename TContainer>
372 inline void queue<T, TContainer>::push_awaiter::resume() {
// With a dispatcher set, resumption of the coroutine is scheduled through it.
374 if (m_dispatcher !=
nullptr)
375 m_dispatcher->
push([
this]() { m_handle.resume(); });
// Tries to complete the push immediately, either by handing the value to a waiting
// pop or by storing it while the queue has room.
380 template<
typename T,
typename TContainer>
381 inline bool queue<T, TContainer>::push_awaiter::await_ready() {
382 std::unique_lock lck{m_parent->m_mtx};
// A pop is waiting: unlink the first pending pop awaiter and pass the value straight to it.
384 if (m_parent->m_pop_list !=
nullptr) {
// Expected invariant: a pop can only be pending while no items are stored.
385 assert(m_parent->m_queue.empty());
386 auto awaiter = m_parent->m_pop_list;
387 m_parent->m_pop_list = awaiter->m_next;
389 awaiter->resume(std::move(m_value));
// Free capacity: store the value in the queue.
393 if (m_parent->m_queue.size() < m_parent->m_max_size) {
394 m_parent->m_queue.push(std::move(m_value));
// Registers this awaiter in the parent's list of pending pushes.
402 template<
typename T,
typename TContainer>
403 inline void queue<T, TContainer>::push_awaiter::await_suspend(coroutine_handle<> hndl) {
// Walk towards the last entry of the pending-push list (loop body not shown).
405 auto val = m_parent->m_push_list;
406 while (val !=
nullptr && val->m_next !=
nullptr)
// Make this awaiter the list head (presumably only when the list was empty - confirm).
409 m_parent->m_push_list =
this;
// Fall back to the queue's dispatcher when none was chosen for this operation.
413 if (m_dispatcher ==
nullptr) m_dispatcher = m_parent->m_dispatcher;
// Manual unlock with no lock taken in the visible lines of this function.
// NOTE(review): presumably await_ready() leaves the mutex locked on the suspend path - confirm.
415 m_parent->m_mtx.unlock();
418 template<
typename T,
typename TContainer>
419 inline bool queue<T, TContainer>::push_awaiter::await_resume() {
Basic dispatcher interface class.
Definition dispatcher.h:8
static dispatcher * current() noexcept
Definition dispatcher.h:48
virtual void push(std::function< void()> cbfn)=0
pop_awaiter & resume_on(dispatcher *dsp) noexcept
Set a different dispatcher to resume this operation on.
Definition queue.h:116
push_awaiter & resume_on(dispatcher *dsp) noexcept
Set a different dispatcher to resume this operation on.
Definition queue.h:148
Queue for sharing items between multiple producers/consumers.
Definition queue.h:18
bool try_emplace(Args &&... args)
Try pushing a new item to the queue.
Definition queue.h:246
queue(size_t max_size=std::numeric_limits< size_t >::max(), dispatcher *disp=nullptr, Args &&... args)
Construct a new queue.
Definition queue.h:37
void clear()
Clear the queue.
Definition queue.h:296
pop_awaiter pop()
Pop a value from the queue and suspend if the queue is empty.
Definition queue.h:286
std::optional< T > try_pop()
Try popping an item from the queue.
Definition queue.h:265
push_awaiter push(T &&value)
Push a value to the queue and suspend if the queue is full.
Definition queue.h:291
bool empty() const noexcept
True if the queue is empty and calling pop() would suspend.
Definition queue.h:220
bool try_push(T &&value)
Try pushing a new item to the queue.
Definition queue.h:226
size_t size() const noexcept
Get the number of items available without suspending.
Definition queue.h:211
Provides a consistent import interface for coroutine, experimental/coroutine, or a best-effort fallback.