Producer-Consumer


あるスレッドから別のスレッドにデータを渡すためのパターン。


というか Guarded Suspension とどう違うのかよく分かんない。
中身はほとんど同じだけど、その目的が違うのかなぁ……。


以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。


maker_thread.h の next_id() で static 変数が使われてたんだけど、cpp ファイルを作って定義するのが面倒なので static でローカルに作ったんだけど、これって最初に next_id() を通るときに排他制御で問題になったりするのかな?


main.cpp

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include "table.h"
#include "maker_thread.h"
#include "eater_thread.h"

namespace mtdp{ namespace producer_consumer
{
    void main()
    {
        boost::shared_ptr<table> t(new table(3)); // ケーキを3個まで置けるテーブルを作る

        boost::thread_group group;
        group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-1", t, 31415))));
        group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-2", t, 92653))));
        group.create_thread(boost::bind(&maker_thread::run, boost::shared_ptr<maker_thread>(new maker_thread("MakerThread-3", t, 58979))));
        group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-1", t, 32384))));
        group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-2", t, 62643))));
        group.create_thread(boost::bind(&eater_thread::run, boost::shared_ptr<eater_thread>(new eater_thread("EaterThread-3", t, 32327))));
        group.join_all();
    }
}}

maker_thread.h

#ifndef MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED
#define MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED

#include <boost/random.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/detail/interlocked.hpp>
#include <string>
#include "../thread_helper.h"

namespace mtdp{ namespace producer_consumer
{
    class maker_thread
    {
    private:
        const std::string name_;
        const boost::mt19937 mt_;
        const boost::shared_ptr<table> table_;

    public:
        maker_thread(std::string name, boost::shared_ptr<table> t, boost::uint32_t seed)
            : name_(name), table_(t), mt_(seed)
        {
        }

        void run()
        {
            boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(mt_, boost::uniform_int<>(0, 1000));

            while (true)
            {
                thread_helper::sleep(random());

                std::string cake("[ Cake No." + to_string(next_id()) + " by " + name() + " ]");

                table_->put(this, cake);
            }
        }

        std::string name() const
        {
            return name_;
        }

    private:
        static int next_id()
        {
            static volatile long id = -1;
            return BOOST_INTERLOCKED_INCREMENT(&id);
        }
    };
}}

#endif // MTDP_PRODUCER_CONSUMER_MAKER_THREAD_H_INCLUDED

eater_thread.h

#ifndef MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED
#define MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED

#include <boost/random.hpp>
#include <boost/shared_ptr.hpp>
#include <string>

namespace mtdp{ namespace producer_consumer
{
    class eater_thread
    {
    private:
        const std::string name_;
        const boost::mt19937 mt_;
        const boost::shared_ptr<table> table_;

    public:
        eater_thread(std::string name, boost::shared_ptr<table> t, boost::uint32_t seed)
            : name_(name), table_(t), mt_(seed)
        {
        }

        void run()
        {
            boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(mt_, boost::uniform_int<>(0, 1000));

            while (true)
            {
                std::string cake = table_->take(this);
                thread_helper::sleep(random());
            }
        }

        std::string name() const
        {
            return name_;
        }
    };
}}

#endif // MTDP_PRODUCER_CONSUMER_EATER_THREAD_H_INCLUDED

table.h

#ifndef MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED
#define MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED

#include <boost/thread.hpp>
#include <vector>
#include <string>
#include <iostream>
#include "../thread_helper.h"

namespace mtdp{ namespace producer_consumer
{
    class table
    {
    private:
        std::vector<std::string> buffer_;
        volatile std::size_t tail_;         // 次に put する場所
        volatile std::size_t head_;         // 次に take する場所
        volatile std::size_t count_;        // buffer 内のケーキ数

        boost::mutex mutex_;
        boost::condition condition_;

    public:
        table(int count)
            : buffer_(count), tail_(0), head_(0), count_(0)
        {
        }
        // ケーキを置く
        template<class Type>
        void put(Type* p, std::string cake)
        {
            boost::mutex::scoped_lock lock(mutex_);

            thread_helper::shared_cout(p->name() + " puts " + cake + "\n");
            while (count_ >= buffer_.size())
            {
                condition_.wait(lock);
            }
            buffer_[tail_] = cake;
            tail_ = (tail_ + 1) % buffer_.size();
            count_++;
            condition_.notify_all();
        }
        // ケーキを取る
        template<class Type>
        std::string take(Type* p)
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (count_ <= 0)
            {
                condition_.wait(lock);
            }
            std::string cake = buffer_[head_];
            head_ = (head_ + 1) % buffer_.size();
            count_--;
            condition_.notify_all();
            thread_helper::shared_cout(p->name() + " takes " + cake + "\n");
            return cake;
        }
    };
}}

#endif // MTDP_PRODUCER_CONSUMER_TABLE_H_INCLUDED