Active Object


非同期のメッセージを受け取り、それを能動的に処理して、結果を返すパターン。


以下は『増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編』のサンプルを、Boost.Thread を使って書き直したコード。


このサンプルは多数のファイルで構成されています。以下にまとめました。

ネームスペース ファイル名 解説
(mtdp::active_object) main.cpp 動作テスト用の関数 (main)
(mtdp::active_object) maker_client_thread.h 文字列作成を依頼するスレッド
(mtdp::active_object) display_client_thread.h 文字列表示を依頼するスレッド
activeobject activeobject/active_object.h 「能動的なオブジェクト」のインターフェースクラス
activeobject activeobject/active_object_factory.h 「能動的なオブジェクト」を生成するクラス
activeobject::detail activeobject/detail/proxy.h メソッド呼び出しを method_request のオブジェクトに変換するクラス(active_object インターフェースを実装)
activeobject::detail activeobject/detail/scheduler_thread.h method_request オブジェクトを execute するクラス
activeobject::detail activeobject/detail/activation_queue.h method_request オブジェクトを順序よく保持するクラス
activeobject::detail activeobject/detail/method_request_base.h リクエストを表現する抽象クラス
activeobject::detail activeobject/detail/method_request.h リクエストを表現するテンプレート抽象クラス。
method_request_base クラスのサブクラス
activeobject::detail activeobject/detail/make_string_request.h make_string メソッド(文字列作成)に対応するクラス。
method_request クラスのサブクラス
activeobject::detail activeobject/detail/display_string_request.h display_string メソッド(文字列表示)に対応するクラス。
method_request クラスのサブクラス
activeobject activeobject/result.h 実行結果を表現する抽象クラス
activeobject::detail activeobject/detail/future_result.h Future パターンで、実行結果を表現するクラス
activeobject::detail activeobject/detail/real_result.h 実際の実行結果を表現するクラス
activeobject::detail activeobject/detail/servant.h 実際の処理を行うクラス(active_object インターフェースを実装)

能動的なオブジェクトに関するクラスは activeobject 以下のネームスペースにまとめてあります。


activeobject::detail ネームスペースのクラスは内部でしか使用しません。
つまり、外部から使用するクラスは active_object, active_object_factory, result のみです。


main.cpp

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/bind.hpp>
#include "activeobject/active_object.h"
#include "activeobject/active_object_factory.h"
#include "maker_client_thread.h"
#include "display_client_thread.h"

namespace mtdp{ namespace active_object
{
    void main()
    {
        boost::shared_ptr<activeobject::active_object> ao(activeobject::active_object_factory::create_active_object());
        boost::thread_group group;
        group.create_thread(boost::bind(&maker_client_thread::run, boost::shared_ptr<maker_client_thread>(new maker_client_thread("Alice", ao))));
        group.create_thread(boost::bind(&maker_client_thread::run, boost::shared_ptr<maker_client_thread>(new maker_client_thread("Bobby", ao))));
        group.create_thread(boost::bind(&display_client_thread::run, boost::shared_ptr<display_client_thread>(new display_client_thread("Chris", ao))));
        group.join_all();
    }
}}

maker_client_thread.h

#ifndef MTDP_ACTIVE_OBJECT_MAKER_CLIENT_THREAD_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_MAKER_CLIENT_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <string>
#include "activeobject/active_object.h"
#include "activeobject/result.h"
#include "../thread_helper.h"

namespace mtdp{ namespace active_object
{
    class maker_client_thread
    {
    private:
        const std::string name_;
        const boost::shared_ptr<activeobject::active_object> active_object_;
        const char fillchar_;

    public:
        maker_client_thread(std::string name, boost::shared_ptr<activeobject::active_object> ao)
            : name_(name), active_object_(ao), fillchar_(name_[0])
        {
        }

        void run()
        {
            for (int i = 0; true; i++)
            {
                // 戻り値のある呼び出し
                boost::shared_ptr<activeobject::result<std::string> > result = active_object_->make_string(i, fillchar_);
                thread_helper::sleep(10);
                std::string value = result->get_result_value();
                thread_helper::shared_cout(name_ + ": value = " + value + "\n");
            }
        }
    };
}}

#endif // MTDP_ACTIVE_OBJECT_MAKER_CLIENT_THREAD_H_INCLUDED

display_client_thread.h

#ifndef MTDP_ACTIVE_OBJECT_DISPLAY_CLIENT_THREAD_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_DISPLAY_CLIENT_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <string>
#include "activeobject/active_object.h"
#include "../thread_helper.h"

