スレッドの同期呼び出し

を作ってみた。

#ifndef ANMELT_THREAD_INVOKE_H_INCLUDED
#define ANMELT_THREAD_INVOKE_H_INCLUDED

// Boost
#include <boost/thread.hpp>
#include <boost/optional.hpp>
#include <boost/noncopyable.hpp>
#include <boost/ref.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/utility/result_of.hpp>
#include <boost/type_traits/is_void.hpp>

namespace anmelt{ namespace thread
{

namespace detail
{

class oneshot_thread
{
private:
    boost::thread* thread_;

public:
    oneshot_thread() : thread_(0) { }
    ~oneshot_thread() { delete thread_; }

    template<class F>
    void request(F f)
    {
        BOOST_ASSERT(thread_ == 0);
        thread_ = new boost::thread(f);
    }
};

template<class F>
struct wait_invoke_t : boost::noncopyable
{
private:
    F f_;
    bool end_;
    boost::mutex mutex_;
    boost::condition condition_;

public:
    wait_invoke_t(F f) : f_(f), end_(false) { }

    void operator()()
    {
        f_();

        boost::mutex::scoped_lock lock(mutex_);
        end_ = true;
        condition_.notify_all();
    }

    void wait()
    {
        boost::mutex::scoped_lock lock(mutex_);
        while (!end_)
        {
            condition_.wait(lock);
        }
    }
};

template<class Result, class F>
struct with_result_t : boost::noncopyable
{
private:
    F f_;
    boost::optional<Result> r_;

public:
    with_result_t(F f) : f_(f) { }

    void operator()()
    {
        r_ = f_();
    }

    Result result() { return *r_; }
};

template<class F, class Thread>
void wait_invoke(F f, Thread& th)
{
    wait_invoke_t<F> w(f);
    th.request(boost::ref(w));
    w.wait();
}

template<class Result, class F, class Thread>
Result wait_invoke_result(F f, Thread& th)
{
    with_result_t<Result, F> r(f);
    wait_invoke_t<with_result_t<Result, F>&> w(r);
    th.request(boost::ref(w));
    w.wait();
    return r.result();
}

} // namespace detail

template<class F>
void invoke(F f,
    typename boost::enable_if<boost::is_void<typename boost::result_of<F()>::type> >::type* = 0)
{
    detail::oneshot_thread th;
    invoke(f, th);
}

template<class F>
typename boost::result_of<F()>::type invoke(F f,
    typename boost::disable_if<boost::is_void<typename boost::result_of<F()>::type> >::type* = 0)
{
    detail::oneshot_thread th;
    return invoke(f, th);
}

template<class F, class Thread>
void invoke(F f, Thread& th,
    typename boost::enable_if<boost::is_void<typename boost::result_of<F()>::type> >::type* = 0)
{
    detail::wait_invoke<F, Thread>(f, th);
}

template<class F, class Thread>
typename boost::result_of<F()>::type invoke(F f, Thread& th,
    typename boost::disable_if<boost::is_void<typename boost::result_of<F()>::type> >::type* = 0)
{
    return detail::wait_invoke_result<typename boost::result_of<F()>::type, F, Thread>(f, th);
}

}}

#endif // ANMELT_THREAD_INVOKE_H_INCLUDED

テスト

void func_invoke1(int n)
{
    std::cout << "id: " << boost::this_thread::get_id() << " " << (n * 10) << std::endl;
}

int func_invoke2(int n)
{
    std::cout << "id: " << boost::this_thread::get_id() << " ";
    return n * 100;
}

void main()
{
    // 新しくスレッドを作って invoke
    anmelt::thread::invoke(boost::bind(func_invoke1, 10));
    std::cout << anmelt::thread::invoke(boost::bind(func_invoke2, 10)) << std::endl;

    // worker_thread を使って invoke
    typedef anmelt::thread::blocking_queue<boost::function<void ()> > queue_t;
    anmelt::thread::worker_thread<queue_t> th(boost::shared_ptr<queue_t>(new queue_t));

    anmelt::thread::invoke(boost::bind(func_invoke1, 10), th);
    std::cout << anmelt::thread::invoke(boost::bind(func_invoke2, 10), th) << std::endl;

    // thread_pool を使って invoke
    anmelt::thread::thread_pool pool(5);

    anmelt::thread::invoke(boost::bind(func_invoke1, 10), pool);
    std::cout << anmelt::thread::invoke(boost::bind(func_invoke2, 10), pool) << std::endl;
}

結果

id: 001535B0 100
id: 001535B0 1000
id: 001535B0 100
id: 001535B0 1000
id: 001531C0 100
id: 00153850 1000

invoke() に関数を渡すと、別のスレッドでその関数を呼び出し、実行が完了するのを待ち、結果を返すというもの。


普通に関数呼び出せばええやん?という感じだけど、こういうのを作ると何が嬉しいかというと、同一のスレッドからしかアクセスできないようなクラス(COM クラスとか)に対して、複数のスレッドから invoke 経由でアクセスできるようになる。