Async++ unknown
Async (co_await/co_return) code for C++
Loading...
Searching...
No Matches
timer.h
1#pragma once
3#include <asyncpp/dispatcher.h>
5#include <atomic>
6#include <cassert>
7#include <chrono>
8#include <condition_variable>
9#include <mutex>
10#include <optional>
11#include <queue>
12#include <set>
13#include <thread>
14
15namespace asyncpp {
19 class timer final : public dispatcher {
/// A callback paired with the point in time at which it becomes due.
struct scheduled_entry {
	/// Moment at which the callback is due.
	std::chrono::steady_clock::time_point timepoint;
	/// Callback to run; the timer invokes it with `true` on expiry and `false` otherwise.
	std::function<void(bool)> invokable;

	/// Store the due time and take ownership of the callback.
	scheduled_entry(std::chrono::steady_clock::time_point due, std::function<void(bool)> callback) noexcept
		: timepoint(due), invokable(std::move(callback)) {}

	/// Comparator ordering entries by ascending due time.
	struct time_less {
		/// @return true if `lhs` is due strictly before `rhs`.
		constexpr bool operator()(const scheduled_entry& lhs, const scheduled_entry& rhs) const noexcept {
			return rhs.timepoint > lhs.timepoint;
		}
	};
};
35 struct cancellable_scheduled_entry : public scheduled_entry {
41 std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less>::const_iterator it;
43 void operator()() const {
44 // We have to distinguish between cancellation while in the process of construction and
45 // cancellation afterwards. If the stop_token is signalled at the time of construction it directly
46 // invokes the callback, however if we would lock at that point we would cause a deadlock with the
47 // outside lock in schedule(). We also can't directly delete the node in this case because
48 // std::optional<>::emplace tries to set a flag after construction which would cause use after free.
49 // Thus we set a flag in schedule() and postpone both the invoke and deletion until after emplace is done.
50 // If we are not in construction we need to lock and can directly destroy the node once we are done.
51 if (it->in_construction) {
52 auto entry =
53 new std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less>::node_type(
54 parent->m_scheduled_cancellable_set.extract(it));
55 if (entry->value().invokable) {
56 parent->m_pushed.emplace([entry]() {
57 entry->value().invokable(false);
58 delete entry;
59 });
60 }
61 } else {
62 std::unique_lock lck{parent->m_mtx};
63 auto entry = parent->m_scheduled_cancellable_set.extract(it);
64 lck.unlock();
65 if (entry.value().invokable) {
66 parent->m_pushed.emplace([cbfn = std::move(entry.value().invokable)]() { cbfn(false); });
67 parent->m_cv.notify_all();
68 }
69 }
70 }
71 };
// Stop callback registration for this entry; empty until schedule() emplaces it.
// Declared mutable because schedule() assigns it through a multiset iterator.
72 mutable std::optional<asyncpp::stop_callback<cancel_callback>> cancel_token;
// True until schedule() has finished emplacing cancel_token; cancel_callback reads it
// to detect a cancellation that fires during that emplace.
73 mutable bool in_construction{true};
74
// Forwards the due time and callback to the scheduled_entry base; cancel_token stays empty here.
75 cancellable_scheduled_entry(std::chrono::steady_clock::time_point time,
76 std::function<void(bool)> cbfn) noexcept
77 : scheduled_entry{time, std::move(cbfn)} {}
78 };
79
80 public:
// Construct a new timer: starts the worker thread, which executes run().
84 timer() : m_thread{[this]() noexcept { this->run(); }} {}
85 ~timer() {
86 {
87 std::unique_lock lck{m_mtx};
88 m_exit = true;
89 m_cv.notify_all();
90 }
91 if (m_thread.joinable()) m_thread.join();
92 assert(m_pushed.empty());
93 assert(m_scheduled_set.empty());
94 assert(m_scheduled_cancellable_set.empty());
95 }
96 timer(const timer&) = delete;
97 timer& operator=(const timer&) = delete;
98
103 void push(std::function<void()> cbfn) override {
104 if (m_exit) throw std::logic_error("shutting down");
105 std::unique_lock lck(m_mtx);
106 m_pushed.emplace(std::move(cbfn));
107 m_cv.notify_all();
108 }
109
115 void schedule(std::function<void(bool)> cbfn, std::chrono::steady_clock::time_point timeout) {
116 if (m_exit) throw std::logic_error("shutting down");
117 std::unique_lock lck(m_mtx);
118 m_scheduled_set.emplace(timeout, std::move(cbfn));
119 m_cv.notify_all();
120 }
126 void schedule(std::function<void(bool)> cbfn, std::chrono::nanoseconds timeout) {
127 schedule(std::move(cbfn), std::chrono::steady_clock::now() + timeout);
128 }
/**
 * @brief Schedule a callback to be executed at a point in time given on an arbitrary clock.
 *
 * The time point is converted into a remaining duration relative to `Clock::now()` and
 * forwarded to the duration based overload.
 * @param cbfn Callback to invoke.
 * @param timeout Point in time on `Clock`.
 */
template<typename Clock, typename Duration>
void schedule(std::function<void(bool)> cbfn, std::chrono::time_point<Clock, Duration> timeout) {
	// Explicit cast so clocks with floating point or finer-than-nanosecond durations work as
	// well; those are not implicitly convertible to std::chrono::nanoseconds.
	schedule(std::move(cbfn), std::chrono::duration_cast<std::chrono::nanoseconds>(timeout - Clock::now()));
}
138
145 void schedule(std::function<void(bool)> cbfn, std::chrono::steady_clock::time_point timeout,
146 asyncpp::stop_token stoken) {
147 if (m_exit) throw std::logic_error("shutting down");
148 std::unique_lock lck(m_mtx);
149 auto iter = m_scheduled_cancellable_set.emplace(timeout, std::move(cbfn));
150 iter->cancel_token.emplace(std::move(stoken), cancellable_scheduled_entry::cancel_callback{this, iter});
151 iter->in_construction = false;
152 m_cv.notify_all();
153 }
160 void schedule(std::function<void(bool)> cbfn, std::chrono::nanoseconds timeout, asyncpp::stop_token stoken) {
161 schedule(std::move(cbfn), std::chrono::steady_clock::now() + timeout, std::move(stoken));
162 }
169 template<typename Clock, typename Duration>
170 void schedule(std::function<void(bool)> cbfn, std::chrono::time_point<Clock, Duration> timeout,
171 asyncpp::stop_token stoken) {
172 schedule(std::move(cbfn), timeout - Clock::now(), std::move(stoken));
173 }
174
179 auto wait(std::chrono::steady_clock::time_point timeout) noexcept {
180 struct awaiter {
181 timer* const m_parent;
182 const std::chrono::steady_clock::time_point m_timeout;
183 bool m_result{true};
184 constexpr awaiter(timer* parent, std::chrono::steady_clock::time_point timeout) noexcept
185 : m_parent(parent), m_timeout(timeout) {}
186
187 [[nodiscard]] bool await_ready() const noexcept {
188 return std::chrono::steady_clock::now() >= m_timeout;
189 }
190 void await_suspend(coroutine_handle<> hndl) {
191 m_parent->schedule(
192 [this, hndl](bool res) mutable {
193 m_result = res;
194 hndl.resume();
195 },
196 m_timeout);
197 }
198 //NOLINTNEXTLINE(modernize-use-nodiscard)
199 constexpr bool await_resume() const noexcept { return m_result; }
200 };
201 return awaiter{this, timeout};
202 }
/**
 * @brief Get an awaitable that pauses the current coroutine until the specified duration is elapsed.
 * @param timeout Duration to wait, measured from now on the steady clock.
 * @return Awaitable yielding the bool passed to the scheduled callback.
 */
template<typename Rep, typename Period>
auto wait(std::chrono::duration<Rep, Period> timeout) noexcept {
	// Cast to the clock's own duration so the sum is exactly steady_clock::time_point.
	// Otherwise (e.g. floating point durations) the generic time_point overload is selected,
	// which calls back into this function and fails return type deduction.
	return wait(std::chrono::steady_clock::now() +
				std::chrono::duration_cast<std::chrono::steady_clock::duration>(timeout));
}
/**
 * @brief Get an awaitable that pauses the current coroutine until a time_point on an arbitrary clock.
 * @param timeout Point in time on `Clock`; converted to the remaining duration relative to `Clock::now()`.
 */
template<typename Clock, typename Duration>
auto wait(std::chrono::time_point<Clock, Duration> timeout) {
	const auto remaining = timeout - Clock::now();
	return wait(remaining);
}
219
226 auto wait(std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken) noexcept {
227 struct awaiter {
228 timer* const m_parent;
229 const std::chrono::steady_clock::time_point m_timeout;
230 asyncpp::stop_token m_stoptoken;
231 bool m_result{true};
232 awaiter(timer* parent, std::chrono::steady_clock::time_point timeout,
233 asyncpp::stop_token stoken) noexcept
234 : m_parent(parent), m_timeout(timeout), m_stoptoken(std::move(stoken)) {}
235
236 [[nodiscard]] bool await_ready() const noexcept {
237 return std::chrono::steady_clock::now() >= m_timeout;
238 }
239 void await_suspend(coroutine_handle<> hndl) {
240 m_parent->schedule(
241 [this, hndl](bool res) mutable {
242 m_result = res;
243 hndl.resume();
244 },
245 m_timeout, std::move(m_stoptoken));
246 }
247 //NOLINTNEXTLINE(modernize-use-nodiscard)
248 constexpr bool await_resume() const noexcept { return m_result; }
249 };
250 return awaiter{this, timeout, std::move(stoken)};
251 }
252
259 template<typename Rep, typename Period>
260 auto wait(std::chrono::duration<Rep, Period> timeout, asyncpp::stop_token stoken) noexcept {
261 return wait(std::chrono::steady_clock::now() + timeout, std::move(stoken));
262 }
269 template<typename Clock, typename Duration>
270 auto wait(std::chrono::time_point<Clock, Duration> timeout, asyncpp::stop_token stoken) {
271 return wait(timeout - Clock::now(), std::move(stoken));
272 }
273
277 static timer& get_default() {
278 static timer instance;
279 return instance;
280 }
281
282 private:
283 std::mutex m_mtx{};
284 std::condition_variable m_cv{};
285 std::queue<std::function<void()>> m_pushed{};
286 std::multiset<scheduled_entry, scheduled_entry::time_less> m_scheduled_set{};
287 std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less> m_scheduled_cancellable_set{};
288 std::atomic<bool> m_exit{};
289 std::thread m_thread{};
290
// Worker thread main loop. Each iteration drains the pushed callbacks, fires scheduled
// entries that are due (invoked with `true`), then sleeps until the next deadline, a
// notification, or at most 500ms. Once exit is requested and nothing is pending, all
// remaining scheduled entries are invoked with `false`.
// User callbacks always run with m_mtx released; an exception escaping one terminates.
291 void run() noexcept {
292#ifdef __linux__
293 pthread_setname_np(pthread_self(), "asyncpp_timer");
294#endif
295 while (true) {
// Re-acquired every iteration; released when lck goes out of scope at the end of the body.
296 std::unique_lock lck(m_mtx);
// Phase 1: run everything queued via push() or by cancel callbacks.
297 while (!m_pushed.empty()) {
298 auto entry = std::move(m_pushed.front());
299 m_pushed.pop();
300 if (entry) {
301 lck.unlock();
302 try {
303 entry();
304 } catch (...) { std::terminate(); }
305 lck.lock();
306 }
307 }
// Single time snapshot used by both expiry loops below.
308 auto now = std::chrono::steady_clock::now();
// Phase 2: fire non-cancellable entries that are due; the set is ordered by timepoint,
// so the first entry that is not due ends the scan.
309 while (!m_scheduled_set.empty()) {
310 auto elem = m_scheduled_set.begin();
311 if (elem->timepoint > now) break;
312 auto handle = m_scheduled_set.extract(elem);
313 if (handle.value().invokable) {
314 lck.unlock();
315 try {
316 handle.value().invokable(true);
317 } catch (...) { std::terminate(); }
318 lck.lock();
319 }
320 }
// Phase 3: same for cancellable entries.
321 while (!m_scheduled_cancellable_set.empty()) {
322 auto elem = m_scheduled_cancellable_set.begin();
323 if (elem->timepoint > now) break;
324 auto handle = m_scheduled_cancellable_set.extract(elem);
325 if (handle.value().invokable) {
// Drop the stop callback registration before invoking the callback.
// NOTE(review): this reset runs while m_mtx is held; whether it can block on a
// concurrently executing cancel callback depends on stop_callback's destructor - confirm.
326 handle.value().cancel_token.reset();
327 lck.unlock();
328 try {
329 handle.value().invokable(true);
330 } catch (...) { std::terminate(); }
331 lck.lock();
332 }
333 }
// Phase 4: compute how long to sleep: time until the earliest remaining entry, capped at 500ms.
334 now = std::chrono::steady_clock::now();
335 std::chrono::nanoseconds timeout{500 * 1000 * 1000};
336 if (!m_scheduled_set.empty()) timeout = (std::min)(m_scheduled_set.begin()->timepoint - now, timeout);
337 if (!m_scheduled_cancellable_set.empty())
338 timeout = (std::min)(m_scheduled_cancellable_set.begin()->timepoint - now, timeout);
// Only sleep (or exit) when nothing was pushed meanwhile and no entry is already due;
// otherwise loop again immediately. The exit flag is only honoured at this point.
339 if (m_pushed.empty() && timeout.count() > 0) {
340 if (m_exit) break;
341 m_cv.wait_for(lck, timeout);
342 }
343 }
// Shutdown: move the remaining entries out under the lock, then invoke each callback
// with `false` while the lock is released.
344 std::unique_lock lck(m_mtx);
345 auto set = std::move(m_scheduled_set);
346 auto cset = std::move(m_scheduled_cancellable_set);
347 lck.unlock();
348 for (const auto& entry : set) {
349 if (entry.invokable) {
350 try {
351 entry.invokable(false);
352 } catch (...) { std::terminate(); }
353 }
354 }
355 for (const auto& entry : cset) {
356 if (entry.invokable) {
357 try {
358 entry.invokable(false);
359 } catch (...) { std::terminate(); }
360 }
361 }
// Clear the member sets again under the lock; anything present at this point is
// dropped without being invoked.
362 lck.lock();
363 m_scheduled_set.clear();
364 m_scheduled_cancellable_set.clear();
365 }
366 };
367
368 template<typename Rep, typename Period>
369 inline auto operator co_await(std::chrono::duration<Rep, Period> duration) {
370 return timer::get_default().wait(duration);
371 }
372 template<typename Clock, typename Duration>
373 inline auto operator co_await(std::chrono::time_point<Clock, Duration> timeout) {
374 return timer::get_default().wait(timeout);
375 }
376
377} // namespace asyncpp
Basic dispatcher interface class.
Definition dispatcher.h:8
A dispatcher that provides a way to schedule coroutines based on time.
Definition timer.h:19
static timer & get_default()
Get a global timer instance.
Definition timer.h:277
void schedule(std::function< void(bool)> cbfn, std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed at a specific point in time with a stop_token.
Definition timer.h:145
auto wait(std::chrono::time_point< Clock, Duration > timeout, asyncpp::stop_token stoken)
Get an awaitable that pauses the current coroutine until the specified duration is elapsed,...
Definition timer.h:270
auto wait(std::chrono::duration< Rep, Period > timeout, asyncpp::stop_token stoken) noexcept
Get an awaitable that pauses the current coroutine until the specified duration is elapsed,...
Definition timer.h:260
void schedule(std::function< void(bool)> cbfn, std::chrono::time_point< Clock, Duration > timeout)
Schedule a callback to be executed at a specific point in time given on an arbitrary clock.
Definition timer.h:135
void schedule(std::function< void(bool)> cbfn, std::chrono::time_point< Clock, Duration > timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed at a specific point in time given on an arbitrary clock, with a stop_token.
Definition timer.h:170
auto wait(std::chrono::duration< Rep, Period > timeout) noexcept
Get an awaitable that pauses the current coroutine until the specified duration is elapsed.
Definition timer.h:208
void push(std::function< void()> cbfn) override
Push a callback to be executed in the timer thread.
Definition timer.h:103
auto wait(std::chrono::steady_clock::time_point timeout) noexcept
Get an awaitable that pauses the current coroutine until the specified time_point is reached.
Definition timer.h:179
void schedule(std::function< void(bool)> cbfn, std::chrono::nanoseconds timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed after a certain duration expires with a stop_token.
Definition timer.h:160
auto wait(std::chrono::time_point< Clock, Duration > timeout)
Get an awaitable that pauses the current coroutine until the specified time_point (on an arbitrary clock) is reached.
Definition timer.h:216
timer()
Construct a new timer.
Definition timer.h:84
void schedule(std::function< void(bool)> cbfn, std::chrono::nanoseconds timeout)
Schedule a callback to be executed after a certain duration expires.
Definition timer.h:126
void schedule(std::function< void(bool)> cbfn, std::chrono::steady_clock::time_point timeout)
Schedule a callback to be executed at a specific point in time.
Definition timer.h:115
auto wait(std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken) noexcept
Get an awaitable that pauses the current coroutine until the specified time_point is reached,...
Definition timer.h:226
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...
Polyfill implementation of std::stop_token and friends. Check the documentation of the stl version fo...
Callback type used with stop_tokens.
Definition timer.h:37
timer * parent
Pointer to the timer containing this node.
Definition timer.h:39
void operator()() const
Invocation operator called on cancellation.
Definition timer.h:43
std::multiset< cancellable_scheduled_entry, scheduled_entry::time_less >::const_iterator it
Iterator to this node in the entry set.
Definition timer.h:41
Comparator struct that compares the timepoints.
Definition timer.h:28
constexpr bool operator()(const scheduled_entry &lhs, const scheduled_entry &rhs) const noexcept
Compare two entries.
Definition timer.h:30