3#include <asyncpp/dispatcher.h>
8#include <condition_variable>
20 struct scheduled_entry {
21 std::chrono::steady_clock::time_point timepoint;
22 std::function<void(
bool)> invokable;
/**
 * Construct an entry from its expiry time and callback.
 * \param time Time point stored in `timepoint`; the comparator visible in this file orders entries by it.
 * \param cbfn Callback moved into `invokable`. In the visible code it is invoked with `true`
 *             from the expiry loops and with `false` from the cancellation and shutdown paths.
 */
24 scheduled_entry(std::chrono::steady_clock::time_point time, std::function<
void(
bool)> cbfn) noexcept
25 : timepoint(time), invokable(std::move(cbfn)) {}
30 constexpr bool operator()(
const scheduled_entry& lhs,
const scheduled_entry& rhs)
const noexcept {
31 return lhs.timepoint < rhs.timepoint;
35 struct cancellable_scheduled_entry :
public scheduled_entry {
41 std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less>::const_iterator
it;
51 if (
it->in_construction) {
53 new std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less>::node_type(
54 parent->m_scheduled_cancellable_set.extract(
it));
55 if (entry->value().invokable) {
56 parent->m_pushed.emplace([entry]() {
57 entry->value().invokable(
false);
62 std::unique_lock lck{
parent->m_mtx};
63 auto entry =
parent->m_scheduled_cancellable_set.extract(
it);
65 if (entry.value().invokable) {
66 parent->m_pushed.emplace([cbfn = std::move(entry.value().invokable)]() { cbfn(false); });
72 mutable std::optional<asyncpp::stop_callback<cancel_callback>> cancel_token;
73 mutable bool in_construction{
true};
/**
 * Construct a cancellable entry by forwarding both arguments to the scheduled_entry base.
 * `cancel_token` starts empty and `in_construction` starts as true (default member initializers).
 * \param time Time point at which the entry expires.
 * \param cbfn Callback to store; moved into the base class.
 */
75 cancellable_scheduled_entry(std::chrono::steady_clock::time_point time,
76 std::function<
void(
bool)> cbfn) noexcept
77 : scheduled_entry{time, std::move(cbfn)} {}
/**
 * Construct a new timer.
 * Starts the worker thread, whose body is a noexcept lambda that calls run() on this object.
 */
84 timer() : m_thread{[this]() noexcept { this->run(); }} {}
87 std::unique_lock lck{m_mtx};
91 if (m_thread.joinable()) m_thread.join();
92 assert(m_pushed.empty());
93 assert(m_scheduled_set.empty());
94 assert(m_scheduled_cancellable_set.empty());
// A timer cannot be copied: both copy construction and copy assignment are deleted.
96 timer(
const timer&) =
delete;
97 timer& operator=(
const timer&) =
delete;
103 void push(std::function<
void()> cbfn)
override {
104 if (m_exit)
throw std::logic_error(
"shutting down");
105 std::unique_lock lck(m_mtx);
106 m_pushed.emplace(std::move(cbfn));
115 void schedule(std::function<
void(
bool)> cbfn, std::chrono::steady_clock::time_point timeout) {
116 if (m_exit)
throw std::logic_error(
"shutting down");
117 std::unique_lock lck(m_mtx);
118 m_scheduled_set.emplace(timeout, std::move(cbfn));
126 void schedule(std::function<
void(
bool)> cbfn, std::chrono::nanoseconds timeout) {
127 schedule(std::move(cbfn), std::chrono::steady_clock::now() + timeout);
134 template<
typename Clock,
typename Duration>
135 void schedule(std::function<
void(
bool)> cbfn, std::chrono::time_point<Clock, Duration> timeout) {
136 schedule(std::move(cbfn), timeout - Clock::now());
145 void schedule(std::function<
void(
bool)> cbfn, std::chrono::steady_clock::time_point timeout,
146 asyncpp::stop_token stoken) {
147 if (m_exit)
throw std::logic_error(
"shutting down");
148 std::unique_lock lck(m_mtx);
149 auto iter = m_scheduled_cancellable_set.emplace(timeout, std::move(cbfn));
151 iter->in_construction =
false;
160 void schedule(std::function<
void(
bool)> cbfn, std::chrono::nanoseconds timeout, asyncpp::stop_token stoken) {
161 schedule(std::move(cbfn), std::chrono::steady_clock::now() + timeout, std::move(stoken));
169 template<
typename Clock,
typename Duration>
170 void schedule(std::function<
void(
bool)> cbfn, std::chrono::time_point<Clock, Duration> timeout,
171 asyncpp::stop_token stoken) {
172 schedule(std::move(cbfn), timeout - Clock::now(), std::move(stoken));
179 auto wait(std::chrono::steady_clock::time_point timeout)
noexcept {
181 timer*
const m_parent;
182 const std::chrono::steady_clock::time_point m_timeout;
/**
 * Construct the awaiter returned by wait(time_point).
 * \param parent Timer stored in `m_parent`.
 * \param timeout Time point stored in `m_timeout`; await_ready() compares it against steady_clock::now().
 */
184 constexpr awaiter(
timer* parent, std::chrono::steady_clock::time_point timeout) noexcept
185 : m_parent(parent), m_timeout(timeout) {}
187 [[nodiscard]]
bool await_ready()
const noexcept {
188 return std::chrono::steady_clock::now() >= m_timeout;
190 void await_suspend(coroutine_handle<> hndl) {
192 [
this, hndl](
bool res)
mutable {
/// Result of the co_await expression: returns `m_result`.
/// NOTE(review): `m_result` is declared and assigned outside the visible code; presumably it is
/// set from the `bool res` argument of the callback registered in await_suspend(). Confirm.
199 constexpr bool await_resume()
const noexcept {
return m_result; }
201 return awaiter{
this, timeout};
207 template<
typename Rep,
typename Period>
208 auto wait(std::chrono::duration<Rep, Period> timeout)
noexcept {
209 return wait(std::chrono::steady_clock::now() + timeout);
215 template<
typename Clock,
typename Duration>
216 auto wait(std::chrono::time_point<Clock, Duration> timeout) {
217 return wait(timeout - Clock::now());
226 auto wait(std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken)
noexcept {
228 timer*
const m_parent;
229 const std::chrono::steady_clock::time_point m_timeout;
230 asyncpp::stop_token m_stoptoken;
/**
 * Construct the awaiter returned by wait(time_point, stop_token).
 * \param parent Timer stored in `m_parent`.
 * \param timeout Time point stored in `m_timeout`; await_ready() compares it against steady_clock::now().
 * \param stoken Stop token moved into `m_stoptoken`; await_suspend() later moves it on into the schedule call.
 */
232 awaiter(
timer* parent, std::chrono::steady_clock::time_point timeout,
233 asyncpp::stop_token stoken) noexcept
234 : m_parent(parent), m_timeout(timeout), m_stoptoken(std::move(stoken)) {}
236 [[nodiscard]]
bool await_ready()
const noexcept {
237 return std::chrono::steady_clock::now() >= m_timeout;
239 void await_suspend(coroutine_handle<> hndl) {
241 [
this, hndl](
bool res)
mutable {
245 m_timeout, std::move(m_stoptoken));
/// Result of the co_await expression: returns `m_result`.
/// NOTE(review): `m_result` is declared and assigned outside the visible code; presumably it is
/// set from the `bool res` argument of the callback registered in await_suspend(). Confirm.
248 constexpr bool await_resume()
const noexcept {
return m_result; }
250 return awaiter{
this, timeout, std::move(stoken)};
259 template<
typename Rep,
typename Period>
260 auto wait(std::chrono::duration<Rep, Period> timeout, asyncpp::stop_token stoken)
noexcept {
261 return wait(std::chrono::steady_clock::now() + timeout, std::move(stoken));
269 template<
typename Clock,
typename Duration>
270 auto wait(std::chrono::time_point<Clock, Duration> timeout, asyncpp::stop_token stoken) {
271 return wait(timeout - Clock::now(), std::move(stoken));
278 static timer instance;
// Condition variable run() sleeps on (wait_for) while no pushed callback is pending and no entry is due.
284 std::condition_variable m_cv{};
// FIFO of callbacks queued by push() and by the cancellation path; run() drains it from the front.
285 std::queue<std::function<void()>> m_pushed{};
// Entries scheduled without a stop_token, ordered by time point via scheduled_entry::time_less.
286 std::multiset<scheduled_entry, scheduled_entry::time_less> m_scheduled_set{};
// Entries scheduled with a stop_token, ordered the same way; the cancel callback extracts from this set.
287 std::multiset<cancellable_scheduled_entry, scheduled_entry::time_less> m_scheduled_cancellable_set{};
// While true, push() and schedule() throw std::logic_error("shutting down"). Value-initialized to false.
288 std::atomic<bool> m_exit{};
// Worker thread started by the constructor to execute run().
289 std::thread m_thread{};
291 void run() noexcept {
293 pthread_setname_np(pthread_self(),
"asyncpp_timer");
296 std::unique_lock lck(m_mtx);
297 while (!m_pushed.empty()) {
298 auto entry = std::move(m_pushed.front());
304 }
catch (...) { std::terminate(); }
308 auto now = std::chrono::steady_clock::now();
309 while (!m_scheduled_set.empty()) {
310 auto elem = m_scheduled_set.begin();
311 if (elem->timepoint > now)
break;
312 auto handle = m_scheduled_set.extract(elem);
313 if (handle.value().invokable) {
316 handle.value().invokable(
true);
317 }
catch (...) { std::terminate(); }
321 while (!m_scheduled_cancellable_set.empty()) {
322 auto elem = m_scheduled_cancellable_set.begin();
323 if (elem->timepoint > now)
break;
324 auto handle = m_scheduled_cancellable_set.extract(elem);
325 if (handle.value().invokable) {
326 handle.value().cancel_token.reset();
329 handle.value().invokable(
true);
330 }
catch (...) { std::terminate(); }
334 now = std::chrono::steady_clock::now();
335 std::chrono::nanoseconds timeout{500 * 1000 * 1000};
336 if (!m_scheduled_set.empty()) timeout = (std::min)(m_scheduled_set.begin()->timepoint - now, timeout);
337 if (!m_scheduled_cancellable_set.empty())
338 timeout = (std::min)(m_scheduled_cancellable_set.begin()->timepoint - now, timeout);
339 if (m_pushed.empty() && timeout.count() > 0) {
341 m_cv.wait_for(lck, timeout);
344 std::unique_lock lck(m_mtx);
345 auto set = std::move(m_scheduled_set);
346 auto cset = std::move(m_scheduled_cancellable_set);
348 for (
const auto& entry : set) {
349 if (entry.invokable) {
351 entry.invokable(
false);
352 }
catch (...) { std::terminate(); }
355 for (
const auto& entry : cset) {
356 if (entry.invokable) {
358 entry.invokable(
false);
359 }
catch (...) { std::terminate(); }
363 m_scheduled_set.clear();
364 m_scheduled_cancellable_set.clear();
368 template<
typename Rep,
typename Period>
369 inline auto operator co_await(std::chrono::duration<Rep, Period> duration) {
370 return timer::get_default().wait(duration);
372 template<
typename Clock,
typename Duration>
373 inline auto operator co_await(std::chrono::time_point<Clock, Duration> timeout) {
374 return timer::get_default().wait(timeout);
Basic dispatcher interface class.
Definition dispatcher.h:8
A dispatcher that provides a way to schedule coroutines based on time.
Definition timer.h:19
static timer & get_default()
Get a global timer instance.
Definition timer.h:277
void schedule(std::function< void(bool)> cbfn, std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed at a specific point in time with a stop_token.
Definition timer.h:145
auto wait(std::chrono::time_point< Clock, Duration > timeout, asyncpp::stop_token stoken)
Get an awaitable that pauses the current coroutine until the specified duration is elapsed,...
Definition timer.h:270
auto wait(std::chrono::duration< Rep, Period > timeout, asyncpp::stop_token stoken) noexcept
Get an awaitable that pauses the current coroutine until the specified duration is elapsed,...
Definition timer.h:260
void schedule(std::function< void(bool)> cbfn, std::chrono::time_point< Clock, Duration > timeout)
Schedule a callback to be executed at a specific point in time, given as a time_point of an arbitrary clock.
Definition timer.h:135
void schedule(std::function< void(bool)> cbfn, std::chrono::time_point< Clock, Duration > timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed at a specific point in time, given as a time_point of an arbitrary clock, with a stop_token.
Definition timer.h:170
auto wait(std::chrono::duration< Rep, Period > timeout) noexcept
Get an awaitable that pauses the current coroutine until the specified duration is elapsed.
Definition timer.h:208
void push(std::function< void()> cbfn) override
Push a callback to be executed in the timer thread.
Definition timer.h:103
auto wait(std::chrono::steady_clock::time_point timeout) noexcept
Get an awaitable that pauses the current coroutine until the specified time_point is reached.
Definition timer.h:179
void schedule(std::function< void(bool)> cbfn, std::chrono::nanoseconds timeout, asyncpp::stop_token stoken)
Schedule a callback to be executed after a certain duration expires with a stop_token.
Definition timer.h:160
auto wait(std::chrono::time_point< Clock, Duration > timeout)
Get an awaitable that pauses the current coroutine until the specified time_point (of an arbitrary clock) is reached.
Definition timer.h:216
timer()
Construct a new timer.
Definition timer.h:84
void schedule(std::function< void(bool)> cbfn, std::chrono::nanoseconds timeout)
Schedule a callback to be executed after a certain duration expires.
Definition timer.h:126
void schedule(std::function< void(bool)> cbfn, std::chrono::steady_clock::time_point timeout)
Schedule a callback to be executed at a specific point in time.
Definition timer.h:115
auto wait(std::chrono::steady_clock::time_point timeout, asyncpp::stop_token stoken) noexcept
Get an awaitable that pauses the current coroutine until the specified time_point is reached,...
Definition timer.h:226
Provides a consistent import interface for coroutine, experimental/coroutine or a best effort fallbac...
Polyfill implementation of std::stop_token and friends. Check the documentation of the stl version fo...
Callback type used with stop_tokens.
Definition timer.h:37
timer * parent
Pointer to the timer containing this node.
Definition timer.h:39
void operator()() const
Invocation operator called on cancellation.
Definition timer.h:43
std::multiset< cancellable_scheduled_entry, scheduled_entry::time_less >::const_iterator it
Iterator to this node in the entry set.
Definition timer.h:41
Comparator struct that compares the timepoints.
Definition timer.h:28
constexpr bool operator()(const scheduled_entry &lhs, const scheduled_entry &rhs) const noexcept
Compare two entries.
Definition timer.h:30