ダウンロードしながらメモリースティックに書き込む処理を非同期でやってみた

環境は DoJa-5.0。リソースのダウンロードと書き込みを別々でやってたら無駄じゃね?とか思ってやってみた。
ちょっといじればいろいろなことに使えるはず。もう少し汎用化できるなら再利用できるクラスにしてもいいような気はする。


各スレッドでのデータのやり取りはこんな感じ。

  • DownloadThread → WriteThread という方向でデータの送信をするために FiniteBufferQueue を使っている。
  • DownloadThread → WriteThread という方向でファイルサイズを通知するために Future を使っている。
  • DownloadThread → DonwloadTask という方向でエラーを通知するために Future を使っている。
  • WriteThread → DownloadTask という方向でエラーを通知するために Future を使っている。

ファイルサイズのやり取りは、初回のアクセスでファイルサイズが決定した時点で WriteThread を生成するような方針にすれば必要なかったりもするんだけど、process() の中がちょっと複雑になるのでやめた。
でもそのせいで結構 WriteThread の中が結構複雑になったのでそこは失敗かも。


以下ソース。結構長い(250 行超え)ので注意。

public final class DownloadTask implements Disposable
{
    // データをキューに入れる用のバッファ
    private static final byte[] tempBuf = new byte[10240];
    // リソースをダウンロードするスレッド
    private static final class DownloadThread implements Runnable, Disposable
    {
        private final FiniteBufferQueue queue;
        private final Thread th;
        private final Promise fileSizePromise;
        private final Promise errorPromise;
        private int downloadBytes;
        
        public DownloadThread(FiniteBufferQueue queue, Promise fileSizePromise, Promise errorPromise)
        {
            this.queue = queue;
            this.fileSizePromise = fileSizePromise;
            this.errorPromise = errorPromise;
            th = new Thread(this);
            th.start();
        }
        public void run()
        {
            int fileSize = 0;
            while (downloadBytes == 0 || downloadBytes < fileSize)
            {
                HttpConnection hc = null;
                InputStream is = null;
                try
                {
                    // リソースのダウンロード。
                    // サーバは X-File-Size というヘッダにファイルサイズを入れて、
                    // ダウンロードしたいリソースの downloadBytes バイト目から 150KB のデータを返す。
                    hc = (HttpConnection)Connector.open(makeUrl(downloadBytes, 150 * 1024), Connector.READ, true);
                    hc.setRequestMethod(HttpConnection.GET);
                    hc.connect();
                    // 初回のデータ取得のみファイルサイズを調べる
                    if (downloadBytes == 0)
                    {
                        fileSize = Integer.parseInt(hc.getHeaderField("X-File-Size"));
                        fileSizePromise.setValue(new Integer(fileSize));
                    }
                    is = hc.openInputStream();
                    // ひたすらダウンロード。queue に空きが無ければ queue の中でブロックする。
                    while (true)
                    {
                        int len = is.read(tempBuf);
                        if (len <= 0) break;
                        queue.push(tempBuf, 0, len);
                        downloadBytes += len;
                    }
                }
                catch (InterruptedException e)
                {
                    return;
                }
                catch (IOException e)
                {
                    errorPromise.setException(e);
                    // 既に値をセットしている可能性がある
                    try { fileSizePromise.setException(e); } catch (Exception ex) { }
                    return;
                }
                catch (Exception e)
                {
                    errorPromise.setException(e);
                    // 既に値をセットしている可能性がある
                    try { fileSizePromise.setException(e); } catch (Exception ex) { }
                    return;
                }
                finally
                {
                    if (is != null) try { is.close(); } catch (IOException e) { }
                    if (hc != null) try { hc.close(); } catch (IOException e) { }
                }
            }
        }
        public void dispose()
        {
            th.interrupt();
        }
        private String makeUrl(int off, int length)
        {
            // ここらで PHP やら CGI やらを叩くための URL を作る
            ...
        }
    }
    // ストレージデバイスに書き込むスレッド
    private static final class WriteThread implements Runnable, Disposable
    {
        private final FiniteBufferQueue queue;
        private final Thread th;
        private final Future fileSizeFuture;
        private final Promise errorPromise;
        private volatile int writeSize;
        
