スレッドプール

Worker Thread パターンでスレッドプールを作ってみた。


worker_thread.h

#ifndef ANMELT_THREAD_WORKER_THREAD_H_INCLUDED
#define ANMELT_THREAD_WORKER_THREAD_H_INCLUDED

// Boost
#include <boost/thread.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/function.hpp>
#include <boost/bind.hpp>
#include <boost/noncopyable.hpp>

namespace anmelt{ namespace thread
{

/// A worker that owns one boost::thread which repeatedly fetches jobs from a
/// shared container and executes them.
///
/// Container is used through two members only:
///   - put(f)                          : called by request() to enqueue a job
///   - get(boost::function<void ()>&)  : called by the worker loop to obtain
///                                       the next job
/// (presumably get() blocks until a job is available -- confirm against the
/// container implementation).
///
/// The worker does not stop or join its thread on destruction; the owner is
/// expected to interrupt and join via this_thread() before the worker is
/// destroyed, because the running thread keeps using this object.
template<class Container>
class worker_thread : boost::noncopyable
{
private:
    // Declaration order matters: container_ must be initialised before
    // thread_, because the new thread starts executing run() (which reads
    // container_) as soon as the boost::thread object is constructed.
    const boost::shared_ptr<Container> container_;
    const boost::shared_ptr<boost::thread> thread_;

public:
    /// Stores the shared job container and immediately starts the worker
    /// thread.
    ///
    /// @param container  job source shared with whoever enqueues work.
    worker_thread(const boost::shared_ptr<Container>& container)
        : container_(container)
        // Initialised here rather than assigned in the body: thread_ is
        // const, and modifying a const object through const_cast is
        // undefined behaviour.
        , thread_(new boost::thread(boost::bind(&worker_thread<Container>::run, this)))
    {
    }

    /// Returns the underlying thread so the owner can interrupt/join it.
    boost::thread& this_thread()
    {
        return *thread_;
    }

    /// Enqueues a job into the shared container.
    ///
    /// @param f  callable handed to Container::put().
    template<class F>
    void request(F f)
    {
        container_->put(f);
    }

private:
    /// Thread entry point: fetch a job, run it if non-empty, repeat forever.
    ///
    /// The loop ends only when boost::thread_interrupted propagates out of
    /// get() or of a job; that exception is swallowed so the thread finishes
    /// normally. Any other exception thrown by a job is not caught here.
    void run()
    {
        try
        {
            while (true)
            {
                boost::function<void ()> f;
                container_->get(f);
                // get() may leave f empty; skip instead of calling an empty
                // boost::function.
                if (!f.empty())
                {
                    f();
                }
            }
        }
        catch (boost::thread_interrupted&)
        {
            // Interruption is the normal shutdown signal; nothing to do.
        }
    }
};

}}

#endif // ANMELT_THREAD_WORKER_THREAD_H_INCLUDED

thread_pool.h

#ifndef ANMELT_THREAD_THREAD_POOL_H_INCLUDED
#define ANMELT_THREAD_THREAD_POOL_H_INCLUDED

#include <anmelt/thread/mutexed.h>
#include <anmelt/thread/worker_thread.h>
#include <anmelt/thread/containers.h>

// Boost
#include <boost/bind.hpp>
#include <boost/cstdint.hpp>
#include <boost/noncopyable.hpp>

// STL
#include <algorithm>
#include <list>

namespace anmelt{ namespace thread
{

class thread_pool : boost::noncopyable
{
private:
    boost::mutex mutex_;
    boost::condition condition_;

    typedef blocking_queue<boost::function<void ()> > queue;
    typedef worker_thread<queue> worker_thread;
    typedef boost::shared_ptr<worker_thread> thread_ptr;
    typedef std::list<thread_ptr> thread_list;
    typedef boost::shared_ptr<queue> queue_ptr;

    const queue_ptr queue_;
    mutexed<thread_list> threads_;

public:
    thread_pool(boost::uint32_t size)
        : queue_(new queue)
    {
        for (boost::uint32_t i = 0; i < size; i++)
        {
            threads_.value().push_back(thread_ptr(new worker_thread(queue_)));
        }
    }

    ~thread_pool()
    {
        for_each(boost::bind(&boost::thread::interrupt, boost::bind(&worker_thread::this_thread, _1)));
        for_each(boost::bind(&boost::thread::join, boost::bind(&worker_thread::this_thread, _1)));
    }

    template<class F>
    void request(F f)
    {
        queue_->put(f);
    }

    std::size_t size() const
    {
        return threads_.apply(boost::bind(&thread_list::size, _1));
    }

private:
    template<class F>
    void for_each(F f)
    {
        struct pred : std::unary_function<thread_list, void>
        {
            F& f_;
            pred(F& f) : f_(f) { }
            void operator()(thread_list& list) const
            {
                std::for_each(list.begin(), list.end(), f_);
            }
        };
        threads_.apply(pred(f));
    }
};

}}

#endif // ANMELT_THREAD_THREAD_POOL_H_INCLUDED

テスト

// Serialises writes to std::cout so lines printed by different worker
// threads are not interleaved.
boost::mutex g_mutex;

// Test job: prints the id of the thread that executes it.
void func()
{
    // Hold the lock for the whole line, including the flush done by endl.
    boost::mutex::scoped_lock guard(g_mutex);
    std::cout << "id: ";
    std::cout << boost::this_thread::get_id();
    std::cout << std::endl;
}

// Demo: creates a pool of 5 worker threads and submits func 10 times.
// main must return int in standard C++ (`void main` is ill-formed).
int main()
{
    anmelt::thread::thread_pool pool(5);
    for (int i = 0; i < 10; ++i)
    {
        pool.request(func);
    }
    // Leaving this scope destroys the pool, whose destructor interrupts and
    // joins the worker threads.
    return 0;
}

結果

id: 00153460
id: 00153460
id: 00153490
id: 00153460
id: 00153490
id: 001531C0
id: 001535E0
id: 00153490
id: 001531C0
id: 00153310

スレッドプールにリクエストを出すと、プール内のいずれかのスレッドがそれを実行します。
スレッド数を動的に増減できるとよいのですが、とりあえず今回はここまで。