namespace mtdp{ namespace active_object
{
    // Client that repeatedly asks the active object to display a numbered
    // message built from its own name.
    class display_client_thread
    {
    public:
        // name: label placed at the start of every message.
        // ao:   active object that receives the display_string requests.
        display_client_thread(std::string name, boost::shared_ptr<activeobject::active_object> ao)
            : name_(name), active_object_(ao)
        {
        }

        // Thread body. Never returns: sends "<name> <counter>" with an
        // increasing counter, pausing between requests.
        void run()
        {
            int sequence = 0;
            for (;;)
            {
                // Call without a return value: nothing is read back from the active object.
                const std::string message(name_ + " " + to_string(sequence));
                active_object_->display_string(message);
                thread_helper::sleep(200);
                ++sequence;
            }
        }

    private:
        const std::string name_;
        const boost::shared_ptr<activeobject::active_object> active_object_;
    };
}}

#endif // MTDP_ACTIVE_OBJECT_DISPLAY_CLIENT_THREAD_H_INCLUDED

activeobject/active_object.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <string>
#include "result.h"

namespace mtdp{ namespace active_object{ namespace activeobject
{
    // Interface of the "active object" as seen by client code.
    class active_object
    {
    public:
        // Polymorphic base: a virtual destructor makes destruction through an
        // active_object pointer well-defined regardless of how the object is owned.
        virtual ~active_object() { }

        // Requests a string built from `count` and `fillchar`; the value is
        // read from the returned result object via get_result_value().
        virtual boost::shared_ptr<result<std::string> > make_string(int count, char fillchar) = 0;

