32 explicit thread_pool(
size_t initial_size = std::thread::hardware_concurrency()) { this->
resize(initial_size); }
41 void push(std::function<
void()> cbfn)
override {
43 if (g_current_thread !=
nullptr) {
44 std::unique_lock lck{g_current_thread->mutex};
45 g_current_thread->queue.emplace(std::move(cbfn));
47 std::shared_lock lck{m_threads_mtx};
48 auto size = m_valid_size.load();
49 if (
size == 0)
throw std::runtime_error(
"pool is shutting down");
50 auto thread = m_threads[g_queue_rand() %
size].get();
51 std::unique_lock lck2{thread->mutex};
52 thread->queue.emplace(std::move(cbfn));
53 thread->cv.notify_one();
62 std::unique_lock lck{m_resize_mtx};
63 auto old = m_target_size.load();
64 if (old > target_size) {
66 m_valid_size = target_size;
67 m_target_size = target_size;
70 for (
size_t i = target_size; i < m_threads.size(); i++) {
71 m_threads[i]->cv.notify_all();
72 if (m_threads[i]->thread.joinable()) m_threads[i]->thread.join();
73 assert(m_threads[i]->queue.empty());
75 std::unique_lock lck{m_threads_mtx};
76 m_threads.resize(target_size);
77 }
else if (old < target_size) {
78 m_target_size = target_size;
79 std::unique_lock threads_lck{m_threads_mtx};
80 m_threads.resize(target_size);
83 for (
size_t i = old; i < target_size; i++) {
84 m_threads[i] = std::make_unique<thread_state>(
this, i);
87 m_valid_size = target_size;
89 assert(target_size == m_threads.size());
90 assert(target_size == m_valid_size);
91 assert(target_size == m_target_size);
98 size_t size() const noexcept {
return m_valid_size.load(); }
101 struct thread_state {
103 size_t const thread_index;
105 std::condition_variable cv{};
106 std::queue<std::function<void()>> queue{};
110 : pool{parent}, thread_index{index}, thread{[this]() { this->run(); }} {}
112 std::function<void()> try_steal_task() {
114 if (!pool->m_threads_mtx.try_lock_shared())
return {};
115 std::shared_lock lck{pool->m_threads_mtx, std::adopt_lock};
116 for (
size_t i = 0; i < pool->m_valid_size; i++) {
117 auto& thread = pool->m_threads[i];
118 if (thread.get() ==
this || thread ==
nullptr)
continue;
120 if (!thread->mutex.try_lock())
continue;
121 std::unique_lock th_lck{thread->mutex, std::adopt_lock};
122 if (thread->queue.empty())
continue;
123 auto cbfn = std::move(thread->queue.front());
// ---- thread_state::run() worker loop (partial view) ------------------
// NOTE(review): several original lines are missing from this paste (the
// signature, the task-execution lines after front(), the steal-branch
// body, and the tail of the shutdown drain). Comments below annotate only
// what the visible lines establish; everything else is hedged.
// Name the OS thread "pool_<index>" for debuggers/top (Linux pthread API).
133 std::string name =
"pool_" + std::to_string(thread_index);
134 pthread_setname_np(pthread_self(), name.c_str());
// Mark this OS thread as a pool worker so push() can take its fast path.
138 g_current_thread =
this;
// Main loop: drain the private queue under `mutex`.
141 std::unique_lock lck{mutex};
142 while (!queue.empty()) {
143 auto cbfn = std::move(queue.front());
// (missing lines presumably pop + unlock + invoke cbfn — TODO confirm)
// A worker whose index is at or past the current target is surplus
// (pool is shrinking): leave the loop and fall through to shutdown.
150 if (thread_index >= pool->m_target_size)
break;
// Own queue is empty: try to steal one task from a sibling worker.
151 if (
auto cbfn = try_steal_task(); cbfn) {
// Still wanted but idle: sleep up to 100ms (bounded, so a missed
// notify during shutdown cannot deadlock the join in resize()).
155 if (thread_index < pool->m_target_size) {
156 std::unique_lock lck{mutex};
157 cv.wait_for(lck, std::chrono::milliseconds{100});
// Shutdown path: unregister the worker, then empty the queue — resize()
// asserts it is empty after join(). Whether the drained tasks are run
// or discarded is in the missing lines — TODO confirm.
160 std::unique_lock lck{mutex};
161 g_current_thread =
nullptr;
162 while (!queue.empty()) {
163 auto& cbfn = queue.front();
173 inline static thread_local thread_state* g_current_thread{
nullptr};
177 inline static thread_local std::minstd_rand g_queue_rand{
178 static_cast<unsigned int>(std::hash<std::thread::id>{}(std::this_thread::get_id()))};
179 std::atomic<size_t> m_target_size{0};
180 std::atomic<size_t> m_valid_size{0};
181 std::mutex m_resize_mtx{};
182 std::shared_mutex m_threads_mtx{};
184 std::vector<std::unique_ptr<thread_state>> m_threads{};