Worker Thread


スレッドを起動する時間をケチるのと、スレッド数の管理を行うパターン。


以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。


これ、request じゃなくて boost::function0 にすれば汎用的な worker_thread クラスが作れそう。


というか namespace worker_thread と class worker_thread で同じ名前なのにちゃんとコンパイルできるのには驚いた。


main.cpp

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "channel.h"
#include "client_thread.h"

namespace mtdp{ namespace worker_thread
{
    void main()
    {
        boost::shared_ptr<channel> c(new channel(5)); // ワーカースレッドの個数
        c->start_workers();

        boost::thread_group group;
        group.create_thread(boost::bind(&client_thread::run, boost::shared_ptr<client_thread>(new client_thread("Alice", c))));
        group.create_thread(boost::bind(&client_thread::run, boost::shared_ptr<client_thread>(new client_thread("Bobby", c))));
        group.create_thread(boost::bind(&client_thread::run, boost::shared_ptr<client_thread>(new client_thread("Chris", c))));
        group.join_all();
    }
}}

client_thread.h

#ifndef MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED
#define MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <boost/random.hpp>
#include "channel.h"
#include "request.h"
#include "../thread_helper.h"

namespace mtdp{ namespace worker_thread
{
    class client_thread
    {
    private:
        const std::string name_;
        const boost::shared_ptr<channel> channel_;

    public:
        client_thread(std::string name, boost::shared_ptr<channel> c)
            : name_(name), channel_(c)
        {
        }

        void run()
        {
            static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(
                boost::mt19937(), boost::uniform_int<>(0, 1000));

            for (int i = 0; true; i++)
            {
                boost::shared_ptr<request> r(new request(name(), i));
                channel_->put_request(r);
                thread_helper::sleep(random());
            }
        }

        std::string name() const
        {
            return name_;
        }
    };
}}

#endif // MTDP_WORKER_THREAD_CLIENT_THREAD_H_INCLUDED

request.h

#ifndef MTDP_WORKER_THREAD_REQUEST_H_INCLUDED
#define MTDP_WORKER_THREAD_REQUEST_H_INCLUDED

#include <string>
#include "../thread_helper.h"

namespace mtdp{ namespace worker_thread
{
    class request
    {
    private:
        const std::string name_;        // 依頼者
        const int number_;              // リクエストの番号

    public:
        request(std::string name, int number)
            : name_(name), number_(number)
        {
        }

        template<class T>
        void execute(T* p)
        {
            static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random(
                boost::mt19937(), boost::uniform_int<>(0, 1000));

            thread_helper::shared_cout(p->name() + " executes " + this->to_string() + "\n");
            thread_helper::sleep(random());
        }

        std::string to_string()
        {
            return "[ request from " + name_ + " No." + mtdp::to_string(number_) + " ]";
        }
    };
}}

#endif // MTDP_WORKER_THREAD_REQUEST_H_INCLUDED

channel.h

#ifndef MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED
#define MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include <vector>
#include <queue>
#include "request.h"
#include "worker_thread.h"

namespace mtdp{ namespace worker_thread
{
    class channel
    {
    private:
        static const std::size_t MAX_REQUEST = 100;

        typedef std::queue<boost::shared_ptr<request> > request_queue_t;
        request_queue_t request_queue_;

        typedef std::vector<boost::shared_ptr<worker_thread> > worker_thread_list_t;
        worker_thread_list_t thread_pool_;

        boost::mutex mutex_;
        boost::condition condition_;

        boost::thread_group group_;

    public:
        channel(std::size_t threads)
            : thread_pool_(threads)
        {
            for (std::size_t i = 0; i < thread_pool_.size(); i++)
            {
                thread_pool_[i].reset(new worker_thread("Worker-" + to_string(i), this));
            }
        }
        ~channel()
        {
            group_.join_all();
        }

        void start_workers()
        {
            BOOST_FOREACH (boost::shared_ptr<worker_thread>& wt, thread_pool_)
            {
                group_.create_thread(boost::bind(&worker_thread::run, wt));
            }
        }

        void put_request(boost::shared_ptr<request> r)
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (request_queue_.size() >= MAX_REQUEST)
            {
                condition_.wait(lock);
            }
            request_queue_.push(r);
            condition_.notify_all();
        }

        boost::shared_ptr<request> take_request()
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (request_queue_.size() <= 0)
            {
                condition_.wait(lock);
            }
            boost::shared_ptr<request> r = request_queue_.front();
            request_queue_.pop();
            condition_.notify_all();
            return r;
        }
    };
}}

#endif // MTDP_WORKER_THREAD_CHANNEL_H_INCLUDED

worker_thread.h

#ifndef MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED
#define MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED

#include <string>
#include "request.h"

namespace mtdp{ namespace worker_thread
{
    class channel;

    class worker_thread
    {
    private:
        const std::string name_;
        channel* const channel_;

    public:
        worker_thread(std::string name, channel* c)
            : name_(name), channel_(c)
        {
        }

        void run()
        {
            _run(0);
        }

        std::string name() const
        {
            return name_;
        }

    private:
        // 循環参照になるので template にして実体化を遅らせる。
        template<class T>
        void _run(T)
        {
            while (true)
            {
                boost::shared_ptr<request> r = channel_->take_request();
                r->execute(this);
            }
        }
    };
}}

#endif // MTDP_WORKER_THREAD_WORKER_THREAD_H_INCLUDED