Worker Thread
スレッドの起動にかかるコストを節約しつつ、同時に動作するスレッド数を管理するためのパターン。
以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。
これ、request じゃなくて boost::function0<void> を使ってもよさそう。
というか namespace worker_thread と class worker_thread で同じ名前なのにちゃんとコンパイルできるのには驚いた。
main.cpp
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "channel.h"
#include "client_thread.h"

namespace mtdp {
namespace worker_thread {

/// Runs the Worker Thread sample.
///
/// Creates one channel with five workers, starts the workers, then starts one
/// client thread per name and waits for all client threads to finish.
void main()
{
    // 5 = number of worker threads owned by the channel.
    const boost::shared_ptr<channel> shared_channel(new channel(5));
    shared_channel->start_workers();

    const char* const client_names[] = { "Alice", "Bobby", "Chris" };
    const unsigned client_count = sizeof(client_names) / sizeof(client_names[0]);

    boost::thread_group clients;
    for (unsigned i = 0; i < client_count; ++i) {
        // The bound functor keeps its own copy of the shared_ptr, so the
        // client object stays alive for as long as its thread runs.
        const boost::shared_ptr<client_thread> client(
            new client_thread(client_names[i], shared_channel));
        clients.create_thread(boost::bind(&client_thread::run, client));
    }
    clients.join_all();
}

} // namespace worker_thread
} // namespace mtdp
client_thread.h
#ifndef MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED #define MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED #include <boost/shared_ptr.hpp> #include <boost/random.hpp> #include "channel.h" #include "request.h" #include "../thread_helper.h" namespace mtdp{ namespace worker_thread { class client_thread { private: const std::string name_; const boost::shared_ptr<channel> channel_; public: client_thread(std::string name, boost::shared_ptr<channel> c) : name_(name), channel_(c) { } void run() { static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random( boost::mt19937(), boost::uniform_int<>(0, 1000)); for (int i = 0; true; i++) { boost::shared_ptr<request> r(new request(name(), i)); channel_->put_request(r); thread_helper::sleep(random()); } } std::string name() const { return name_; } }; }} #endif // MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED
request.h
#ifndef MTDP_WORKER_THREAD_REQUEST_H_INCLUDED #define MTDP_WORKER_THREAD_REQUEST_H_INCLUDED #include <string> #include "../thread_helper.h" namespace mtdp{ namespace worker_thread { class request { private: const std::string name_; // 依頼者 const int number_; // リクエストの番号 public: request(std::string name, int number) : name_(name), number_(number) { } template<class T> void execute(T* p) { static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random( boost::mt19937(), boost::uniform_int<>(0, 1000)); thread_helper::shared_cout(p->name() + " executes " + this->to_string() + "\n"); thread_helper::sleep(random()); } std::string to_string() { return "[ request from " + name_ + " No." + mtdp::to_string(number_) + " ]"; } }; }} #endif // MTDP_WORKER_THREAD_REQUEST_H_INCLUDED
channel.h
#ifndef MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED #define MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED #include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include <boost/foreach.hpp> #include <boost/bind.hpp> #include <vector> #include <queue> #include "request.h" #include "worker_thread.h" namespace mtdp{ namespace worker_thread { class channel { private: static const std::size_t MAX_REQUEST = 100; typedef std::queue<boost::shared_ptr<request> > request_queue_t; request_queue_t request_queue_; typedef std::vector<boost::shared_ptr<worker_thread> > worker_thread_list_t; worker_thread_list_t thread_pool_; boost::mutex mutex_; boost::condition condition_; boost::thread_group group_; public: channel(std::size_t threads) : thread_pool_(threads) { for (std::size_t i = 0; i < thread_pool_.size(); i++) { thread_pool_[i].reset(new worker_thread("Worker-" + to_string(i), this)); } } ~channel() { group_.join_all(); } void start_workers() { BOOST_FOREACH (boost::shared_ptr<worker_thread>& wt, thread_pool_) { group_.create_thread(boost::bind(&worker_thread::run, wt)); } } void put_request(boost::shared_ptr<request> r) { boost::mutex::scoped_lock lock(mutex_); while (request_queue_.size() >= MAX_REQUEST) { condition_.wait(lock); } request_queue_.push(r); condition_.notify_all(); } boost::shared_ptr<request> take_request() { boost::mutex::scoped_lock lock(mutex_); while (request_queue_.size() <= 0) { condition_.wait(lock); } boost::shared_ptr<request> r = request_queue_.front(); request_queue_.pop(); condition_.notify_all(); return r; } }; }} #endif // MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED
worker_thread.h
#ifndef MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED #define MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED #include <string> #include "request.h" namespace mtdp{ namespace worker_thread { class channel; class worker_thread { private: const std::string name_; channel* const channel_; public: worker_thread(std::string name, channel* c) : name_(name), channel_(c) { } void run() { _run(0); } std::string name() const { return name_; } private: // 循環参照になるので template にして実体化を遅らせる。 template<class T> void _run(T) { while (true) { boost::shared_ptr<request> r = channel_->take_request(); r->execute(this); } } }; }} #endif // MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED