Read-Write Lock
書き込みを行っている場合にだけ排他制御を行うパターン。
Boost には read_write_mutex というのがあって、これを使えば Read-Write Lock を実現することがでk……ってバージョン1.34.1 だとドキュメントにはあるのにファイルが無いよ!消えてるよ!
どうやら read_write_mutex にバグがあったみたいで、今は消えてしまっているようです。
→追記: バージョン 1.35.0 以降からは boost::shared_mutex というのが用意されているので、それを使えばよさそうです。
ということで実現するためにはちゃんとサンプル通りの read_write_mutex(サンプルでは ReadWriteLock という名前だったのだけれども、namespace と被ってしまったので read_write_mutex という名前にしている)を実装してやる必要がありそうです。
ということで以下は増補改訂版 Java言語で学ぶデザインパターン入門 マルチスレッド編のサンプルを Boost.Thread を使って書いたコード。
scoped_read_lock や scoped_write_lock といったデストラクタで確実に Before/After パターンの After 処理を行えるクラスが使えるのが C++ の特徴ですね。
ただまあ、やっぱり finally 節は欲しいですが……。
ちなみに thread_helper::sleep() というのは、sleep 処理を長々と書くのが面倒になったので mtdp ネームスペースにヘルパ用のクラスを作っておきました。
main.cpp
#include <boost/thread.hpp> #include <boost/shared_ptr.hpp> #include <boost/bind.hpp> #include "data.h" #include "reader_thread.h" #include "writer_thread.h" namespace mtdp{ namespace read_write_lock { void main() { boost::shared_ptr<data> d(new data(10)); boost::thread_group group; group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&reader_thread::run, boost::shared_ptr<reader_thread>(new reader_thread(d)))); group.create_thread(boost::bind(&writer_thread::run, boost::shared_ptr<writer_thread>(new writer_thread(d, "ABCDEFGHIJKLMNOPQRSTUVWXYZ")))); group.create_thread(boost::bind(&writer_thread::run, boost::shared_ptr<writer_thread>(new writer_thread(d, "abcdefghijklmnopqrstuvwxyz")))); group.join_all(); } }}
data.h
#ifndef MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED #define MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED #include <boost/thread.hpp> #include <boost/foreach.hpp> #include <vector> #include "read_write_mutex.h" #include "../thread_helper.h" namespace mtdp{ namespace read_write_lock { class data { private: std::vector<char> buffer_; read_write_mutex rw_mutex_; public: data(std::size_t size) : buffer_(size, L'*') { } std::vector<char> read() { scoped_read_lock lock(rw_mutex_); return do_read(); } void write(char c) { scoped_write_lock lock(rw_mutex_); do_write(c); } private: std::vector<char> do_read() { slowly(); return buffer_; } void do_write(char c) { BOOST_FOREACH (char& _c, buffer_) { _c = c; slowly(); } } void slowly() { thread_helper::sleep(50); } }; }} #endif // MTDP_READ_WRITE_LOCK_DATA_H_INCLUDED
writer_thread.hpp
#ifndef MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED #define MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED #include <boost/shared_ptr.hpp> #include <boost/random.hpp> #include <boost/nondet_random.hpp> #include <time.h> #include "data.h" #include "../thread_helper.h" namespace mtdp{ namespace read_write_lock { class writer_thread { private: const boost::shared_ptr<data> data_; const std::string filler_; std::size_t index_; public: writer_thread(boost::shared_ptr<data> d, std::string filler) : data_(d), filler_(filler), index_(0) { } void run() { static boost::variate_generator<boost::mt19937, boost::uniform_int<> > random( boost::mt19937((boost::uint32_t)::time(0)), boost::uniform_int<>(0, 3000)); while (true) { char c = nextchar(); data_->write(c); thread_helper::sleep(random()); } } private: char nextchar() { char c = filler_[index_]; index_++; if (index_ >= filler_.size()) { index_ = 0; } return c; } }; }} #endif // MTDP_READ_WRITE_LOCK_WRITER_THREAD_H_INCLUDED
reader_thread.hpp
#ifndef MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED #define MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED #include <boost/shared_ptr.hpp> #include <vector> #include <string> #include <iostream> #include "data.h" #include "../thread_helper.h" // for ::GetCurrentThreadId #include <windows.h> namespace mtdp{ namespace read_write_lock { class reader_thread { private: const boost::shared_ptr<data> data_; public: reader_thread(boost::shared_ptr<data> d) : data_(d) { } void run() { while (true) { std::vector<char> readbuf = data_->read(); thread_helper::shared_cout(to_string(::GetCurrentThreadId()) + " reads " + std::string(readbuf.begin(), readbuf.end()) + "\n"); } } }; }} #endif // MTDP_READ_WRITE_LOCK_READER_THREAD_H_INCLUDED
read_write_mutex.hpp
#ifndef MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED #define MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED #include <boost/thread.hpp> namespace mtdp{ namespace read_write_lock { class read_write_mutex { private: volatile std::size_t reading_readers_; // (A) 実際に読んでいる最中のスレッドの数 volatile std::size_t waiting_writers_; // (B) 書くのを待っているスレッドの数 volatile std::size_t writing_writers_; // (C) 実際に書いている最中のスレッドの数 volatile bool prefer_writer_; // 書くのを優先するなら true boost::mutex mutex_; boost::condition condition_; public: read_write_mutex() : reading_readers_(0), waiting_writers_(0), writing_writers_(0), prefer_writer_(true) { } void read_lock() { boost::mutex::scoped_lock lock(mutex_); while (writing_writers_ > 0 || (prefer_writer_ && waiting_writers_ > 0)) { condition_.wait(lock); } reading_readers_++; // (A) 実際に読んでいるスレッドの数を1増やす } void read_unlock() { boost::mutex::scoped_lock lock(mutex_); reading_readers_--; // (A) 実際に読んでいるスレッドの数を1減らす prefer_writer_ = true; condition_.notify_all(); } void write_lock() { boost::mutex::scoped_lock lock(mutex_); waiting_writers_++; // (B) 書くのを待っているスレッドの数を1増やす try { while (reading_readers_ > 0 || writing_writers_ > 0) { condition_.wait(lock); } } catch (...) { waiting_writers_--; // (B) 書くのを待っているスレッドの数を1減らす throw; } waiting_writers_--; // (B) 書くのを待っているスレッドの数を1減らす writing_writers_++; // (C) 実際に書いているスレッドの数を1増やす } void write_unlock() { boost::mutex::scoped_lock lock(mutex_); writing_writers_--; // (C) 実際に書いているスレッドの数を1減らす prefer_writer_ = false; condition_.notify_all(); } }; struct scoped_read_lock { read_write_mutex& mutex_; scoped_read_lock(read_write_mutex& mutex) : mutex_(mutex) { mutex_.read_lock(); } ~scoped_read_lock() { mutex_.read_unlock(); } }; struct scoped_write_lock { read_write_mutex& mutex_; scoped_write_lock(read_write_mutex& mutex) : mutex_(mutex) { mutex_.write_lock(); } ~scoped_write_lock() { mutex_.write_unlock(); } }; }} #endif // MTDP_READ_WRITE_LOCK_READ_WRITE_MUTEX_H_INCLUDED