非同期でデータをやりとりする用の有限キューを作ってみた

public final class FiniteBufferQueue
{
    int first;
    int size;
    final byte[][] bufs;
    public FiniteBufferQueue(int size)
    {
        bufs = new byte[size][];
    }
    
    public synchronized void push(byte[] buf)
        throws InterruptedException
    {
        push(buf, 0, buf.length);
    }
    public synchronized void push(byte[] buf, int index, int length)
        throws InterruptedException
    {
        while (size == bufs.length)
        {
            wait();
        }
        byte[] copyBuf = new byte[length];
        System.arraycopy(buf, index, copyBuf, 0, length);
        bufs[(first + size) % bufs.length] = copyBuf;
        size++;
        notifyAll();
    }
    
    public synchronized byte[] pop()
        throws InterruptedException
    {
        while (size == 0)
        {
            wait();
        }
        byte[] buf = bufs[first];
        first = (first + 1) % bufs.length;
        --size;
        notifyAll();
        return buf;
    }
}

どっかのスレッドがひたすら push() しまくって、どっかのスレッドがひたすら pop() しまくればやりとりできるはず。


Object じゃなくて byte 配列なのは、コピーしないとオブジェクトの中身が書きかえられちゃうから。byte 配列じゃなくて Cloneable でもいいような気はするけど、byte 配列だけあれば十分でしょという感じで。
中身を書き換えないからコピーしなくていいよ!書き換える場合はこっちでコピーするよ!という要求もあるだろうから、ユーザ側で安全に使う必要のあるクラスとか作ってみてもいいかも。これなら Object 型でもいける。
多分こんな感じ。

public final class UnsafeFiniteQueue
{
    int first;
    int size;
    final Object[] objs;
    public UnsafeFiniteQueue(int size)
    {
        objs = new Object[size];
    }
    
    public synchronized void push(Object obj)
        throws InterruptedException
    {
        while (size == objs.length)
        {
            wait();
        }
        objs[(first + size) % objs.length] = obj;
        size++;
        notifyAll();
    }
    
    public synchronized Object pop()
        throws InterruptedException
    {
        while (size == 0)
        {
            wait();
        }
        Object obj = objs[first];
        first = (first + 1) % objs.length;
        --size;
        notifyAll();
        return obj;
    }
}