        // Requests that `str` be displayed; no value is returned to the caller.
        virtual void display_string(std::string str) = 0;
    };
}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_H_INCLUDED

activeobject/active_object_factory.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_FACTORY_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_FACTORY_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
#include "active_object.h"
#include "detail/servant.h"
#include "detail/activation_queue.h"
#include "detail/scheduler_thread.h"
#include "detail/proxy.h"

namespace mtdp{ namespace active_object{ namespace activeobject
{
    // Builds a ready-to-use active object: wires a servant, an activation
    // queue and a scheduler together behind a proxy, and starts the thread
    // that runs the scheduler.
    class active_object_factory
    {
    public:
        // Returns the proxy as an active_object. Every call creates a new
        // servant/queue/scheduler set and starts one more scheduler thread.
        static boost::shared_ptr<active_object> create_active_object()
        {
            boost::shared_ptr<detail::activation_queue> requests(new detail::activation_queue());
            boost::shared_ptr<detail::scheduler_thread> scheduler(new detail::scheduler_thread(requests));
            boost::shared_ptr<detail::servant> worker(new detail::servant());
            boost::shared_ptr<detail::proxy> front(new detail::proxy(scheduler, worker));

            // Function-local static, so the group (and the threads it tracks)
            // persists after this call returns. The bound functor holds its own
            // copy of the scheduler pointer.
            static boost::thread_group scheduler_threads;
            scheduler_threads.create_thread(boost::bind(&detail::scheduler_thread::run, scheduler));

            return front;
        }
    };
}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_ACTIVE_OBJECT_FACTORY_H_INCLUDED

activeobject/detail/proxy.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_PROXY_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_PROXY_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <string>
#include "servant.h"
#include "scheduler_thread.h"
#include "future_result.h"
#include "make_string_request.h"
#include "display_string_request.h"
#include "../active_object.h"
#include "../result.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    class proxy : public active_object
    {
    private:
        boost::shared_ptr<scheduler_thread> scheduler_;
        boost::shared_ptr<servant> servant_;

    public:
        proxy(boost::shared_ptr<scheduler_thread> scheduler, boost::shared_ptr<servant> serv)
            : scheduler_(scheduler), servant_(serv)
        {
        }

        virtual boost::shared_ptr<result<std::string> > make_string(int count, char fillchar)
        {
            boost::shared_ptr<future_result<std::string> > future(new future_result<std::string>());
            scheduler_->invoke(boost::shared_ptr<make_string_request>(new make_string_request(servant_, future, count, fillchar)));
            return future;
        }

        virtual void display_string(std::string str)
        {
            scheduler_->invoke(boost::shared_ptr<display_string_request>(new display_string_request(servant_, str)));
        }
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_PROXY_H_INCLUDED

activeobject/detail/scheduler_thread.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SCHEDULER_THREAD_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SCHEDULER_THREAD_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include "activation_queue.h"
#include "method_request_base.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Connects request producers to the activation queue and, when run() is
    // executed on a thread, consumes and executes the queued requests.
    class scheduler_thread
    {
    public:
        // queue: activation queue shared between invoke() and run().
        scheduler_thread(boost::shared_ptr<activation_queue> queue)
            : queue_(queue)
        {
        }

        // Hands a request to the activation queue (via put_request).
        void invoke(boost::shared_ptr<method_request_base> req)
        {
            queue_->put_request(req);
        }

        // Thread body. Never returns: takes the next request from the queue
        // and executes it, one request at a time.
        void run()
        {
            for (;;)
            {
                queue_->take_request()->execute();
            }
        }

    private:
        boost::shared_ptr<activation_queue> queue_;
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SCHEDULER_THREAD_H_INCLUDED

activeobject/detail/activation_queue.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_ACTIVATION_QUEUE_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_ACTIVATION_QUEUE_H_INCLUDED

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <queue>
#include "method_request.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    class activation_queue
    {
    private:
        static const std::size_t MAX_METHOD_REQUEST = 100;
        std::queue<boost::shared_ptr<method_request_base> > queue_;

        boost::mutex mutex_;
        boost::condition condition_;

    public:
        void put_request(boost::shared_ptr<method_request_base> request)
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (queue_.size() >= MAX_METHOD_REQUEST)
            {
                condition_.wait(lock);
            }
            queue_.push(request);
            condition_.notify_all();
        }

        boost::shared_ptr<method_request_base> take_request()
        {
            boost::mutex::scoped_lock lock(mutex_);

            while (queue_.size() <= 0)
            {
                condition_.wait(lock);
            }
            boost::shared_ptr<method_request_base> request = queue_.front();
            queue_.pop();
            condition_.notify_all();
            return request;
        }
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_ACTIVATION_QUEUE_H_INCLUDED

activeobject/detail/method_request_base.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_BASE_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_BASE_H_INCLUDED

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Type-erased request: the common base through which queued requests of
    // any concrete kind are stored and executed.
    class method_request_base
    {
    public:
        // Requests are owned and destroyed through base-class pointers, so the
        // destructor must be virtual for derived destructors to run.
        virtual ~method_request_base() { }

        // Performs the work this request represents.
        virtual void execute() = 0;
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_BASE_H_INCLUDED

activeobject/detail/method_request.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include "servant.h"
#include "future_result.h"
#include "method_request_base.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Common state for concrete requests: the servant that does the work and
    // the future that receives a result of type T. Only subclasses can
    // construct it; execute() is still left to them.
    template<class T>
    class method_request : public method_request_base
    {
    protected:
        // target:  servant the subclass calls from execute().
        // pending: future the subclass may fill in.
        method_request(boost::shared_ptr<servant> target, boost::shared_ptr<future_result<T> > pending)
            : servant_(target), future_(pending)
        {
        }

        boost::shared_ptr<servant> servant_;
        boost::shared_ptr<future_result<T> > future_;
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_METHOD_REQUEST_H_INCLUDED

activeobject/detail/make_string_request.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_MAKE_STRING_REQUEST_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_MAKE_STRING_REQUEST_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include "method_request.h"
#include "../result.h"
#include "future_result.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Request object for make_string: remembers the arguments and, when
    // executed, stores the servant's result in the future given at construction.
    class make_string_request : public method_request<std::string>
    {
    public:
        // serv:     servant that builds the string.
        // future:   future that receives the servant's result.
        // count:    forwarded to servant::make_string.
        // fillchar: forwarded to servant::make_string.
        make_string_request(boost::shared_ptr<servant> serv, boost::shared_ptr<future_result<std::string> > future, int count, char fillchar)
            : method_request<std::string>(serv, future), count_(count), fillchar_(fillchar)
        {
        }

        // Calls the servant and publishes its result through the future.
        virtual void execute()
        {
            future_->set_result(servant_->make_string(count_, fillchar_));
        }

    private:
        const int count_;
        const char fillchar_;
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_MAKE_STRING_REQUEST_H_INCLUDED

activeobject/detail/display_string_request.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_DISPLAY_STRING_REQUEST_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_DISPLAY_STRING_REQUEST_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <string>
#include "method_request.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Request object for display_string: remembers the text and, when
    // executed, asks the servant to display it. There is no result, so the
    // base class is given an empty (null) future that is never used here.
    class display_string_request : public method_request<boost::shared_ptr<void> >
    {
    private:
        const std::string string_;

    public:
        // serv: servant that displays the string.
        // str:  text to display; copied into the request.
        display_string_request(boost::shared_ptr<servant> serv, std::string str)
            // Nested template arguments are closed with "> >" (not ">>") so
            // this also parses under C++03, like the rest of the sample.
            : method_request<boost::shared_ptr<void> >(serv, boost::shared_ptr<future_result<boost::shared_ptr<void> > >()), string_(str)
        {
        }

        // Forwards the stored text to the servant.
        virtual void execute()
        {
            servant_->display_string(string_);
        }
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_DISPLAY_STRING_REQUEST_H_INCLUDED

activeobject/result.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_RESULT_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_RESULT_H_INCLUDED

#include <boost/shared_ptr.hpp>

namespace mtdp{ namespace active_object{ namespace activeobject
{
    // Abstract handle to the outcome of a request; T is the value type.
    template<class T>
    class result
    {
    public:
        // Results are handled through base-class pointers, so the destructor
        // must be virtual for derived destructors to run.
        virtual ~result() { }

        // Returns the value this result stands for.
        virtual T get_result_value() = 0;
    };
}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_RESULT_H_INCLUDED

activeobject/detail/future_result.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_FUTURE_RESULT_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_FUTURE_RESULT_H_INCLUDED

#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include "../result.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Future-style result: a placeholder that can be handed out before the
    // real result exists. get_result_value() blocks until set_result() has
    // supplied the real result, then forwards to it.
    template<class T>
    class future_result : public result<T>
    {
    private:
        boost::shared_ptr<result<T> > result_;
        // Plain bool: every read and write happens with mutex_ held, which is
        // what provides the synchronization. `volatile` added nothing here.
        bool ready_;

        boost::mutex mutex_;
        boost::condition condition_;

    public:
        future_result() : ready_(false) { }

        // Stores the real result and wakes every thread waiting for it.
        // NOTE(review): a null `res` would make get_result_value() dereference
        // a null pointer; callers in this sample always pass a real result.
        void set_result(boost::shared_ptr<result<T> > res)
        {
            boost::mutex::scoped_lock lock(mutex_);

            result_ = res;
            ready_ = true;
            condition_.notify_all();
        }

        // Waits until a result has been set, then returns its value.
        virtual T get_result_value()
        {
            boost::mutex::scoped_lock lock(mutex_);

            // Loop rather than `if`: re-check the flag after every wake-up.
            while (!ready_)
            {
                condition_.wait(lock);
            }
            return result_->get_result_value();
        }
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_FUTURE_RESULT_H_INCLUDED

activeobject/detail/real_result.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_REAL_RESULT_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_REAL_RESULT_H_INCLUDED

#include "../result.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // Result that already holds its value: get_result_value() simply returns
    // a copy of what was passed to the constructor.
    template<class T>
    class real_result : public result<T>
    {
    public:
        // stored: the value this result will report.
        real_result(T stored)
            : result_value_(stored)
        {
        }

        // Returns the stored value.
        virtual T get_result_value()
        {
            return result_value_;
        }

    private:
        T result_value_;
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_REAL_RESULT_H_INCLUDED

activeobject/detail/servant.h

#ifndef MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SERVANT_H_INCLUDED
#define MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SERVANT_H_INCLUDED

#include <boost/shared_ptr.hpp>
#include <vector>
#include <string>
#include "../active_object.h"
#include "../result.h"
#include "real_result.h"
#include "../../../thread_helper.h"

namespace mtdp{ namespace active_object{ namespace activeobject{ namespace detail
{
    // active_object implementation that performs the actual work. Both
    // operations include calls to thread_helper::sleep.
    class servant : public active_object
    {
    public:
        // Builds a string of `count` copies of `fillchar`, sleeping after each
        // character is appended, and returns it wrapped in a real_result.
        virtual boost::shared_ptr<result<std::string> > make_string(int count, char fillchar)
        {
            std::string text;
            text.reserve(count);
            int remaining = count;
            while (remaining > 0)
            {
                text.push_back(fillchar);
                thread_helper::sleep(100);
                --remaining;
            }
            boost::shared_ptr<real_result<std::string> > made(new real_result<std::string>(text));
            return made;
        }

        // Prints `str` with a "display_string: " prefix, then sleeps briefly.
        virtual void display_string(std::string str)
        {
            const std::string line = "display_string: " + str + "\n";
            thread_helper::shared_cout(line);
            thread_helper::sleep(10);
        }
    };
}}}}

#endif // MTDP_ACTIVE_OBJECT_ACTIVEOBJECT_DETAIL_SERVANT_H_INCLUDED