こんばんは、さかもとです。
今回は、JDK7で追加されたconcurrentユーティリティを見てみます。
その名は「Phaser」。
一言でいえば、Java5から導入されたCyclicBarrierに機能を追加したクラス。
フェーズという概念が組み込まれており、
何回処理したら終了するか、などの条件を付けられるようになっている。
public class PhaserTest { private static final int ITERATIONS = 3; private Phaser phaser_ = new Phaser(1) { protected boolean onAdvance(int phase, int registeredParties) { // runTasks()を3回実行するか、処理するスレッドがなくなった場合は // trueを返して終了する return (phase >= ITERATIONS || registeredParties == 0); } }; public void process() { Queue<Runnable> tasks = new ConcurrentLinkedQueue<>(); do { // tasksにデータ処理タスクを追加する // データ処理タスクをまとめて実行する runTasks(tasks); } while (this.phaser_.isTerminated() == false); } /** * キューに蓄積されているタスクをすべて一斉に処理する。 * * @param tasks タスク */ public void runTasks(Queue<Runnable> tasks) { Runnable task; while ((task = tasks.poll()) != null) { final Runnable taskAwait = task; // 今後到着するであろうスレッドを事前に登録する this.phaser_.register(); Thread thread = new Thread() { public void run() { // ここで何らかの処理を行う // 到着したことを通知し、 // register()された他のスレッドが到着するのを待つ phaser_.arriveAndAwaitAdvance(); // arriveAndDeregister()が呼ばれたら、 // 複数スレッドが一斉に解放され、データ処理タスクが実行される taskAwait.run(); } }; thread.start(); } // arriveAndAwaitAdvance()で待っているスレッドを一斉実行する this.phaser_.arriveAndDeregister(); } }
上記のサンプルでは、
PhaserTest#process() を呼び出すと、
tasksキューに入っているタスクをまとめて実行することを、
3フェーズ分実行します。
流れてくるデータを、あるタイミングで同時処理を行いたいときに使用できそうです。
#具体的に思いつかないですが。。
では。