Producer-Consumer
あるスレッドから別のスレッドにデータを渡すためのパターン。
というか Guarded Suspension とどう違うのかよく分かんない。
中身はほとんど同じだけど、その目的が違うのかなぁ……。
以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。
maker_thread.h の next_id() で static 変数が使われてたんだけど、cpp ファイルを作って定義するのが面倒なので static でローカルに作ったんだけど、これって最初に next_id() を通るときに排他制御で問題になったりするのかな?
main.cpp
#include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include "table.h" #include "maker_thread.h" #include "eater_thread.h" namespace mtdp{ namespace producer_consumer { void main() { boost::shared_ptr<table> t(new table(3)); // ケーキを3個まで置けるテーブルを作る boost::thread_group group; group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-1", t, 31415)))); group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-2", t, 92653)))); group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-3", t, 58979)))); group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-1", t, 32384)))); group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-2", t, 62643)))); group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-3", t, 32327)))); group.join_all(); } }}
maker_thread.h
#ifndef MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED #define MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED #include <boost/random.hpp> #include <boost/shared_ptr.hpp> #include <boost/detail/interlocked.hpp> #include <string> #include "../thread_helper.h" namespace mtdp{ namespace producer_consumer { class maker_thread { private: const std::string name_; const boost::mt19937 mt_; const boost::shared_ptr<table> table_; public: maker_thread(std::string name, boost::shared_ptr<table> t, boost::uint32_t seed) : name_(name), table_(t), mt_(seed) { } void run() { boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(mt_, boost::uniform_int<>(0, 1000)); while (true) { thread_helper::sleep(random()); std::string cake("[ Cake No." + to_string(next_id()) + " by " + name() + " ]"); table_->put(this, cake); } } std::string name() const { return name_; } private: static int next_id() { static volatile long id = -1; return BOOST_INTERLOCKED_INCREMENT(&id); } }; }} #endif // MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED
eater_thread.h
#ifndef MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED #define MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED #include <boost/random.hpp> #include <boost/shared_ptr.hpp> #include <string> namespace mtdp{ namespace producer_consumer { class eater_thread { private: const std::string name_; const boost::mt19937 mt_; const boost::shared_ptr<table> table_; public: eater_thread(std::string name, boost::shared_ptr<table> t, boost::uint32_t seed) : name_(name), table_(t), mt_(seed) { } void run() { boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(mt_, boost::uniform_int<>(0, 1000)); while (true) { std::string cake = table_->take(this); thread_helper::sleep(random()); } } std::string name() const { return name_; } }; }} #endif // MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED
table.h
#ifndef MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED #define MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED #include <boost/thread.hpp> #include <vector> #include <string> #include <iostream> #include "../thread_helper.h" namespace mtdp{ namespace producer_consumer { class table { private: std::vector<std::string> buffer_; volatile std::size_t tail_; // 次に put する場所 volatile std::size_t head_; // 次に take する場所 volatile std::size_t count_; // buffer 内のケーキ数 boost::mutex mutex_; boost::condition condition_; public: table(int count) : buffer_(count), tail_(0), head_(0), count_(0) { } // ケーキを置く template<class Type> void put(Type* p, std::string cake) { boost::mutex::scoped_lock lock(mutex_); thread_helper::shared_cout(p->name() + " puts " + cake + "\n"); while (count_ >= buffer_.size()) { condition_.wait(lock); } buffer_[tail_] = cake; tail_ = (tail_ + 1) % buffer_.size(); count_++; condition_.notify_all(); } // ケーキを取る template<class Type> std::string take(Type* p) { boost::mutex::scoped_lock lock(mutex_); while (count_ <= 0) { condition_.wait(lock); } std::string cake = buffer_[head_]; head_ = (head_ + 1) % buffer_.size(); count_--; condition_.notify_all(); thread_helper::shared_cout(p->name() + " takes " + cake + "\n"); return cake; } }; }} #endif // MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED