Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

MQTT Meetup Tokyo 2014.08に参加してきました!

id:KenichiroMurata(@ )です。

皆さんはMQTTに興味はありますか?私は昨年末くらいからMQTTに興味を持ち、そこからあれこれ調べています。

また社内でも、Stormのエコシステムとして開発しているAcroMUASHI Streamの開発チームが、MQTTと連係した分散ストリーム処理を実現しており、私の周辺では盛り上がっています。

そんな中、先日 MQTT Meetup Tokyo 2014.08が開催され、幸運にも参加することができましたので、今回はその参加レポートをお届けします。

イベントの概要はこちらをご覧下さい。イベントは非常に内容の濃いものであり、面白かったです。
少しでも何かお手伝いできることがあればとイベント当日のtweettogetterにまとめましたので、当日の雰囲気や反応をご覧になりたい方はMQTT Meetup Tokyo 2014.08 まとめも合わせてご覧下さい。

1. MQTTの機能概要 - ツキノワ 若山さん(@r_rudi)

MQTTについてのまとめ — そこはかとなく書くよん。を書かれた方による発表。MQTTの概要についての説明であり、特にWill、Retain、CleanSessionについての説明は理解を復習する意味でも非常に分かりやすかったです。Retainの活用方法の例として、Subscribeしてくるデバイスに最新の設定情報をRetainを使って渡すという話は、なるほどでした。また、Microservicesについても少し触れられていました。

2. IBM MessageSight - IBM 鈴木徹さん

IBM MessageSightはMQTTのアプライアンスサーバーで、なんと100万デバイスを常時接続可能、毎秒1500万メッセージを処理できるとのこと。100万のデバイスを60秒以内で全て接続できるとか、毎秒1万メッセージを平均85マイクロ秒でさばけるとか、サーバの冗長化も当然可能という凄いものでした。お値段は怖くて聞けないレベルです。

プレゼンをされた鈴木さんは、普段はMQTTがどれだけ良いかを説明するのにほとんどの時間を使うのに対して、今日は参加者が皆MQTTのことは良いモノと知っているので、その説明がいらないのが最高!とあれこれネタを仕込んだマシンガントークを展開し、非常に面白かったです。

話の焦点はMQTTうんぬんではなく、IoTをどのようにしてビジネスにつなげて利益を生み出すのか?という内容になり、車内センサーで運転手の状態を常時接続で情報収集し、その情報を保険会社が活用するといった事例紹介もありました。

あとは、IBMさんでは、MQTT/IoTで収集したデータをどうやって活用するのか、既存の様々なシステムと結合できるように仕組み化している点についても説明があり、ここでもMicroservicesを連想させる「3年後に取り替え可能な部品によるシステムを作る。疎結合に」なんて話が聞けました。

3. IoT/M2M Hot Topics in IETF - レピダム 林さん(@lef)

CoAPとその周辺に関する話題について。CoAPについては私は初めて知りました。HTTP/RESTと親和性のあるIoT/M2M用の軽量プロトコルであり、UDPとのこと。MQTTがTCPで常時接続であるのに対して、UDPを利用するMQTT-SNと対応付く関係にあるものだと理解しました。

CoAPそのものというよりもIETFの仕様策定の裏側(仕様に関するユースケースを話してしまうと、ビジネス上問題なので、ユースケースの話になると大人の事情で議論が進まなくなる)みたいな話がきけて面白かったです。

4. 時雨堂 MQTT ブローカー (AKANE) - @voluntas さん

本イベントの主催者であり、イベント前日にMQTT as a Seriviceであるsangoを発表した時雨堂の@voluntasさんによるsangoの中で使われている MQTT ブローカー (AKANE) の開発苦労談。

仕様としてはシンプルなMQTTですが、MQTT Brokerを本気で開発しようとすると、どのような問題があって、苦労するのかという話で非常に面白かったです。特に、仕様には含まれていないが、現実的にシステムとして運用できるようにするために実装した機能についてはとても参考になりました。

仕事上、数万、数十万の監視対象ノードを集中監視制御するシステムを開発してきたので、リトライ、ノード別の状態遷移制御、時系列制御、優先キュー、一斉接続などなど、それに類する話題が基本なのでイメージしやすかったし、共感できました。MQTTでは、さばくクライアント数が多いのは当然として、トピックのSubscribeの指定方法によっては1クライアントに配信するメッセージが爆発的に増えるし、QoSダウングレードはさらに複雑になるので、その実装はたしかにつらそうです。なぜに世のMQTT Brokerの実装が(私が観測する範囲では)どれも途中に見えるのは、このような背景があるからなんですね。

5. MQTT+RaspberryPi+Arduino+センサーで制御とGUIを実装した話 - 小松電機産業 廣江さん (@hiroe_orz17)

RaspberryPi+Arduino、オムロンPLC、Intel Galileo でMQTTしてみた話 - ごろねこ日記を書かれた方による発表。具体的にアプリケーションを作る上で、どのように考えて、何を試して、結果どう実現したかが具体的に分かる内容でした。MQTTのペイロード部をどのようにするのか、3G環境下で通信量を削減/効率化するために配列やバイト列を使った表現やMessagePackの利用など、試行錯誤も含めての説明だったので、イメージが沸きました。

おまけ sangoを試してみる

githubアカウントで簡単にsangoを使い始められるということで、イベント終了後に早速アカウントを作成し、試してみました。mosquittoを使ってCLIで試すのは直ぐにできたので、以前から気になっていたMQTTInspectorというiOSアプリ(有料)を使って試してみました。

アプリの使い方そのものに慣れるのに手探り状態でしたが、以下のように簡単に動作させることができました。ビューによっては通信状態が見えるのがよいですね。

メッセージを送受信している例


通信ログ

最後に

MQTT Meetup Tokyo 2014.08 はMQTTの概要から始まって、仕様の話、ビジネス活用の話、Broker実装の裏話、クライアントアプリ開発の話と幅広い内容で、しかもそれぞれが濃い話でした。このようなイベントを企画してくださり、どうもありがとうございました!

Acroquest Technologyでは、キャリア採用を行っています。

  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Storm-0.9.2の新機能/修正点を紹介します

こんにちは。kimukimuです。
f:id:acro-engineer:20140709095247p:plain

Stormインストーラ更新のお知らせ

まず最初にお知らせを。
読者の皆さんにも愛用頂いている(?)Stormのインストーラですが、
Storm-0.9.2対応バージョンをアップしました。

下記のページからダウンロードできますので、
Stormを簡単にインストールしたい方、Stormを試してみたいと考えている方は是非使ってみてください。

acromusashi/storm-installer · GitHub

インストーラのアナウンスと併せて、Storm-0.9.2の新機能/修正点を紹介しますね。

Storm-0.9.2の主な新機能/修正点

Storm-0.9.2の新機能/修正点として、主なものは下記の4点があります。

  1. Storm UIにTopology可視化ツールの追加
  2. Storm-Kafkaの更新/媒体への取り込み
  3. Storm-Starterの媒体への取り込み
  4. Storm-UIにREST APIが追加

Storm UIにTopology可視化ツールの追加

Storm-0.9.2の目玉機能が、Topology可視化ツールです。
Spout/Bolt間でどれだけのメッセージが流れているかの概要を即見ることができます。
加えて、Topology可視化ツール自体はStorm-UIに内蔵され、即使うことができます。

使い方は、Storm-UIのTopology Summaryのページから
「Topology Visualization」のボタンを押すだけです。
f:id:acro-engineer:20140713002118j:plain

すると、下記のようにSpout/Bolt間を流れるメッセージの数を表示するグラフが表示されます。
f:id:acro-engineer:20140713002219j:plain
かつ、このグラフはStormが複数のStreamを含む場合、
どのStreamをグラフ中に表示するか絞り込みも可能です。
f:id:acro-engineer:20140713002343j:plain
ただ、Stormのメッセージ失敗検知機構を使用しておらず、Streamの分割も行っていないTopologyの場合、
0件のStreamが表示されるだけなので、Stream絞り込みにはあまり意味はないのですが^^;
f:id:acro-engineer:20140713002530j:plain

尚、グラフに表示されるメッセージ数は「過去10分間」で固定のようです。

Storm-Kafkaの更新/媒体への取り込み

Stormとよくペアで使用されるOSSとしてApache Kafkaがあります。
Kafkaは一言で言うと「PubSub型のメッセージバス」で、Stormが処理するためのデータを保持しておき、
StormがKafkaから取得して処理する、という関係になっています。

StormにはStorm-KafkaというKafkaと結合するためのコンポーネントが存在したのですが、
Kafkaのバージョンアップに追従しておらず、最新版(0.8系)のKafkaには使用できない状態になっていました。
Storm-0.9.2では最新版に合わせて更新され、改めて使用できるようになりました。

また、Storm-KafkaはStormのダウンロード媒体にも含まれるようになりました。
同じApacheプロジェクトだから、というのもあるとは思いますが、StormとKafkaの連携というのは有力な選択肢になりそうですね。

Storm-Starterの媒体への取り込み

Storm-Kafkaと併せて、Storm-StarterもStormのダウンロード媒体に取り込まれました。

これまではStormクラスタを構築後、自分で何か簡単なTopologyを作成するか、
または別個Storm-Starterからソースをダウンロード後、最新版のStormに合わせて更新して使用する必要がありました。
#実はStorm-StarterにはStormの最新版を適用するとコンパイルエラーが発生する時期がありました^^;

ですが、Storm-0.9.2からはStormの媒体に常に最新のStormに合わせたStorm-Starterが用意されるようになりました。
よりStormを簡単に試すことが出来るようになったと言えると思います。

Storm-UIにREST APIが追加

Topology可視化ツールとも関連するのですが、
Storm-UIに下記のことが可能なREST APIが追加されました。

  • Stormクラスタの情報取得
  • StormSupervisorの情報取得
  • StormTopologyの情報取得
  • Topology中の各コンポーネントの情報取得
  • Topologyの停止/再開/リバランス/終了

それまでは画面から操作するか、Storm-Nimbusに対してThriftでメッセージを飛ばすしかなかったため、
REST APIの追加によってStormクラスタの管理を別プロセスから行う・・ということが行いやすくなりました。

その他のStorm-0.9.2の新機能/修正点

これまでで主要な更新点を説明してきましたが、
その他にも下記のようにStorm-0.9.2には多くの修正点があります。
Storm-0.9.0系に更新する際に大きく構成が変更になっているため、
そこから発生した問題を一気に修正しているようです。

  • CPU数に応じたスケーラビリティ向上
  • Nettyによるメッセージ転送の改善
  • 冗長な性能メトリクス送信の除去
  • Storm-Kafkaの信頼性向上
  • ZooKeeperに対するHeartBeat負荷の低減
  • 言語間のやり取りを行う際のシリアライザをプラガブルに修正

Storm-0.9.2になって結局何が嬉しいか?

Storm-0.9.2はマイナーバージョンアップという位置づけのためか、
Stormのコア部分に機能が追加されるということはありませんでした。

ですが、Topology可視化ツールをはじめとした「使いやすくする」修正が行われています。

Topology可視化ツール自体が表示している情報自体はStorm-UIで元々表示していた情報です。
ですが、これまではStormに慣れた人間でないと正確にその情報を読み解けない、というのが今まででした。
可視化ツールの追加によってそれが誰であっても概要を読み取れるようになったのは大きいと思います。

そのため、Storm-0.9.2は「Stormをより使いやすくするバージョンアップ」と位置づけられると思います。
これを機に、これまで使ったことが無い方も是非Stormを使ってみてください。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Apache版Storm初回リリースの新機能の使い方

こんにちは。kimukimuです。
f:id:acro-engineer:20140709095247p:plain

夏なのか梅雨なのか微妙な気候になっているような感覚を覚える今日この頃です。
いきなり暑くなってきているので、バテないよう気をつける必要がありますね。

さて、前回Apache版Stormの新機能の概要について紹介しましたが、
今回は実際に新機能がどういう風に使えるのか、について確認してみようと思います。

尚、Storm-0.9.2-incubatingもリリースされていますが、それは次回に回すとして、
今回はStorm-0.9.1-incubatingの新機能です。

・・・といっても、ビルドツールの変更などは確認してもあまり嬉しいことはないため、
下記の2つの機能に絞って確認を行ってみることにします。

  1. Storm-UIの各項目にツールチップで解説を表示
  2. NimbusにTopologyをSubmitする際、設定に対するバリデーションが追記

