try-and-back-off アルゴリズム
C++11 には、複数の Lockable なオブジェクトをロックしてくれる std::lock 関数があります。
template<class L1, class L2, class... L3> void lock(L1& m1, L2& m2, L3&... m3);
この関数の最大の特徴は、決してデッドロックしないことです。
そして、このデッドロックしないという要件を満たすために使われるアルゴリズムが、try-and-back-off アルゴリズムと呼ばれるものです。
今回はこの try-and-back-off アルゴリズムについて説明します。
(この記事には独自解釈が含まれているので、間違ってる部分とかあれば指摘して頂けると嬉しいです)
デッドロックの条件
デッドロックは、正確ではないですが、大まかに言って以下の条件を満たすと発生します。
- ロックを取得するために待機を行なう
- あるスレッドが、2つ以上の Lockable オブジェクトのロックを(待機するロックで)獲得している
- それらの Lockable オブジェクトのロックする順番が決まっていない
このどれかを崩せばデッドロックは発生しません。
つまり、待機しないロック(try_lock)を使ったり、ロックを1個だけにしたり、ロックする順番を常に一定にすれば、デッドロックは発生しないのです。
単純な解決策
try_lock を使ったり、ロックの取得を1個だけにするのは難しいので、ロックする順番を一定にするというのが一番簡単な解決策です。
template<class Lock> void lock_simple(Lock& m1, Lock& m2) { if (m1.native_handle() < m2.native_handle()) { m1.lock(); m2.lock(); } else { m2.lock(); m1.lock(); } }
何らかの一意な方法で順序付けをしてやる必要があるので、Lockable オブジェクトが保持しているミューテックスのアドレスで順序付けしてやっています。(native_handle は Lockable の要件に無いというのはキニシナイ。)
こうすることで、パラメータにどんな順序でやってきても、常に一定の順序でロックすることができます。
しかし、これでは lock 関数の要件を満たせません。なぜなら、lock_simple 関数を使わずに手動でロックを行った場合、デッドロックが起こる可能性があるからです。
void threadA() { unique_lock<mutex> m1(mutex1, std::defer_lock); unique_lock<mutex> m2(mutex2, std::defer_lock); lock_simple(m1, m2); ... } void threadB() { unique_lock<mutex> m1(mutex1); unique_lock<mutex> m2(mutex2); }
threadB は m1, m2 という順序でロックしています。
もし lock_simple 関数の中で m2, m1 という順序でロックしていた場合、順序が一定でなくなるため、デッドロックが起こる可能性があるのです。
try-and-back-off アルゴリズム
そこで try-and-back-off アルゴリズムの出番です。
このアルゴリズムは簡単に言うと「全部のロックが取れるまでひたすらループする」というものです。
一番簡単な実装は、以下のようになります。
template<class L1, class L2> void lock(L1& m1, L2& m2) { while (true) { if (m1.try_lock()) { if (m2.try_lock()) { return; // m1, m2 のロック獲得に成功 } m1.unlock(); } } }
try_lock を使ってひたすらスピンロックしています。try_lock を使っているのでデッドロックは発生しません。
また、ループする前に途中まで獲得したロックを解放している(back-off)ので、他のスレッドが m1, m2 のロックを取れずに進行不能になるということはありません。
他のスレッドが進行できるので、m1, m2 のロックはいつか解放され、いつかは両方のロックが成功します。
(m1, m2 のロックが運悪く必ず交互に取られていた場合は一生取れない(ライブロック)のですが、そういうのは考えないことにします)
しかしこの実装には問題があります。スピンロックであるため、CPU リソースを取り過ぎるのです。
これを解決するために、以下の様な実装にします。
template<class L1, class L2> void lock(L1& m1, L2& m2) { while (true) { m1.lock(); if (m2.try_lock()) { return; // m1, m2 のロック獲得に成功 } m1.unlock(); } }
m1 のロックには lock 関数を使うようにしました。m1 のロックが取れるまで待機するため、ここで止まってくれれば CPU リソースを取りません。
しかもこれはデッドロックになりません。なぜなら、待機するロックを1回だけしか行なっていないからです。
デッドロックの発生条件である「2つ以上のロックを行なっている」を、ギリギリまで攻めて、そこから try_lock を使うようにしているのです。
ただ、これでもまだ問題があります。例えば、
void threadA() { ... lock(m1, m2); ... } void threadB() { ... m2.lock(); ... // すごく長い処理 }
もし threadB を実行しているときに threadA の lock 関数が呼ばれた場合、m1.lock() は待機すること無く成功し、m2.try_lock() に失敗する、というループを繰り返します。
このループは待機を行なわないため、これも CPU リソースを取りすぎてしまいます。
これを解決するため、以下のように実装します。
template<class L1, class L2> void lock(L1& m1, L2& m2) { while (true) { m1.lock(); if (m2.try_lock()) { return; // m1, m2 のロック獲得に成功 } m1.unlock(); m2.lock(); if (m1.try_lock()) { return; // m1, m2 のロック獲得に成功 } m2.unlock(); } }
これなら、m2 だけロックが掛かっている場合であっても、m2.lock() で待機するようになるため、CPU リソースを取りません。
また、1つしか待機するロックをしていないので、デッドロックも発生しません。
これが try-and-back-off アルゴリズムです。
std::lock
上記の例では2個のロックを考えましたが、std::lock は可変長引数なので、実装はもう少し複雑になります。
にちゃんとした説明と実装があります。