        public WriteThread(FiniteBufferQueue queue, Future fileSizeFuture, Promise errorPromise)
        {
            this.queue = queue;
            this.fileSizeFuture = fileSizeFuture;
            this.errorPromise = errorPromise;
            th = new Thread(this);
            th.start();
        }
        public void run()
        {
            // まずは全体のファイルサイズを取得。
            // 取得に失敗したら終了する。
            int maxSize;
            try
            {
                fileSizeFuture.waitResult();
                maxSize = ((Integer)fileSizeFuture.get()).intValue();
            }
            catch (InterruptedException e) { return; }
            catch (FutureException e) { return; }
            
            FileEntity entity = null;
            OutputStream os = null;
            try
            {
                // 中途半端に残っていた(かもしれない)ファイルを削除
                deleteFile("data.bin");
                
                // ファイルを開く
                Folder folder = StorageDevice.getInstance("/ext0")
                                    .getFolder(DoJaStorageService.getAccessToken(
                                            DoJaAccessToken.ACCESS_PLATFORM, DoJaStorageService.SHARE_APPLICATION));
                File file = folder.createFile("data.bin");
                entity = file.open(File.MODE_WRITE_ONLY);
                os = entity.openOutputStream();
                
                // ひたすら書き込み。queue が空きになったら queue の中でブロックする。
                // ダウンロードサイズと書き込んだサイズが一致すれば終了。
                // maxSize が間違っているということは考えない。
                while (writeSize != maxSize)
                {
                    byte[] buf = queue.pop();
                    os.write(buf);
                    writeSize += buf.length;
                }
            }
            catch (InterruptedException e)
            {
            }
            catch (IOException e)
            {
                errorPromise.setException(e);
            }
            catch (Exception e)
            {
                errorPromise.setException(e);
            }
            finally
            {
                if (os != null) try { os.close(); } catch (IOException e) { }
                if (entity != null) try { entity.close(); } catch (IOException e) { }
            }
        }
        // あまり良くないインターフェースな気がするけど面倒なのでこれで。
        public float getProgress()
        {
            try
            {
                return fileSizeFuture.hasValue() ?
                        (float)writeSize / ((Integer)fileSizeFuture.get()).intValue() :
                        0;
            }
            catch (Exception e) // InterruptedException, FutureException
            {
                // ここに来ることはないはず
                Debug.Assert(false);
                return 0;
            }
        }
        public boolean writeCompleted()
        {
            try
            {
                return fileSizeFuture.hasValue() &&
                        writeSize == ((Integer)fileSizeFuture.get()).intValue() &&
                        !th.isAlive();
            }
            catch (Exception e) // InterruptedException, FutureException
            {
                // ここに来ることはないはず
                Debug.Assert(false);
                return false;
            }
        }
        public void dispose()
        {
            th.interrupt();
        }
        
        private static void deleteFile(String file)
        {
            try
            {
                Folder folder = StorageDevice.getInstance("/ext0")
                                    .getFolder(DoJaStorageService.getAccessToken(
                                            DoJaAccessToken.ACCESS_PLATFORM, DoJaStorageService.SHARE_APPLICATION));
                                    .getFile(file)
                                    .delete();
            }
            catch (Exception e) { }
        }
    }
    
    public static final int RESULT_DOWNLOADING = 0;
    public static final int RESULT_DOWNLOAD_ERROR = 1;
    public static final int RESULT_WRITING_ERROR = 2;
    public static final int RESULT_DOWNLOAD_COMPLETED = 3;
    
    private final DownloadThread downloadThread;
    private final Future downloadErrorFuture;
    private final WriteThread writeThread;
    private final Future writeErrorFuture;
    
    public DownloadTask()
    {
        // 15 個分のデータが込められるキューを用意。
        // tempBuf が 10KB なので、最大で 150KB のデータを保持することになる。
        FiniteBufferQueue queue = new FiniteBufferQueue(15);
        // ファイルサイズやり取り用
        Promise fileSizePromise = new Promise();
        // ダウンロードエラーやり取り用
        Promise downloadErrorPromise = new Promise();
        downloadErrorFuture = downloadErrorPromise.getFuture();
        // 書き込みエラーやり取り用
        Promise writeErrorPromise = new Promise();
        writeErrorFuture = writeErrorPromise.getFuture();
        
        downloadThread = new DownloadThread(queue, fileSizePromise, downloadErrorPromise);
        writeThread = new WriteThread(queue, fileSizePromise.getFuture(), writeErrorPromise);
    }
    
    // これは毎フレーム呼び出す
    public int process()
    {
        // ダウンロードに失敗した場合
        if (downloadErrorFuture.hasException())
        {
            return RESULT_DOWNLOAD_ERROR;
        }
        // メモリースティックへの書き込みに失敗した場合
        if (writeErrorFuture.hasException())
        {
            return RESULT_WRITING_ERROR;
        }
        // 正常に全てのデータが書き込みできた
        if (writeThread.writeCompleted())
        {
            return RESULT_DOWNLOAD_COMPLETED;
        }
        return RESULT_DOWNLOADING;
    }
    
    // 破棄時に必ず呼び出すこと
    public void dispose()
    {
        downloadThread.dispose();
        writeThread.dispose();
    }
    
    // 他、外部とのインターフェース部分を作る
    ...
}