1.Storm-UIの各項目にツールチップで解説を表示

Storm-UIで表示される各種項目に対してマウスオーバーした際にツールチップで解説が表示されるようになった」という新機能です。

Storm-UIを使ってさえいれば常時有効となります。
これは実際にStorm-UIで見てみた方が早いため、実際どんな内容が表示されるかを見てみましょう。

まずはTop画面のバージョンから。こういった形で項目にマウスオーバーすることで項目の説明が表示されます。
f:id:acro-engineer:20140519072554j:plain
同じように、Topology Summaryの画面でも下記のような項目の説明が表示されます。
Capasityをはじめとした「生データから算出される項目」についてはどのように値が算出されているかも記述されています。
f:id:acro-engineer:20140519072902j:plain
f:id:acro-engineer:20140519072906j:plain
Executor SummaryにおいてもHostNameの算出の方法について記述されるなど、かゆい所にも手が届きます。
f:id:acro-engineer:20140519072909j:plain

Storm-UIは基本的にStormがZooKeeper上に保持している性能情報を
Nimbusから取得して表にしているだけのため、パっと見はわかりにくい画面なのですが、
今回各項目に解説が表示されるようになったため、使いやすくなったとは思います。

2.NimbusにTopologyをSubmitする際、設定に対するバリデーションが追記

では、次は「TopologyをSubmitする際、設定に対する定型的なバリデーションを行えるようになった」についてです。

これは内容としては、TopologySubmit時に型として不正な値が設定されている設定値を検出してバリデーションを行うものです。
バリデーション対象となるのはStorm自体が動作するために必要な設定値です。
Topology固有の設定値についてはこれまでと同じくTopologyValidatorを自前で作成し、チェックを行う必要があります。

では、実際にどういう場面で使われるのかを確認してみましょう。

今回ベースとするのはincubator-storm/examples/storm-starter at master · apache/incubator-storm · GitHubです。
この中で一番単純なExclamationTopologyを例にとります。
ExclamationTopologyをStormクラスタにSubmitする際のConfigオブジェクトに
ZooKeeperのポート設定を文字列("Test")として詰めて起動してみます。
当然ながら、ポート設定のため本来数値で設定されている必要があります。

  • ExclamationTopology
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.put(Config.STORM_ZOOKEEPER_PORT, "Test"); // 本来数値でないと動作しない設定に文字列を設定

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }

この状態でStormクラスタにSubmitを行うと・・?

# bin/storm jar storm-starter-0.9.1-incubating-jar-with-dependencies.jar storm.starter.ExclamationTopology ExclamationTopology-3
(省略)
354  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
359  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-starter-0.9.1-incubating-jar-with-dependencies.jar to assigned location: /opt/storm/nimbus/inbox/stormjar-06306ca5-a1d6-4991-a47a-98b87126186b.jar
409  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/nimbus/inbox/stormjar-06306ca5-a1d6-4991-a47a-98b87126186b.jar
409  [main] INFO  backtype.storm.StormSubmitter - Submitting topology ExclamationTopology-3 in distributed mode with conf {"topology.workers":3,"storm.zookeeper.port":"Test"}
415  [main] WARN  backtype.storm.StormSubmitter - Topology submission exception: field storm.zookeeper.port 'Test' must be a 'java.lang.Number'
Exception in thread "main" InvalidTopologyException(msg:field storm.zookeeper.port 'Test' must be a 'java.lang.Number')
        at backtype.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:2466)
        at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
        at backtype.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:162)
        at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:146)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
        at storm.starter.ExclamationTopology.main(ExclamationTopology.java:76)

このように、「storm.zookeeper.port」が"Test"という設定になっており、NumberではないからSubmit出来ない、
クラスタに投入する前にはじくことができました。

これは今までだと設定が誤っていることに気付かずにStormクラスタにSubmitしてしまい、
クラスタで起動する際にWorkerプロセスが起動して死ぬを繰り返す・・・という厄介な状態に陥っていました。

特に、実際に使っている方だと
JSONYamlといったファイルに設定値を外だしして読み込ませる方も多いと思いますが、
設定ファイルに記述していた内容が誤っていた場合に予め検出してくれるのでかなり便利に使えると思います。

3.Apache版Stormの初回リリース機能についてのまとめ

  1. Storm-UIに解説が加わり、各画面の項目の意味がわかりやすくなりました。
  2. TopologySubmit時に明らかに誤った設定は事前にはじけるようになりました。

特に新しい機能が追加された・・・というわけではないのですが、使いやすさが確実に増すバージョンアップだったと思います。

尚、別の投稿で紹介しますが、Storm-0.9.2-incubatingでリリースした内容は
使いやすくなる、ではなく実際に使える機能が追加されたリリースになっています。
Apacheに移り、今後も期待していけるStormになった、と言えるでしょう。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Spring BootによるWebアプリお手軽構築

こんにちは、阪本です。


Springのメジャーバージョンアップに伴い、Spring Bootも晴れて1.0となりました。
Spring Bootは、Spring周りの依存関係をシンプルに解決してくれるフレームワークですが、今流行りの(流行る予定の?)Dropwizardを意識した作りになっています。

どれだけシンプルにできるのか、見てみようと思います。

超シンプルなWebアプリの作成

では早速、Webアプリを作ってみましょう。
手始めに、サーバにアクセスすると固定文字列を返す(いわゆるHello World的な)アプリを作ってみます。


まず、下ごしらえとして、Mavenのpom.xmlを作成します。

<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>springboot</groupId>
  <artifactId>sample</artifactId>
  <packaging>jar</packaging>
  <version>1.0.0</version>
  <name>SpringBootSample</name>

  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.0.2.RELEASE</version>
  </parent>

  <dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-maven-plugin</artifactId>
      </plugin>
    </plugins>
  </build>
</project>

なんと、dependencyは「spring-boot-starter-web」を指定するのみ!
ちなみに、packagingが「jar」になっていることに注意。

・・・そう、単体のアプリケーションで動作するんです。
(ちなみに、warファイルを作成することも可能。)


次は、コントローラを用意します。特に何の変哲もない、普通のコントローラですね。

package springboot.sample.controller;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseBody;

