スレッドプール
Worker Thread パターンでスレッドプールを作ってみた。
worker_thread.h
#ifndef ANMELT_THREAD_WORKER_THREAD_H_INCLUDED #define ANMELT_THREAD_WORKER_THREAD_H_INCLUDED // Boost #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include <boost/function.hpp> #include <boost/bind.hpp> #include <boost/noncopyable.hpp> namespace anmelt{ namespace thread { template<class Container> class worker_thread : boost::noncopyable { private: const boost::shared_ptr<Container> container_; const boost::shared_ptr<boost::thread> thread_; public: worker_thread(const boost::shared_ptr<Container>& container) : container_(container) { const_cast<boost::shared_ptr<boost::thread>&>(thread_).reset( new boost::thread(boost::bind(&worker_thread<Container>::run, this))); } boost::thread& this_thread() { return *thread_; } template<class F> void request(F f) { container_->put(f); } private: void run() { try { while (true) { boost::function<void ()> f; container_->get(f); if (!f.empty()) { f(); } } } catch (boost::thread_interrupted&) { } } }; }} #endif // ANMELT_THREAD_WORKER_THREAD_H_INCLUDED
thread_pool.h
#ifndef ANMELT_THREAD_THREAD_POOL_H_INCLUDED #define ANMELT_THREAD_THREAD_POOL_H_INCLUDED #include <anmelt/thread/mutexed.h> #include <anmelt/thread/worker_thread.h> #include <anmelt/thread/containers.h> // Boost #include <boost/bind.hpp> #include <boost/cstdint.hpp> #include <boost/noncopyable.hpp> // STL #include <algorithm> #include <list> namespace anmelt{ namespace thread { class thread_pool : boost::noncopyable { private: boost::mutex mutex_; boost::condition condition_; typedef blocking_queue<boost::function<void ()> > queue; typedef worker_thread<queue> worker_thread; typedef boost::shared_ptr<worker_thread> thread_ptr; typedef std::list<thread_ptr> thread_list; typedef boost::shared_ptr<queue> queue_ptr; const queue_ptr queue_; mutexed<thread_list> threads_; public: thread_pool(boost::uint32_t size) : queue_(new queue) { for (boost::uint32_t i = 0; i < size; i++) { threads_.value().push_back(thread_ptr(new worker_thread(queue_))); } } ~thread_pool() { for_each(boost::bind(&boost::thread::interrupt, boost::bind(&worker_thread::this_thread, _1))); for_each(boost::bind(&boost::thread::join, boost::bind(&worker_thread::this_thread, _1))); } template<class F> void request(F f) { queue_->put(f); } std::size_t size() const { return threads_.apply(boost::bind(&thread_list::size, _1)); } private: template<class F> void for_each(F f) { struct pred : std::unary_function<thread_list, void> { F& f_; pred(F& f) : f_(f) { } void operator()(thread_list& list) const { std::for_each(list.begin(), list.end(), f_); } }; threads_.apply(pred(f)); } }; }} #endif // ANMELT_THREAD_THREAD_POOL_H_INCLUDED
テスト
// Usage example for anmelt::thread::thread_pool.
// NOTE(review): this snippet shows no #include lines; it assumes <iostream>,
// <boost/thread.hpp> and the thread_pool header are already included.

// Serialises writes to std::cout so lines from different workers do not mix.
boost::mutex g_mutex;

// Task submitted to the pool: prints the id of the thread executing it.
void func()
{
    boost::mutex::scoped_lock lock(g_mutex);
    std::cout << "id: " << boost::this_thread::get_id() << std::endl;
}

// Standard C++ requires main to return int (was `void main()`).
int main()
{
    anmelt::thread::thread_pool pool(5);

    // Submit 10 tasks to a pool of 5 workers.
    for (int i = 0; i < 10; i++) {
        pool.request(func);
    }

    // `pool` is destroyed here; its destructor interrupts and joins the
    // worker threads.
}
結果
id: 00153460 id: 00153460 id: 00153490 id: 00153460 id: 00153490 id: 001531C0 id: 001535E0 id: 00153490 id: 001531C0 id: 00153310
スレッドプールにリクエストを出すと、プール内のいずれかのスレッドがそれを実行する。
動的にスレッドの数を増やしたり減らしたりとか設定が出来るといいんだけど、とりあえずはこれだけ。