Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

もしもラムダの中で例外が発生したら(後編)

こんにちは。アキバです。

ゴールデンウィークですね!
皆さんいかがお過ごしですか?

今年は間に平日が多めなので、大型連休!というよりは2回連休があるというイメージの方が強いかもしれません。
cero-tの奥さんは11連休だとか

f:id:acro-engineer:20140311021635j:plain


さて、前回に続いて、ParallelStreamで動かしているラムダ内で、例外が発生した場合の挙動について調べていきます。


まずは、軽くおさらいします。

以下のようなコードを書きました。

try {
    List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu");
    strArray.parallelStream().forEach(s -> {
        System.out.println("ラムダ開始: id=" + Thread.currentThread().getId());
        try {
            Thread.sleep(100L);
            if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId());
        } catch (RuntimeException ex) {
            System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId());
            throw ex;
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        System.out.println("ラムダ終了: id=" + Thread.currentThread().getId());
    });
} catch (Exception th) {
    System.out.println("外側で例外をcatch");
    th.printStackTrace(System.out);
}


そうすると、こんな感じでいくつかのスレッドが終了しない時がありました。

ラムダ開始: id=1
ラムダ開始: id=14
ラムダ開始: id=15
ラムダ開始: id=13
ラムダ開始: id=16
ラムダ開始: id=17
ラムダ開始: id=12
ラムダ開始: id=18
ラムダ終了: id=16
ラムダ終了: id=15
ラムダ内で例外発生: id=13
ラムダ内で例外発生: id=1
ラムダ終了: id=14
ラムダ終了: id=17
ラムダ終了: id=12
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=13

それから、なぜかラムダ内では2回例外が発生するようにしているのに、ラムダの外側では1つしかcatchできていません

どうしてなんでしょうか?
というお話でした。

1. 例外のスタックトレースをのぞいてみる

さて、ラムダの中でcatchした例外(2つ)と、ラムダの外側でcatchできた例外(1つ)の違いを見てみましょう。

以下のようにコードを修正します。

try {
    List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu");
    strArray.parallelStream().forEach(s -> {
        System.out.println("ラムダ開始: id=" + Thread.currentThread().getId());
        try {
            Thread.sleep(100L);
            if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId() + ", s=" + s);
        } catch (RuntimeException ex) {
            System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId() + ", s=" + s);
            // スタックトレースを出力させてみる(ラムダの中でcatchした例外)
            ex.printStackTrace(System.out);
            throw ex;
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        System.out.println("ラムダ終了: id=" + Thread.currentThread().getId() + ", s=" + s);
    });
} catch (Exception ex) {
    System.out.println("外側で例外をcatch");
    // スタックトレースを出力させてみる(ラムダの外でcatchした例外)
    ex.printStackTrace(System.out);
    System.out.println("ラムダの外でcatchした例外:ここまで");
}

これを動かすと、ラムダの中で2つ、ラムダの外で1つの例外がcatchできます。
そして、ラムダの外でcatchした例外は、ラムダの中で発生した例外のどちらかになるわけです。

実行した結果は、こうなりました。

ラムダ開始: id=1
ラムダ開始: id=14
ラムダ開始: id=15
ラムダ開始: id=16
ラムダ開始: id=13
ラムダ開始: id=12
ラムダ開始: id=17
ラムダ開始: id=18
ラムダ終了: id=12, s=stu
ラムダ終了: id=17, s=jkl
ラムダ終了: id=14, s=pqr
ラムダ終了: id=16, s=ghi
ラムダ終了: id=15, s=def
ラムダ終了: id=18, s=abc
ラムダ内で例外発生: id=1, s=xxx
ラムダ内で例外発生: id=13, s=xxx
java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
java.lang.RuntimeException: ラムダ内で例外: id=1, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:400)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:728)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at StreamSample.parallelStreamExceptionSample2(StreamSample.java:117)
	at StreamSample.main(StreamSample.java:58)
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
ラムダの外でcatchした例外:ここまで
finished.


上記の例では、Parallelで動かした処理の1つはアプリケーションのmainスレッドでした。
もう一つのスレッドは、ForkJoinTaskのワーカースレッドとして起動しています。

そうです。ParallelStreamは、Fork/Join Frameworkで動いているんです。

Fork/Join Framework、というかForkJoinTaskの実行は、1つのmainスレッドと複数のワーカースレッドで行われます。
この辺の仕様はForkJoinTaskクラスのJavadocにも書かれているので、なにやら小難しいですが読んでみると参考になるかもしれません。

上記の例ではmainスレッドで実行中のTaskで例外が発生しましたが、当然、ワーカースレッドでだけ例外が発生する場合もあります。
(むしろ、その方が普通かもしれませんね)

実際に、上記のコードで"xxx"の位置を変えてやると、例外が発生するスレッドが変わります。

つまり、例外が発生したスレッドがメインかワーカーかに依存してはいけないということになります。


Fork/Joinの実行は、mainスレッドが各Taskの終了状態を見ていて、いずれかのスレッドで例外が発生すると、外側に再スロー(rethrow)する仕組みになっています。
なので、最初に発生した例外だけがラムダの外側でcatchできるということになるのです。

2. 例外が発生したワーカースレッドは、いつ終了するのか?

ラムダ内で例外が発生して外側でcatchした後も、他のワーカースレッドは動き続けています

例外が発生した場合、mainスレッドとしてはラムダの実行は終わっているのですが、それをワーカースレッドが知ることはできないわけですね。
この動作は、例外が発生するスレッドがmainかワーカーかによらず、同じになります。

3. "ラムダ終了"が出なかったワケ

ForkJoinTaskで動くワーカースレッドは、daemonスレッドです。
要は、daemonスレッド以外のスレッドが全て終了すると、プロセスが終了になるということです。


前回、"ラムダ終了"が出なかったスレッドがあると書いたのは、この辺にカラクリがあります。

ForkJoinで実行中のタスクの1つで例外が発生する

mainスレッドが外側に例外を再スローする

mainスレッドが先に終了する

他のForkJoinスレッドはdaemonスレッドで動いているため、実行中でもJavaVMが終了

メッセージ出ない(><

となるわけです。

4. まとめると

これまでの内容をまとめると、ラムダ内で例外が発生した場合の動作は以下の3点になるかと思います。

  • 1. 最初に発生した例外だけ、ラムダの外側でcatchすることが出来ます
  • 2. ラムダ内で例外をスローすると、全スレッドの終了を待つことはできません
  • 3. ワーカーはdaemonスレッドなので、mainスレッドが終了すると、途中でもワーカーの処理は終了します


上の動作から注意すべきなのは、
他のスレッドはラムダの例外終了を検知できないので、異常時に全ロールバックみたいな処理をParallelStreamでやってはいけない
ということでしょうか。

Webアプリケーションなど、処理が終了してもプロセスが終了しないようなサーバーでは、
どちらかというと、ラムダ内で例外は発生させないか、発生しても外側に投げない仕組みにした方がよいでしょう。


いかがでしたか?

また面白そうなネタがあれば調べてみます。


それでは!

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