Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Java SE 7(JDK7)の見どころをまとめてみました!(その3)

こんばんは、さかもとです。


今回は、JDK7で追加されたconcurrentユーティリティを見てみます。

その名は「Phaser」。
一言でいえば、Java5から導入されたCyclicBarrierに機能を追加したクラス。

フェーズという概念が組み込まれており、
何回処理したら終了するか、などの条件を付けられるようになっている。

public class PhaserTest
{
    private static final int ITERATIONS = 3;

    private Phaser phaser_ = new Phaser(1) {
        protected boolean onAdvance(int phase, int registeredParties)
        {
            // runTasks()を3回実行するか、処理するスレッドがなくなった場合は
            // trueを返して終了する
            return (phase >= ITERATIONS || registeredParties == 0);
        }
    };

    public void process()
    {
        Queue<Runnable> tasks = new ConcurrentLinkedQueue<>();
        do
        {
            // tasksにデータ処理タスクを追加する

            // データ処理タスクをまとめて実行する
            runTasks(tasks);
        }
        while (this.phaser_.isTerminated() == false);
    }

    /**
     * キューに蓄積されているタスクをすべて一斉に処理する。
     *
     * @param tasks タスク
     */
    public void runTasks(Queue<Runnable> tasks)
    {
        Runnable task;
        while ((task = tasks.poll()) != null)
        {
            final Runnable taskAwait = task;

            // 今後到着するであろうスレッドを事前に登録する
            this.phaser_.register();

            Thread thread = new Thread() {
                public void run()
                {
                    // ここで何らかの処理を行う

                    // 到着したことを通知し、
                    // register()された他のスレッドが到着するのを待つ
                    phaser_.arriveAndAwaitAdvance();

                    // arriveAndDeregister()が呼ばれたら、
                    // 複数スレッドが一斉に解放され、データ処理タスクが実行される
                    taskAwait.run();
                }
            };
            thread.start();
        }

        // arriveAndAwaitAdvance()で待っているスレッドを一斉実行する
        this.phaser_.arriveAndDeregister();
    }
}


上記のサンプルでは、
PhaserTest#process() を呼び出すと、
tasksキューに入っているタスクをまとめて実行することを、
3フェーズ分実行します。

流れてくるデータを、あるタイミングで同時処理を行いたいときに使用できそうです。
#具体的に思いつかないですが。。


では。