@Controller
public class SampleController {
    @RequestMapping("/home")
    @ResponseBody
    public String home() {
        return "Hello Spring Boot!!";
    }

}


最後に(もう最後!)メインクラスを用意します。

package springboot.sample;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.context.annotation.ComponentScan;

@ComponentScan
@EnableAutoConfiguration
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

}


あとは、maven packageして、jarを実行するだけ。

java -jar sample-1.0.0.jar 

jarの中に必要なライブラリが含まれているため、クラスパスの指定も不要!
このあたり、Dropwizardライクな感じですね。


コンソールにはこんな出力があり、約2秒で起動しました。
組み込みTomcatが起動しています。(ちなみにJettyにも変更可能です。)

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::        (v1.0.2.RELEASE)

2014-06-01 10:55:28.993  INFO 7740 --- [           main] springboot.sample.Application            : Starting Application on ...
2014-06-01 10:55:29.026  INFO 7740 --- [           main] ationConfigEmbeddedWebApplicationContext : Refreshing org.springframework.boot.context.embedded.AnnotationConfigEmbeddedWebApplicationContext@16022d9d: startup date [Tue Apr 25 01:55:29 JST 2014]; root of context hierarchy
2014-06-01 10:55:29.824  INFO 7740 --- [           main] .t.TomcatEmbeddedServletContainerFactory : Server initialized with port: 8080
2014-06-01 10:55:30.016  INFO 7740 --- [           main] o.apache.catalina.core.StandardService   : Starting service Tomcat
2014-06-01 10:55:30.016  INFO 7740 --- [           main] org.apache.catalina.core.StandardEngine  : Starting Servlet Engine: Apache Tomcat/7.0.52
2014-06-01 10:55:30.109  INFO 7740 --- [ost-startStop-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring embedded WebApplicationContext
2014-06-01 10:55:30.110  INFO 7740 --- [ost-startStop-1] o.s.web.context.ContextLoader            : Root WebApplicationContext: initialization completed in 1087 ms
2014-06-01 10:55:30.490  INFO 7740 --- [ost-startStop-1] o.s.b.c.e.ServletRegistrationBean        : Mapping servlet: 'dispatcherServlet' to [/]
2014-06-01 10:55:30.492  INFO 7740 --- [ost-startStop-1] o.s.b.c.embedded.FilterRegistrationBean  : Mapping filter: 'hiddenHttpMethodFilter' to: [/*]
2014-06-01 10:55:30.760  INFO 7740 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**/favicon.ico] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2014-06-01 10:55:30.875  INFO 7740 --- [           main] s.w.s.m.m.a.RequestMappingHandlerMapping : Mapped "{[/],methods=[],params=[],headers=[],consumes=[],produces=[],custom=[]}" onto public java.lang.String springboot.sample.controller.SampleController.home()
2014-06-01 10:55:30.888  INFO 7740 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/webjars/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2014-06-01 10:55:30.889  INFO 7740 --- [           main] o.s.w.s.handler.SimpleUrlHandlerMapping  : Mapped URL path [/**] onto handler of type [class org.springframework.web.servlet.resource.ResourceHttpRequestHandler]
2014-06-01 10:55:30.981  INFO 7740 --- [           main] o.s.j.e.a.AnnotationMBeanExporter        : Registering beans for JMX exposure on startup
2014-06-01 10:55:30.999  INFO 7740 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 8080/http
2014-06-01 10:55:31.001  INFO 7740 --- [           main] springboot.sample.Application            : Started Application in 2.326 seconds (JVM running for 2.7)

そして、http://localhost:8080/home にアクセスすると、こんな感じに、文字が表示されました。

f:id:acro-engineer:20140424005318p:plain

web.xmlいらず、applicationContext.xmlいらずでWebアプリが立ち上がるなんて、超シンプルですね!

Thymeleafを用いた画面作成

先ほどは、URLにアクセスすると単に文字列を返すだけのものでした。
今度は、Thymeleafを用いて画面を作成します。

まず、pom.xmlにThymeleafを追加します。

<dependency>
  <groupId>org.thymeleaf</groupId>
  <artifactId>thymeleaf-spring4</artifactId>
</dependency>

次に、ThymeleafのHTMLテンプレートファイルを作成します。
HTMLテンプレートファイルはsrc/main/resources/templatesディレクトリの下に配置します。

hello.html

<!DOCTYPE html>
<html xmlns:th="http://www.thymeleaf.org">
<body>
  <h1>
    Hello Spring <span style="color: red">Boot!!</span>
  </h1>
</body>
</html>

最後に、上で作ったhello.htmlを表示するよう、Controllerを書き換えます。

package springboot.sample.controller;

import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
public class SampleController {
    @RequestMapping("/home")
    public String home() {
        return "hello";
    }

}

先ほどと同じく http://localhost:8080/home にアクセスすると、hello.htmlの内容が表示されました。

f:id:acro-engineer:20140424005711p:plain

監視&デバッグ

デバッグ時に、Springコンテキストに何が登録されているか、分かると便利ですよね?
Spring Bootでは、pom.xmlに以下を追加するだけで、Web画面にコンテキストの内容を表示できます。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

起動した後、http://localhost:8080/beans にアクセスすると、以下のようなJSONデータが表示されます。

[
  {
    "beans": [
      {
        "bean": "application", 
        "dependencies": [], 
        "resource": "null", 
        "scope": "singleton", 
        "type": "springboot.sample.Application"
      }, 
      {
        "bean": "sampleController", 
・・・

実際には、改行されずに出力されるので、整形する必要はあります。

コンテキスト以外にも、以下のような様々な情報をWebから取得できるようになります。

  1. 環境変数http://localhost:8080/env
  2. Controllerのマッピング状態(http://localhost:8080/mapping
  3. HTTPリクエストトレース(http://localhost:8080/trace
  4. アクセスカウンタ/メトリクス(http://localhost:8080/metrics
  5. スレッドダンプ(http://localhost:8080/dump

ここもDropwizard的ですね。

WebアプリにSSH接続

なんと!起動したWebアプリにSSHで接続してコマンドを実行することができます!!

pom.xmlに以下を追加して、ビルド&起動。

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-remote-shell</artifactId>
</dependency>

起動途中に、コンソールに以下のような内容が出力されます。これ(以下では662dc881-c2e3-4ad6-802e-73a36e4fc7e3)がデフォルトのログインパスワード。
(デフォルトのログインパスワードは、起動の度に変わります。)

2014-06-01 11:21:34.180  INFO 4464 --- [           main] roperties$SimpleAuthenticationProperties : 

Using default password for shell access: 662dc881-c2e3-4ad6-802e-73a36e4fc7e3

デフォルトのユーザ名はuser、SSH接続ポートは2000のため、この設定で接続してみると・・・

f:id:acro-engineer:20140424005923p:plain

ログインできました!

デフォルトでは、metrics、beans、autoconfig、endpointコマンドが使用できます。
もちろん、JavaやGroovyで自作のコマンドを定義することもできます。

例えば、src/main/resources/commandsディレクトリに、以下の内容でhello.groovyファイルを作成しておけば、
helloコマンドが実行できるようになります。

package commands

import org.crsh.cli.Usage
import org.crsh.cli.Command

class hello {

    @Usage("Say Hello")
    @Command
    def main(InvocationContext context) {
        return "Hello"
    }

}

f:id:acro-engineer:20140424005934p:plain

Webアプリのコントロールを行えるようなコマンドを、簡単に提供できそうですね。
ちなみに、このSSHの機能はCRaSHを用いて実現されています。

おわりに

ここで紹介した内容は、設定ファイルはMavenのみで、Springの設定ファイルを何一つ作成していません。

  • Springのライブラリ依存や設定ファイルに悩まされず手軽にWebアプリを構築したい!
  • Webアプリの管理もRESTやCLIでできるようにしたい!
  • Dropwizardのような「ポータブルなWebアプリケーション」を作成したい!

という要望に、Spring Bootは応えてくれそうですね。

では。


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

もしもラムダの中で例外が発生したら(後編)

こんにちは。アキバです。

ゴールデンウィークですね!
皆さんいかがお過ごしですか?

今年は間に平日が多めなので、大型連休!というよりは2回連休があるというイメージの方が強いかもしれません。
cero-tの奥さんは11連休だとか

f:id:acro-engineer:20140311021635j:plain


さて、前回に続いて、ParallelStreamで動かしているラムダ内で、例外が発生した場合の挙動について調べていきます。


まずは、軽くおさらいします。

以下のようなコードを書きました。

try {
    List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu");
    strArray.parallelStream().forEach(s -> {
        System.out.println("ラムダ開始: id=" + Thread.currentThread().getId());
        try {
            Thread.sleep(100L);
            if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId());
        } catch (RuntimeException ex) {
            System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId());
            throw ex;
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        System.out.println("ラムダ終了: id=" + Thread.currentThread().getId());
    });
} catch (Exception th) {
    System.out.println("外側で例外をcatch");
    th.printStackTrace(System.out);
}


そうすると、こんな感じでいくつかのスレッドが終了しない時がありました。

ラムダ開始: id=1
ラムダ開始: id=14
ラムダ開始: id=15
ラムダ開始: id=13
ラムダ開始: id=16
ラムダ開始: id=17
ラムダ開始: id=12
ラムダ開始: id=18
ラムダ終了: id=16
ラムダ終了: id=15
ラムダ内で例外発生: id=13
ラムダ内で例外発生: id=1
ラムダ終了: id=14
ラムダ終了: id=17
ラムダ終了: id=12
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=13

それから、なぜかラムダ内では2回例外が発生するようにしているのに、ラムダの外側では1つしかcatchできていません

どうしてなんでしょうか?
というお話でした。

1. 例外のスタックトレースをのぞいてみる

さて、ラムダの中でcatchした例外(2つ)と、ラムダの外側でcatchできた例外(1つ)の違いを見てみましょう。

以下のようにコードを修正します。

try {
    List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu");
    strArray.parallelStream().forEach(s -> {
        System.out.println("ラムダ開始: id=" + Thread.currentThread().getId());
        try {
            Thread.sleep(100L);
            if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId() + ", s=" + s);
        } catch (RuntimeException ex) {
            System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId() + ", s=" + s);
            // スタックトレースを出力させてみる(ラムダの中でcatchした例外)
            ex.printStackTrace(System.out);
            throw ex;
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        System.out.println("ラムダ終了: id=" + Thread.currentThread().getId() + ", s=" + s);
    });
} catch (Exception ex) {
    System.out.println("外側で例外をcatch");
    // スタックトレースを出力させてみる(ラムダの外でcatchした例外)
    ex.printStackTrace(System.out);
    System.out.println("ラムダの外でcatchした例外:ここまで");
}

これを動かすと、ラムダの中で2つ、ラムダの外で1つの例外がcatchできます。
そして、ラムダの外でcatchした例外は、ラムダの中で発生した例外のどちらかになるわけです。

実行した結果は、こうなりました。

ラムダ開始: id=1
ラムダ開始: id=14
ラムダ開始: id=15
ラムダ開始: id=16
ラムダ開始: id=13
ラムダ開始: id=12
ラムダ開始: id=17
ラムダ開始: id=18
ラムダ終了: id=12, s=stu
ラムダ終了: id=17, s=jkl
ラムダ終了: id=14, s=pqr
ラムダ終了: id=16, s=ghi
ラムダ終了: id=15, s=def
ラムダ終了: id=18, s=abc
ラムダ内で例外発生: id=1, s=xxx
ラムダ内で例外発生: id=13, s=xxx
java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
java.lang.RuntimeException: ラムダ内で例外: id=1, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:400)
	at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:728)
	at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
	at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
	at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
	at StreamSample.parallelStreamExceptionSample2(StreamSample.java:117)
	at StreamSample.main(StreamSample.java:58)
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=13, s=xxx
	at StreamSample.lambda$parallelStreamExceptionSample2$2(StreamSample.java:121)
	at StreamSample$$Lambda$1/1555009629.accept(Unknown Source)
	at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
	・・・略・・・
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:902)
	at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1689)
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1644)
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
ラムダの外でcatchした例外:ここまで
finished.


上記の例では、Parallelで動かした処理の1つはアプリケーションのmainスレッドでした。
もう一つのスレッドは、ForkJoinTaskのワーカースレッドとして起動しています。

そうです。ParallelStreamは、Fork/Join Frameworkで動いているんです。

Fork/Join Framework、というかForkJoinTaskの実行は、1つのmainスレッドと複数のワーカースレッドで行われます。
この辺の仕様はForkJoinTaskクラスのJavadocにも書かれているので、なにやら小難しいですが読んでみると参考になるかもしれません。

上記の例ではmainスレッドで実行中のTaskで例外が発生しましたが、当然、ワーカースレッドでだけ例外が発生する場合もあります。
(むしろ、その方が普通かもしれませんね)

実際に、上記のコードで"xxx"の位置を変えてやると、例外が発生するスレッドが変わります。

つまり、例外が発生したスレッドがメインかワーカーかに依存してはいけないということになります。


Fork/Joinの実行は、mainスレッドが各Taskの終了状態を見ていて、いずれかのスレッドで例外が発生すると、外側に再スロー(rethrow)する仕組みになっています。
なので、最初に発生した例外だけがラムダの外側でcatchできるということになるのです。

2. 例外が発生したワーカースレッドは、いつ終了するのか?

ラムダ内で例外が発生して外側でcatchした後も、他のワーカースレッドは動き続けています

例外が発生した場合、mainスレッドとしてはラムダの実行は終わっているのですが、それをワーカースレッドが知ることはできないわけですね。
この動作は、例外が発生するスレッドがmainかワーカーかによらず、同じになります。

3. "ラムダ終了"が出なかったワケ

ForkJoinTaskで動くワーカースレッドは、daemonスレッドです。
要は、daemonスレッド以外のスレッドが全て終了すると、プロセスが終了になるということです。


前回、"ラムダ終了"が出なかったスレッドがあると書いたのは、この辺にカラクリがあります。

ForkJoinで実行中のタスクの1つで例外が発生する

mainスレッドが外側に例外を再スローする

mainスレッドが先に終了する

他のForkJoinスレッドはdaemonスレッドで動いているため、実行中でもJavaVMが終了

メッセージ出ない(><

となるわけです。

4. まとめると

これまでの内容をまとめると、ラムダ内で例外が発生した場合の動作は以下の3点になるかと思います。

  • 1. 最初に発生した例外だけ、ラムダの外側でcatchすることが出来ます
  • 2. ラムダ内で例外をスローすると、全スレッドの終了を待つことはできません
  • 3. ワーカーはdaemonスレッドなので、mainスレッドが終了すると、途中でもワーカーの処理は終了します


上の動作から注意すべきなのは、
他のスレッドはラムダの例外終了を検知できないので、異常時に全ロールバックみたいな処理をParallelStreamでやってはいけない
ということでしょうか。

Webアプリケーションなど、処理が終了してもプロセスが終了しないようなサーバーでは、
どちらかというと、ラムダ内で例外は発生させないか、発生しても外側に投げない仕組みにした方がよいでしょう。


いかがでしたか?

また面白そうなネタがあれば調べてみます。


それでは!

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

もしもラムダの中で例外が発生したら(前編)

ある日、 id:cero-tJJUGの重鎮たちと話している中で、とある宿題をもらいましたとさ。

「Java8のラムダの中で例外が発生したら、どうなるんだろう?」


こんにちは、アキバです。
もう皆さんはJava8を使ってみましたか?
f:id:acro-engineer:20140311021635j:plain

とりあえずインストールしてみた人!

・・はーい (おまえか


という冗談はさておき、
今回は、id:cero-t に代わって私が冒頭のお題を調べてみました。

1. SerialStreamで動かしたラムダで例外が発生したら

まずは、小手調べにシングルスレッドの場合を見てみましょう。


検査例外が発生するようなコードをラムダに書いてみると、コンパイルエラーになります。
こんなコードです。

try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(W_FILENAME))) {
    // writer.write() がIOExceptionをスローするので、catchしろと言われる
    lines.forEach(s -> writer.write(s + '\n'));
} catch (IOException ioex) {
    System.out.println("IOException in Writer-try.");
    ioex.printStackTrace(System.out);
    throw new UncheckedIOException(ioex);
}


そこで、ラムダの中で例外をハンドリングしてみると、確かにエラーが出なくなります。
こんなコードです。

try (BufferedWriter writer = Files.newBufferedWriter(Paths.get(W_FILENAME))) {
    lines.forEach(s -> {
        // ラムダの中で例外をcatchする
        try {
            writer.write(s + '\n');
        } catch (IOException ex) {
            System.out.println("IOException in lambda.");
            ex.printStackTrace(System.out);
            throw new UncheckedIOException(ex);
        }
    });
} catch (IOException | RuntimeException ex) {
    System.out.println("Exception in Writer-try.");
    ex.printStackTrace(System.out);
    throw ex;
}


動かしてみるとどうなるんでしょうね。

あ、今回は、writer.write()で例外を発生させるのも面倒なので、

writer.write(s + '\n');

throw new IOException("IOException in writer");

に変えて動かしています。


動かした結果、こうなりました。

IOException in lambda.
java.io.IOException: IOException in writer
	at study.java8.lambda.StreamSample.lambda$streamExceptionSample1$0(StreamSample.java:68)
	at study.java8.lambda.StreamSample$$Lambda$1/149928006.accept(Unknown Source)
	at java.util.ArrayList.forEach(ArrayList.java:1234)
	at study.java8.lambda.StreamSample.streamExceptionSample1(StreamSample.java:65)
	at study.java8.lambda.StreamSample.main(StreamSample.java:33)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)
Exception in Writer-try.
java.io.UncheckedIOException: java.io.IOException: IOException in writer
	at study.java8.lambda.StreamSample.lambda$streamExceptionSample1$0(StreamSample.java:72)
	at study.java8.lambda.StreamSample$$Lambda$1/149928006.accept(Unknown Source)
           :
          (略)
Caused by: java.io.IOException: IOException in writer
	at study.java8.lambda.StreamSample.lambda$streamExceptionSample1$0(StreamSample.java:68)
	... 9 more

最初の行に出た例外のメッセージが「IOException in lambda.」となっていることから、ラムダ呼び出しの中で例外をcatchしていることがわかります。

次に、「Exception in Writer-try.」と出ています。

おぉ、ラムダ呼出しの外側で、ラムダがスローした例外をcatch出来ていますね。
UncheckedIOExceptionの中身は、ラムダ処理自体で発生した例外でした。


つまり、ラムダの中で検査例外が発生するようなコードは、
無名内部クラスと同じように例外をcatchして処理する必要があるということがわかりました。
ただし、無名内部クラスと大きく違うのは、メソッド定義ではないので、例外の宣言は出来ません。
よって、スロー出来るのは検査例外以外(RuntimeExceptionのサブクラスなど)でなくてはいけないということになりますね。


ふむふむ。


では、いよいよ本題です。

2. ParallelStreamで動かしたラムダで例外が発生したら

Parallelってことは、マルチスレッド動作なわけじゃないですか。

  • 発生した例外は誰が受け取るのか?どこまで伝播するのか?
  • 例外が発生したスレッド全てでcatch処理が行われるのか?
  • 例外が発生しなかった他のスレッドはどのような影響を受けるのか?
  • そもそもちゃんと全部のスレッド処理が終了するのか?

皆さんも気になりますよね?
私も気になります。
(だから、それが本題なんだってば)


まずは、こんなコードを書いてみます。

try {
    List<String> strArray = Arrays.asList("abc", "def", "xxx", "ghi", "jkl", "xxx", "pqr", "stu");
    strArray.parallelStream().forEach(s -> {
        System.out.println("ラムダ開始: id=" + Thread.currentThread().getId());
        try {
            Thread.sleep(100L);
            if (s.equals("xxx")) throw new RuntimeException("ラムダ内で例外: id=" + Thread.currentThread().getId());
        } catch (RuntimeException ex) {
            System.out.println("ラムダ内で例外発生: id=" + Thread.currentThread().getId());
            throw ex;
        } catch (InterruptedException e) {
            e.printStackTrace(System.out);
        }
        System.out.println("ラムダ終了: id=" + Thread.currentThread().getId());
    });
} catch (Exception th) {
    System.out.println("外側で例外をcatch");
    th.printStackTrace(System.out);
}

要するに、文字列が "xxx" だったら例外を吐く、それ以外は正常終了するラムダ処理です。
これをParallelStreamで実行するものです。

"xxx" は2つあるので、2回例外が発生するようになっていますよね。

さて、これを動かしてみるとこうなります。

ラムダ開始: id=1
ラムダ開始: id=15
ラムダ開始: id=14
ラムダ開始: id=16
ラムダ開始: id=12
ラムダ開始: id=17
ラムダ開始: id=13
ラムダ開始: id=18
ラムダ終了: id=18
ラムダ終了: id=13
ラムダ終了: id=17
ラムダ終了: id=14
ラムダ内で例外発生: id=12
ラムダ内で例外発生: id=1
ラムダ終了: id=16
ラムダ終了: id=15
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=12

ふむふむ、例外もラムダ呼出しの外側でcatchできました

…と思ったら、ラムダ呼出しの外側でcatchできたのは、id=12のスレッドのみでした。


id=1のスレッドで発生した例外はどこに行ったのでしょうか?


では、もう一回実行してみましょう。

ラムダ開始: id=1
ラムダ開始: id=14
ラムダ開始: id=15
ラムダ開始: id=13
ラムダ開始: id=16
ラムダ開始: id=17
ラムダ開始: id=12
ラムダ開始: id=18
ラムダ終了: id=16
ラムダ終了: id=15
ラムダ内で例外発生: id=13
ラムダ内で例外発生: id=1
ラムダ終了: id=14
ラムダ終了: id=17
ラムダ終了: id=12
外側で例外をcatch
java.lang.RuntimeException: ラムダ内で例外: id=13

ふむふむ...あれ?
今度は、id=18のスレッドで「ラムダ終了」が出ませんでしたよ!


頻度は高くないようですが、ParallelStreamの並列処理には、実はこんな不安定なところがあるようです。


まとめると、

  • なぜ、最初の例外しかラムダの外側でcatchできないのか?
  • なぜ、終了が出なかったスレッドが存在するのか?
  • 終了が出なかったスレッドは、実際には終了したのか、それとも残っているのか?
  • ParallelStreamって、Fork-Joinで待ってないの?

そんなところが疑問ですよね。



後編ではその謎に迫ります!

では。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

あなたのJavaコードをスッキリさせる、地味に便利な新API 10選(後編)

こんにちは。
アキバです。

本日3/18、ついに、Java8が正式リリースされますね!
もうダウンロードされましたか?ってまだですかね?私はまだです(だって公開前にエントリ書いてるんだもんね)

2014/03/19追記:Oracleのページが更新されました!→こちら


さて、前回に続いて、Java8で追加された地味で便利なAPIを紹介していきます。

今回は、みんな大好きMapConcurrent、あとちょびっとComparatorです。

f:id:acro-engineer:20140311021635j:plain

3. Map操作編

(1) Map#getOrDefault()

これまでは、Mapから値を取得してnullだったらデフォルト値を使用する、みたいなコードを以下のように書いていたと思います。

Map<String, String> map;    // 何らかのMap

String value = map.get("key");
if (value == null) {
    value = defaultValue;
}

Map#getOrDefault()を使うと、この処理が1行になります。

String value = map.getOrDefault("key", defaultValue);

それだけです。ちょっとしたところからスッキリしますね。

(2) Map#replace()

Map#replace()を使うと、該当するキーに対する値を入れ替えられます。

キーが存在することが必須条件ですが、値の指定には2種類あります。

1つ目は、キーさえあれば、強制的に値を書き換えるもの。

Map<String, String> map;    // 何らかのMap
map.replace("key1", "newValue");

この場合は、key1が存在すれば、値はかならずnewValueになります。
key1が存在しないと、何も起こりません。

2つ目は、キーがあって値が一致したものだけ書き換えるもの。

Map<String, String> map;    // 何らかのMap
map.replace("key1", "oldValue", "newValue");

この場合、key1がoldValueである場合に限り、newValueになります。
それ以外の場合は、何も起こりません。

いずれの呼び出しも、戻り値として置換前の値が得られます。
置換しなかった場合は、nullとなるので、書き換わったかどうかを調べることもできます。

(3) Map#computeIfPresent()

Map#computeIfPresent()は、キーが存在している(かつnullではない)時に、値を加工するためのメソッドです。

...なんとなく便利そうですが、いまいちイメージしづらいですね。
長々と説明するよりも、実際のコードを見てみましょう。

例として、Mapのキーを探索して、存在する場合に先頭に「★」を追加する処理を書いてみます。


今までは、以下のように書いていたと思います。

Map<String, String> map;   // 何らかのMap

// キーが存在する名前だけ、先頭に「★」を追加する
String[] keys = new String[] { "a", "c", "d" };
for (String key : keys) {
    if (map.get(key) != null) {
        String oldValue = map.get(key);
        String newValue = '★' + oldValue;
        map.put(key, newValue);
    }
}


Java8では、以下のように書けます。

Map<String, String> map;   // 何らかのMap

// キーが存在する名前だけ、先頭に「★」を追加する
String[] keys = new String[] { "a", "c", "d" };
for (String key : keys) {
    map.computeIfPresent(key, (k, s) -> '★' + s);
}

イディオム的にコードを書くよりも、computeIfPresentという名前で何をしたいのか/何をしているのかがハッキリして良いと思います。
しかも、コードはスッキリ。

ラムダ式になっているところは、BiFunctionというJava8で追加された関数インタフェースです。
ちなみに、Map#computeIfPresent() は default メソッドになっていて、以下のコードであるとAPIドキュメントに書かれています。

if (map.get(key) != null) {
    V oldValue = map.get(key);
    V newValue = remappingFunction.apply(key, oldValue);
    if (newValue != null)
        map.put(key, newValue);
    else
        map.remove(key);
}

最初に書いたコードとほとんど同じですね。
また、これを見ると、BiFunctionに渡される2つの引数は、1つ目がキー、2つ目が(処理前の)値であることがわかります。

4. Concurrent編

(1) LongAdder

Javadocを見ると、「初期値をゼロとした、1つ以上の値の合計を扱う」とあります。
要は、値を合計するためのクラスなのですが、java.util.concurrent.atomicパッケージに属しているだけあって、マルチスレッドからのアクセスに対応しています。

シングルスレッドで使っているとつまらないのですが、こんな感じです。

long[] longArray = { 1, 2, 3, 4, 5 };

LongAdder adder = new LongAdder();
LongAdder count = new LongAdder();
for (long longValue : longArray) {
    adder.add(longValue);
    count.increment();
}

System.out.println("elements count=" + count.sum() + ", sum=" + adder.sum());
// → elements count=5, sum=15 と表示される

他にも、sumThenReset() があるので、一定期間毎の合計を出したりできるかもしれません。

似たような用途に AtomicLong も使えるのですが、APIドキュメントを見ると
「スレッドの競合が高い状況下では LongAdderの方がメモリを消費する代わりに、高速に動作する」
という主旨のコメントが書かれています。

Java8のソースを見る限りでは、合計値の算出タイミングの違いが影響しているようですね。

AtomicLongは、値を追加(addAndGet)する毎に計算を行っていますが、LongAdderの方はCellという内部用のオブジェクトを配列で保持するようになっていて、合計を参照する(sumなど)のタイミングで初めて合計値を計算する仕組みになっているようです。

ということは、複数スレッドが競合するタイミングで計算を含んだ処理でロックの取り合いにはならないということでしょうか。


どのような条件で、どれだけ高速なのでしょうか?
今回もベンチマークしようと思っていたのですが、海外のエントリでベンチマークをとった結果が出ていました。
→日本語訳の引用あり:Java 8ニュース:新しいアトミックナンバーを含むRC版を公開、モジュール化は外れる
 (「新しいアトミックナンバーの実装」という項を見てください)
→原文:Java 8 Performance Improvements: LongAdder vs AtomicLong - Palomino Labs Blog

これによると、シングルスレッドではAtomicLongの方が速いけど、マルチスレッドで競合アクセスさせた場合はLongAdderの方が性能が良いとなっています。純粋なカウントアップ処理を行っているとのことですが、特性としてはドキュメントに書かれている通りになりました。

(2) LongAccumulator

Javadocを見ると、「1つ以上の値の集合を提供された関数で更新する」とあります。
なんとなくLongAdderと同じようなクラスですが、コンストラクタにはLongBinaryOperatorを
指定することになっており、ちょっと凝った動作ができるようです。

簡単な例として、値の2乗を合計するようにしてみましょう。

long[] longArray = { 1, 2, 3, 4, 5 };

LongAccumulator accumulator = new LongAccumulator((x, y) -> x + y * y, 0L);
for (long longValue : longArray) {
    accumulator.accumulate(longValue);
}

System.out.println("square sum=" + accumulator.get());
// → square sum=55 と表示される
//    (1 + 4 + 9 + 16 + 25 = 55)

こちらも、マルチスレッドで処理する場合には利用を検討してみましょう。

5. Comparator編

(1) naturalOrder() / reverseOrder()

最後に、皆さんもよく使っているであろう Comparator で見つけたメソッドです。

唐突ですが、今までJava8の勉強をしてきて、たくさんラムダ式を見たりしてきましたよね。
なので、「文字列を自然順序付けの昇順/降順のソートをする」と聞くと、つい、以下のように書きたくなるかもしれません。

// 昇順にソートする
Arrays.sort(array, (s1, s2) -> s1.compareTo(s2));

// 降順にソートする
Arrays.sort(array, (s1, s2) -> s2.compareTo(s1));

このくらいの処理ならば、ラムダ式でも十分にわかりやすいとは思いますが、Comparator#naturalOrder() と Comparator#reverseOrder() を使うとよりシンプルに記述できます。

// 昇順にソートする
Arrays.sort(array, Comparator.naturalOrder());

// 降順にソートする
Arrays.sort(array, Comparator.reverseOrder());

メソッドの名前からソート順を理解しやすく、さらにコードもスッキリしましたね。


f:id:acro-engineer:20140311021635j:plain:small

いかがでしたでしょうか。

他にも追加されたAPIはまだまだたくさんあります。

前回書いたベンチマークのように、構文は便利になったが、果たして実用に耐えられるのか?ということは常に注意して使わなければなりませんし、その為に中身の動作をよく理解しておく必要があります。

それでも、知らなければ使おうとも思わないという面もあると思います。

ぜひ、皆さん自身でも試してみて、開発効率を上げていきましょう!


ではでは~

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