Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Spring XMLでCamelを書いて、twitterとelasticsearchを連携(Apache Camel入門 その3 Spring XMLについて)

こんにちは、もっと多くのJava技術者がCamelで楽できるはずと信じているツカノ(@)です。

前回は、発達したJavaのエコシステムの恩恵にあずかり、実質数行のJavaコードで、twitterとelasticsearchを連携させることができました。
今回は、Spring XMLでCamelを記述することで、twitterとelasticsearchを連携させてみましょう。

f:id:acro-engineer:20131125064348p:plain

「Camelって何?」って人は、前回までの内容を確認しておきましょう。

CamelのDSLについて

これまで説明してきたCamelのサンプルは、Javaのコードでした。実はこれはCamelの一面でしかありません。Camelは様々なDSLで表現することができ、Java DSLはその中のひとつです。CamelのDSLページによると、今のところ、以下のようなDSLを使うことができます。

プロジェクトの特性などに合わせて好きなDSLを使えるのは、Camelの良いところです。今後、Clojureとか、JRubyのDSLが出てきたら面白いですね。
さて、Springを使っているプロジェクトは多いと思いますので、Spring XMLを使ってみましょう。今までJavaのコードで表現していたCamelをSpringのXMLで表現します。このDSLを使うと、Camelのoption設定等をハードコードせずにCamelを利用することができます。また、Springの資産(DI等)を利用することができます。今回の例では登場しませんが、Springのscopeを利用すれば、グローバルなオブジェクトを利用することも、スレッド単位で異なるオブジェクトを利用することもできます。

Spring XMLで実装してみましょう

mavenをインストールしてあれば、アプリケーション開発のひな型を生成できます。以下のコマンドを実行してください(groupId, artifactId, versionは作成するアプリケーションに合わせて読み換えてください)。

mvn archetype:generate -DgroupId=snuffkingit \
    -DartifactId=camel-example-spring \
    -Dversion=1.0.0-SNAPSHOT \
    -DarchetypeGroupId=org.apache.camel.archetypes \
    -DarchetypeArtifactId=camel-archetype-spring \
    -DarchetypeVersion=2.12.1

このシリーズの1回目で似たようなこと行いましたね。そこではarchetypeArtifactIdとして「camel-archetype-java」を指定していました。それを「camel-archetype-spring」に変更しただけです。

さて、mavenでひな型を生成ましたが、今回はJavaコードは生成されていません。代わりに生成される、以下の設定ファイルを変更します。

src/main/resources/META-INF/spring/camel-context.xml

前回作成したTwitterCrowler.javaをcamel-context.xmlに置き換えると以下の通りです。(twitter APIのconsumerKey, consumerSecret, accessToken, accessTokenSecretについてはマスクしています)

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camel:camelContext>
    <camel:route>
      <camel:from uri="twitter://search?type=direct&amp;keywords=camel&amp;consumerKey=xxx&amp;consumerSecret=xxx&amp;accessToken=xxx&amp;accessTokenSecret=xxx"/>
      <camel:marshal>
        <camel:json library="Jackson"/>
      </camel:marshal>
      <camel:to uri="elasticsearch://elasticsearch?operation=INDEX&amp;indexName=twitter&amp;indexType=tweet"/>
    </camel:route>
  </camel:camelContext>

</beans>

直観的にイメージは伝わると思いますが、詳細はSpring XMLのページを確認してください。また、前回のコンポーネント説明でcamel-twittercamel-elasticsearchの使い方ページを確認した際に気がつかれたかもしれませんが、Camelの公式サイトでは多くのページでJava DSLとSpring XMLの両方で解説してあります。

Spring XMLを使ったCamelの概要は以下の通りです。

  • camelContextタグに囲った中にCamelの処理を記述します。このタグの外側は通常のSpringのXMLとして使えます。
  • camelContextタグの中にrouteタグを書き、その中にrouteの構成を記述します。
  • fromやtoはタグ名に置き換えて、uriは属性として記述します。
  • 注意点としては、Javaでは文字列に「&」を使えますが、XMLの属性には「&」は使えません。「&」はエスケープが必要なため、「&」→「&amp;」に置き換えます。

また、JSONに変換する部分はCamelでのJSONを説明したページが参考になります。

前回同様、pom.xmlに以下のdependencyを入れることも必要です。

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-twitter</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-elasticsearch</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>${camel-version}</version>
    </dependency>

ちなみに、archetypeArtifactIdにcamel-archetype-springを指定してgenerateしているため、生成されたpom.xmlのdependencyにはcamel-springが予め記述されています。

Spring XMLを実行してみましょう

では、これを実行してみましょう。
手っ取り早く実行するには、mavenで実行して下さい。

mvn camel:run

Javaコマンドから実行するには、以下のように実行してください。(Camelから環境変数を参照している訳ではないため、${CLASSPATH}は直接記述しても問題ありません)

java -cp ${CLASSPATH} org.apache.camel.spring.Main

デフォルトでは「${CLASSPATH}/META-INF/spring/*.xml」というファイルを探し、それを設定ファイルとして利用します。具体的にファイルを指定する場合は、引数「-fa」で指定します。(Camelから環境変数を参照している訳ではないため、${CLASSPATH}、${CONTEXT_PATH}は直接記述しても問題ありません)

java -cp ${CLASSPATH} org.apache.camel.spring.Main -fa ${CONTEXT_PATH}/camel-context.xml

実行結果をkibanaで見てみると以下のようになりました。
f:id:acro-engineer:20131211081825j:plain

と言う訳で、Spring XMLを使って、twitterとelasticsearchを連携させることができました。

まとめ

3回にわたりApache Camelの入り口について説明してきましたが、その強力さの一端は伝わったでしょうか。
このようなことができる背景には以下のような要因があります。

  • Java製のOSSは非常に多いこと(今回の例ではtwitter、elasticsearchへのアクセス)
  • OSS同士をCamelで連携できること
  • Spring XMLを使えばハードコーディングせずCamelを使ったアプリケーションを書けること

Camelのコンポーネント一覧を見ると、twitterやelasticsearchだけでなく、多数のOSSに関するコンポーネントが提供されていることが分かります。また、対応しているOSSは日々増えています。「Camelを積極的に使い、使いづらければCamelのコンポーネントを修正してコミュニティにフィードバック」という流れができると、より使いやすさが増すのではないかと思います。

このシリーズで説明したCamelの機能は本当に触りの部分です。Camelでは多くの処理を実現できます。分岐させたり、データ変換したり、特定部分をマルススレッド化したり、、、気になった方は、是非、Camelを利用して今使っているOSSにアクセスしてみてください。
Camelを使って楽をしましょう!
 

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

ラムダ禁止について本気出して考えてみた - 9つのパターンで見るStream API

こんにちは @ です。

今日のテーマは・・・ラピュタ禁止令!

バルス!

いや違う。ラムダ禁止令、です。


さて、なかなかの滑り出しですが、今日はただのラムダの紹介ではなく、禁止令に主眼を置いて語ります。

このエントリーは、Java Advent Calendar 2013の12/16分の投稿です。
http://www.adventar.org/calendars/145

前日は @sugarlife さんの JDK 8 新機能ダイジェスト (JDK 8 Features) です。
翌日は @setoazusa さんです。

ラムダ禁止令はあり得るのか?

勉強会やその懇親会などで、たびたび「ラムダ禁止令が出るのではないか」が話題に上ることがあります。
「そりゃ禁止する組織もあるでしょうね」というのがお決まりの答えなのですが、ただそれに従うだけでは面白くありませんし、要素技術の発展も滞ってしまうでしょう。そもそも新しい技術を食わず嫌いせず、必要に応じて利用できる文化にしたいですよね。

そんな事を考えながらも、ただStream APIについてはちょっと考えないといけないな、と思わされたのが、Java Advent Calendar 5日目の記事でした。

ToIntFunction<Map.Entry<Car, List<Sale>>> toSize = e -> e.getValue().size();
Optional<Car> mostBought;
mostBought = sales.collect(Collectors.groupingBy(Sale::getCar))
        .entrySet()
        .stream()
        .sorted(Comparator.comparingInt(toSize).reversed())
        .map(Map.Entry::getKey)
        .findFirst();
if(mostBought.isPresent()) {
    System.out.println(mostBought.get());
}

Java Advent Calendar 2013 5日目 - Java 8 Stream APIを学ぶ - kagamihogeの日記

ここで取り上げられている例は、特にStream APIに慣れていないうちは、パッと見ても何をしたいのかがよく分かりません。これを見て「可読性が低い」と捉える向きがいてもおかしくありません。

こういう事がエスカレートすると・・・

若者が複雑なStream APIを書く
 ↓
先輩がレビューができない or テスト不十分で見落としが起きる
 ↓
問題が発生する
 ↓
なぜなぜ分析でStream APIの可読性が槍玉にあがる
 ↓
Stream API全面禁止
 ↓
ついでにラムダも全面禁止

_人人人人人人人人人人_
> 突然のラムダ禁止 <
 ̄Y^Y^Y^Y^Y^Y^Y^Y^Y ̄

こんな事態が起きるかも知れない、と思いました。特にStream APIが何なのかをよく分かっていない人たちが、Stream APIラムダ式を混同して「ラムダ式禁止」と言いだすことは十分にあり得ます。

今日はこの辺りの話、つまり「業務でStream APIをどのぐらいまでなら使えるか」「使うための注意点は何か」ということを本気出して考えたいと思います。

ラムダ式禁止」だけは抵抗しよう

まず最初に言っておくと「ラムダ式」の禁止はあり得ません。言ってしまえばほら、ラムダ式はただの匿名クラス(無名クラス)のシンタックスシュガーみたいなものであって、何も難しいところはないからです。

簡単なおさらいとして、Comparatorの例を見てみましょう。

Comparator<Emp> comp1 = new Comparator<Emp>() {
    @Override
    public int compare(Emp emp1, Emp emp2) {
        return emp1.getSal() - emp2.getSal();
    }
};

Comparator<Emp> comp2 = (emp1, emp2) -> emp1.getSal() - emp2.getSal();

Comparator<Emp> comp3 = Comparator.comparingInt(Emp::getSal);

このcomp1、comp2、comp3はいずれも同じ処理をするComparatorです。comp3はやや見慣れない形だとしても、comp2までなら昔ながらのエンジニアにもまだ理解できる記述だと思います。

そのため、匿名クラス(無名クラス)自体を禁止している組織ならまだしも、そうでない組織でラムダ式自体が禁止されることは賛同できません。それこそ「食わず嫌い」の典型なので、もし皆さんの組織(の標準化グループ)がラムダ式禁止だと判断しそうなら、ぜひ抵抗してください。

あるいは、ラムダ式禁止を逆手に取って、Stream APIを匿名クラスまみれで書くことで、抵抗しても良いかも知れません(笑)
# 半年後、そこには元気に走り回るStream API禁止令の姿が!

ただし、単に禁止に反対するだけでなく、相手の懸念している「可読性が下がって困る」というところに応えるためにも、「ここまでなら使っても問題は起きないでしょう」という提案も同時に行なうべきだと思います。

その辺りが、今日のテーマになります。

Stream APIは、どこまで許されるのか?

さて、繰り返しになりますが、今日の本題はラムダ式ではなく、Stream APIです。
ラムダ式自体は簡単なので禁止にする理由がないと書きましたが、Stream APIの方は簡単ではなく、これをむやみやたらに使って炎上すると、Stream API自体を禁止とする組織が出てきてもおかしくありません。

では、どこまでならサクサクと読めるのか、考えてみましょう。

今回は、特に「あまりStream APIに慣れていない人」をターゲットにして書きますので、「自分、streamの数珠つなぎ200行ぐらい普通に読めるんで」のようなマサカリを装備する必要ありません。


なお、今回紹介する機能や処理の「禁止度」について、以下のようにレベルづけをします。

C : 業務で使っても全く問題ないレベル。
B : やや疑問を呈されるけど、積極的に使いたいレベル。
A : 業務では使わない方が良いレベル。
S : 積極的に禁止したいっていうか、使ったら書き直させるレベル。

特に禁止度Bあたりは、古豪のエンジニアから「読めないから使うな」と言われかねないところなので、組織のレベルを見ながら利用するようにすべきです。

1. 同じオブジェクトに対する連続した操作(禁止度:C)

まずStream APIを使った典型的な例が、同じオブジェクトに対してフィルタやソートなどの連続した操作を行なうというものです。

List<Emp> filterAndSort1(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .sorted((emp1, emp2) -> emp2.getSal() - emp1.getSal())
            .collect(Collectors.toList());
}

このぐらいであれば、「給料が1000より大きい」ものを抽出して「降順にソート」していることを読み取ることは容易です。問題ありませんね。

2. Comparatorのstaticメソッドを使ったソート(禁止度:B)

上に書いたソースを少し修正して、ソートのところをComparator.comparingIntにすることもできます。

List<Emp> filterAndSort2(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .sorted(Comparator.comparingInt(Emp::getSal).reversed())
            .collect(Collectors.toList());
}

IntelliJ IDEA 12.1.6を使っていると、Emp::getSalのところで「cyclic inference」というエラーを出してくるのですが、実際には問題なく動作します。IDEがエラーを出してくる辺りに、少し不吉なにおいを感じますね。そういう背景もあって禁止度を一つ上げてBにしました。


さらに、OpenJDK8 build111では、以下のような記述は正常にコンパイルされるのですが、

.sorted(Comparator.comparingInt(emp -> emp.sal))
.sorted(Comparator.comparingInt(emp -> emp.getSal()))
.sorted(Comparator.comparingInt(Emp::getSal).reversed())

以下のようにすると、コンパイルエラーになってしまいます。

.sorted(Comparator.comparingInt(emp -> emp.sal).reversed())
.sorted(Comparator.comparingInt(emp -> emp.getSal()).reversed())

どうもreversedをつけると、comparingIntに渡すToIntFunctionの型解決ができなくなってしまうようです。

開発途中とは言え、コンパイラーですら理解できなくなるのだから、人間にも理解しにくい構文なのでしょうか。
昔ながらのJavaエンジニアに配慮するなら、Comparator.comparing*などは使わず、最初の例に挙げたように、

.sorted((emp1, emp2) -> emp2.getSal() - emp1.getSal())

こう書くのが、一番分かりやすいでしょう。

3. mapとreduceを使った操作(禁止度:B)

続いて、特定の項目のみを抽出して集計するような処理を考えます。
下の例は、1000より大きい給料のうち、その2倍の値の平均を取るという、やや謎の処理です。

Double averageSal(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .mapToInt(emp -> emp.getSal() * 2)
            .average()
            .getAsDouble();
}

これも「map*」メソッドの役割さえきちんと理解してもらえれば、前からスラスラ読むことができます。

続いて、reduceを使う例も見てみましょう。下の例は、1000より大きい給料のうち、給与から1000を引いたものを合計するという、これまたやや謎の処理です。もちろんsum()でも合計はできますが、あえてreduceを使ってみました。

int someCalc(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .mapToInt(emp -> emp.getSal() - 1000)
            .reduce(0, (x, y) -> x + y);
}

これも「reduce」メソッドの役割さえきちんと理解すれば、スラスラ読んでもらえるものでしょう。

しかしながら、いわゆる関数型プログラミングに慣れていないエンジニアには、mapやreduceという処理には親しみがないため、これも読みにくいと言われかねません。

この辺りを標準的に使いたいのであれば、社内勉強会などを開催して、まずは「filter」「sorted」「map」「reduce」あたりを説明すると良いのではないかと思います。ここまで使えれば、だいたい勝てます(何に?)

4. groupingByやtoMapを使ったMapへの変換(禁止度:B)

Stream APIを使っていて、便利だなと感じるのがこのgroupingByによる集計処理。SQLのgroup byと同じようなものです。
これは型を変えてしまうだけに処理がやや分かりにくくなるのですが、やはりこれも積極的に使いたいですね。2例続けて見てみましょう。

Map<Dept, List<Emp>> groupByDept(List<Emp> list) {
    return list.stream()
            .sorted((emp1, emp2) -> emp2.sal - emp1.sal)
            .collect(Collectors.groupingBy(emp -> emp.dept));
}
Map<Integer, List<Emp>> rankedBySal(List<Emp> list) {
    return list.stream()
            .sorted((emp1, emp2) -> emp2.sal - emp1.sal)
            .collect(Collectors.groupingBy(emp -> {
                if (emp.sal > 4000) return 1;
                if (emp.sal > 2500) return 2;
                if (emp.sal > 1000) return 3;
                return 4;
            }));
}

前者は部署(dept)によるグループ化を行い、後者は給与水準によるグループ化を行なっています。groupingByがCollectionからMapに変換するための処理であることさえ把握していれば、このソースもサクサクと読むことができます。


また、groupingByでは、SQLと同じように値に集計結果を入れることもできます。

Map<Dept, Long> groupByDeptAndAve(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.averagingInt(Emp::getSal)));
}

この例では部署ごとに平均給与を計算しています。とてもありそうな処理ですね。


しかしながら、SQLにおいても「集計関数がよく分からない勢」が一定数存在することが現在までに確認されています。
数学の勉強をしてから出直してこいと思うのですが 集計関数によく習熟したメンバでチームを作り、お互いにレビューをできるような体制を作れば、事故ることを減らせると思います。

5. groupingByからのentrySet.stream大作戦(禁止度:A)

続いて、部署(dept)を、人数の少ない順に並べるという処理です。

List<Dept> sortDept1(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()))
            .entrySet()
            .stream()
            .sorted(Comparator.comparingLong(Entry::getValue))
            .map(Entry::getKey)
            .collect(Collectors.toList());
    }
}

Mapのstream処理自体が分かりにくいという事情はあるにせよ、もともとListであったものをMapにして、さらにentrySetを取り出してstream処理を続けるというのは、型が分かりにくくなり、可読性が落ちると言わざるを得ません。というか、私のこの日本語の説明自体も、よく分かりません

しかもこの辺りから、IntelliJ IDEAの自動補完はほぼ効かなくなりますし、エラーもたくさん出るようになります(でも実際にはコンパイルが通って実行できるので、余計に厄介)

こうなってくると、禁止度はさらに上がってAクラスになるでしょう。


では、このソースのどこが可読性を落としているのでしょうか。ポイントは「entrySet().stream()」にあると私は思います。ここで、せめてcollectした後にローカル変数に代入すれば、まだしも読みやすくなるのではないでしょうか。

List<Dept> sortDept2(List<Emp> list) {
    Map<Dept, Long> map = list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()));

    return map.entrySet()
            .stream()
            .sorted(Comparator.comparingLong(Entry::getValue))
            .map(Entry::getKey)
            .collect(Collectors.toList());
}

部署ごとの人数を一度Mapに代入してから、人数でソートしてkeyのみ(部署のみ)取り出しています。このようにすれば、禁止度はBクラスまで落とせるように思います。

「collectした後は、一時変数に代入する」という新しい鉄則を作るという案、どうでしょうかね。


それにしても、Mapの(entrySetの)Streamは分かりづらいですよね。実は2012年前半の時点ではMapStreamというクラスがあり、分かりやすくMapのstream処理を記述することができました。
なぜ消えたのか、背景や理由はよく把握していませんが、いずれにせよMapStreamが消えたせいで、いまのようなstream処理しかできなくなり、可読性が低くなっているというのが現状です。

6. ネストしたstream(禁止度:A)

Mapの集計を考えると、streamをネストさせたくなることがあります。これも2つ例を続けて掲載します。

Map<Dept, Long> groupByDeptAndFilter1(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.collectingAndThen(Collectors.toList(),
                    emps -> emps.stream()
                            .filter(e -> e.sal > 1000)
                            .count())));
}
Map<Dept, Long> groupByDeptAndFilter2(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept))
            .entrySet()
            .stream()
            .collect(Collectors.toMap(entry -> entry.getKey(),
                    entry -> entry.getValue()
                            .stream()
                            .filter(emp -> emp.sal > 1000)
                            .count()));
}

ソースコードの意味、分かりますか?

部署をキーにしたMapを作ったうえで、Mapの値のほうには給与が1000を超える社員の人数を入れています。フィルタ処理を入れたいがために、ただの集計処理が使えず、Streamを利用しています。
ここまで来るとかなり可読性が下がり、事故の原因にもなります。

※2013/12/20 修正 - 初出時のソースに誤りがあり、訂正しました。
Streamをネストせざるを得ない時は、たとえば昔ながらのfor文も併用するというのも、ひとつ読みやすくするための手段になると思います。

Map<Dept, Long> groupByDeptAndFilter3(List<Emp> list) {
    Map<Dept, List<Emp>> map = list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept));

    Map<Dept, Long> result = new HashMap<>();
    for (Entry<Dept, List<Emp>> entry : map.entrySet()) {
        long count = entry.getValue()
                .stream()
                .filter(emp -> emp.sal > 1000)
                .count();
        result.put(entry.getKey(), count);
    }

    return result;
}

ちょっと微妙? ただ、Java8時代には意外とこんなコードが出てくるかも知れません。


もしかすると、業務要件を考えて「あれ、事前にフィルタリングすれば良いだけじゃないの?」と思った方もいるかも知れません。

Map<Dept, Long> groupByDeptAndFilter4(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.sal > 1000)
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()));
}

事前にフィルタリングすれば、シンプルに記述することができます。しかしこうしてしまうと、処理の結果が少し変わってしまうのです。
「給与が1000未満の人しかいない部署」があった場合、groupByDeptAndFilter3までは「0人」として結果を取得できますが、groupByDeptAndFilter4ではそもそも結果のMapに当該の部署は表れません。
今回の例ではそれでも良いかも知れませんが、より複雑な業務処理になると、そのような差分が問題になることも多々あるでしょう。

このように、集計結果が分かりづらくなってくるところも、SQLとよく似ていますね。SQLのエキスパートが必要なことと同様に、Stream APIのエキスパートも必要だと思いますね。

7. streamの外に結果を残す(禁止度:A)

給与の平均と合計を同時に算出したい、という場合です。

void averageAndSum1(List<Emp> list) {
    final Map<String, Integer> dummy = new HashMap<>();
    dummy.put("RESULT", 0);
    double ave = list.stream()
            .mapToInt(emp -> {
                int sal = emp.getSal();
                dummy.put("RESULT", dummy.get("RESULT") + sal);
                return sal;
            })
            .average()
            .getAsDouble();
    System.out.println("ave=" + ave + ",sum=" + dummy.get("RESULT"));
}

ラムダ式がアクセスできる対象はfinalのみなので、ここではdummyというfinalのオブジェクトを定義して、そのオブジェクトに対して値の出し入れをしています。
特にparallelStreamで並列化した時には、確実に問題が起きることも踏まえると(たとえConcurrentHashMapにしていてもです)この記述は避けるべきでしょう。

ちなみに合計と平均の両方を計算したいだけであれば、summaryStatisticsメソッドを利用することで、代表的な集計結果をできます

void averageAndSum2(List<Emp> list) {
    IntSummaryStatistics statistics = list.stream()
            .mapToInt(Emp::getSal)
            .summaryStatistics();
    System.out.println("ave=" + statistics.getAverage() + ",sum=" + statistics.getSum());
}

IntSummaryStatisticsから集計結果を取得することができるのです。

このクラスから取得できないような独自の計算をしたい場合は、大人しくfor文で書くか、自分でCollectorを作るという方法がありそうです。自分でCollectorを作る方法は、また改めて紹介します。

8. stream中に元のオブジェクトを操作(禁止度:S)

やる人がいそうなので、念のため。

Double someOperation(List<Emp> list) {
    return list.stream()
            .filter(emp -> {
                list.remove(emp);
                return emp.getSal() > 1000;
            })
            .mapToInt(Emp::getSal)
            .average()
            .getAsDouble();
}

listのstream処理中に、listに対して要素の追加や削除をするというものです。この例のlist.remove(emp)は恣意的なものなので業務的な意味はありませんが、このように元のオブジェクトに対して操作をするという人は必ず出てくると思います。

元のListがArrayListなのかCopyOnWriteArrayListなのかで動きも変わりますし、ただでさえ動きの掴みづらいstream処理で、このような危険な実装は避けるべきでしょう。
どうしても元のオブジェクトに手を入れながら処理するのであれば(最適なstream処理が思いつかないなら)大人しくfor文で書くべきでしょう。

9. parallelStreamを使ってDB/Webアクセス(禁止度:S)

parallelStreamの効果を試すために、Webにアクセスをするようなサンプルもありますが、実案件ではそのような実装は避けましょう。
マルチスレッドでDB/Webにアクセスをしたいのであれば、スレッド数や、各スレッドの状態をきちんと管理・把握できる、ExecutorService使うべきです。

っていうか、parallelStreamを使ってDBやWebにアクセスなんかしたら絶対に問題が起きるし、むしろ運用中に発現して大問題になってStream API禁止令の引き金になるから、ホントやめて!(><)

まとめ

1. ラムダ式は禁止される理由がない!
2. filter/sort/map/reduce/collect/groupingByあたりの勉強会を行うべし!
3. Comparator.comparing* の使用は少し控えよう!
4. collectをしたら一度ローカル変数に入れよう!
5. Stream APIのエキスパートが近くにいたほうがいい!
6. streamからのstreamとか、ネストしたstreamとかは避けよう!
7. streamの親オブジェクトや、外の変数を触るんじゃない!
8. DBとかWebにアクセスするんじゃない!

まめ知識

「ラムダ」で画像検索したらラピュタのロボット兵が出てきたので、なにごとかと思って調べてみたら、ルパン三世「さらば愛しきルパンよ」に、ロボット兵が「ラムダ」という名前で登場していたようですね。
http://ja.wikipedia.org/wiki/%E3%81%95%E3%82%89%E3%81%B0%E6%84%9B%E3%81%97%E3%81%8D%E3%83%AB%E3%83%91%E3%83%B3%E3%82%88

そう、最初に敢えて滑ったのは、この伏線だったのですよ。敢えての、滑りだったんですよ。敢えての(←しつこい)

それでは皆さん、良いラムダライフを! バルス!

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Kafka+Storm+Elasticsearch+Kibanaでストリームデータ処理の可視化を行ってみた

こんにちは。kimukimuです。


AWS re:Invent 2013Amazon Kinesis が発表されるなど、
ストリームデータ処理に対するニーズの高まりを感じますね。
(Amazon Kinesis は、Stormとも連携できるようになっているようです)。

さて、先日、Storm 0.9.0 が正式リリースされたり、Apache Kafka 0.8.0 が正式リリースされたりしたので、
それらを連携して、ストリームデータの可視化を行うプロトタイプを作ってみました。

1. はじめに

まず、「ストリームデータ」とは、連続的に発生し続けるデータのことを指します。
システムが出力するログやセンサーが発生するデータ、SNSなどで常時発生するメッセージなどが該当します。
今回は、Apacheが出力するログを、ストリームデータとして収集・可視化することを行ってみます。

1-1.やりたいこと

実現したい内容は、以下のような内容です。

  • ログをリアルタイムに収集する。
  • ログの出力状況をリアルタイムにブラウザで表示させる。
  • スケールアウトを考慮して、分散処理を行う。

1-2.利用するもの

今回利用したプロダクトは以下の通りです。

尚、Stormのインストーラは下記の場所で公開&随時更新していますので、お使いください。
acromusashi/storm-installer · GitHub

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. Kafkaが各サーバ上でログを収集する。
  2. StormのKafkaSpoutが、Kafkaに蓄積されたログを取得する。
  3. Storm内で、ElasticsearchBoltが、分散して、Elasticsearchにログを投入する。
  4. Kibana3がElasticsearchに投入されたログの統計情報を表示する。

ソースについてはElasticsearchBoltの抜粋部のみこの記事に記載しますが、
整理が完了したら後程公開しますのでお楽しみに。

ElasticsearchBoltは下記のように実装しています。
ここでのclientはElasticsearchのクライアントインスタンス、converterはTupleから投入するデータを生成するコンバータです。

/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input)
{
    String documentId = null;
    String indexName = null;
    String typeName = null;
    String document = null;

    try
    {
        documentId = this.converter.convertToId(input);
        indexName = this.converter.convertToIndex(input);
        typeName = this.converter.convertToType(input);
        document = this.converter.convertToDocument(input);

        IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource(
                document).setPercolate(this.percolate).execute().actionGet();

        if (logger.isDebugEnabled() == true)
        {
            String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}";
            logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName,
                    response.getVersion()));
        }
    }
    catch (Exception ex)
    {
        String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}";
        logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex);
    }

    getCollector().ack(input);
}

3.実際に動かしてみる

では、実際にログを流して結果をKibana 3で確認してみます。

すると・・・?
下記のような形で簡単な統計情報を表示することができました。
HTTPリクエストのレスポンスタイム平均値、リクエスト回数、アクセス元ホスト、ステータスコードといった
基本的な統計が表示できることが確認できました。
実際の画面上では、随時グラフが更新されていくので、どのような動作になっているのかが、リアルタイムにわかります。

4.何が良いのか?

今回のプロトタイプはつまりは
「ストリームデータを収集し、Stormで処理/変換を行ってElasticsearchに投入、Kibana 3で統計情報を可視化」
のプロトタイプ・・・という形になります。

このプロトタイプを応用することで、以下のようなことが実現できると考えています。

  • (異常検知)ログやイベントをリアルタイムに収集し、サーバ動作やユーザアクセスの異常検知などを行い、可視化する。
  • (M2M)センサーデータを受信し、センサーデータの統計処理を行い、可視化する。
  • (評判分析)SNSのメッセージ内容を解析し、その内容をクラスタリングし、可視化する。

色々夢は広がりますが、とりあえず今回はこのあたりで。


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

twitterとelasticsearchを簡単に連携させる(Apache Camel入門 その2 コンポーネントについて)

こんにちは、もっと多くのJava技術者がCamelで楽できるはずと信じているツカノ(@)です。

よくJavaで書くと冗長だと言われます。確かにそうだし、欠点もいろいろありますが、誇れる点も多くあります。そのひとつが、発達したJavaのエコシステムです。Javaで書かれたOSSは非常に多く、GitHub上に登録されている言語で最も多いのはJavaです。
自分に必要なOSSを使いこなせれば、実はそんなに書かなくて良いことが多いはずです。そのサポートを行ってくれるのがCamelです。

f:id:acro-engineer:20131125064348p:plain

ちなみに、今回はJava Advent Calendar 2013の10日目の投稿でもあります。9日目は櫻庭さんによる、以下の投稿でした。

11日目は高橋徹さんによる、以下の投稿です。

さて、前回はいかがでしたか? ぶっちゃけ、「Hello, World!」程度じゃCamelのパワーは伝わりきらなかったと思います。前回は導入ということで、雰囲気を伝えることに注力しましたが、今回はもっと面白いサンプルを使って解説します。この記事を読めば、「自分の仕事で使えないか調べたくなる」くらいCamelのパワーが伝わるのではないかと思います。

コンポーネントの話

それでは、本題に入っていきましょう。今回はCamelのコンポーネント周りについて、紹介します。
Camelのコンポーネントを使いこなすことによって、発達したJavaエコシステムの恩恵を受けることができます。
そのあたりを詳しく説明します。

今回は、次のようなプログラムを作りながら説明します。

  • twitterからキーワード「camel」で検索する
  • 検索結果をJSONに変換する
  • elasticsearchに書き込む

この目的なら通常はTwitter River Pluginを使うのかもしれませんが^^ ここでは、Camelを使います。

コンポーネントの調べ方

twitterやelasticsearchにアクセスする部分は、世の中的にはtwitter4jやelasticsearchのclientがあるはずです。こういう「OSSにアクセスする部分」をCamelではコンポーネントとして提供しています。

では、どのようなコンポーネントがあるのでしょうか? それは、Camelのコンポーネント一覧を見ると分かります。ここでは、Camelで提供しているコンポーネントがアルファベット順に並んでいます。
これを見て行くと、、、ありますね。「camel-elasticsearch」や「camel-twitter」があります。

OSSの名称の部分は、個々のコンポーネントの使い方が書かれたページにリンクしています。
ひとまず、camel-twitterの使い方を見てみましょう。

まずは、Mavenのdependencyにコンポーネントを登録するよう、書いてあります。

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-twitter</artifactId>
    <version>${camel-version}</version>
</dependency>

次にURI formatの説明として、以下のように書いてあります。

twitter://endpoint[?options]

この文脈で「URIって何?」と思うかもしれません。説明します。
前回のサンプルでfromやtoの引数に指定した文字列を、Camelでは「Endpoint URI」と言います。これは以下のようなフォーマットです。
f:id:acro-engineer:20131210080622p:plain
※図はCamel in Actionから引用しました。

SchemeとはCamelのコンポーネントを表しています。今回のサンプルでは、twitterやelasticsearchのコンポーネントにあたります。
Context Pathコンポーネントによっても違いますが、概ねどの機能を呼び出すかを表しています。
Optionsではそのコンポーネントを利用するための詳細なパラメータを指定します。

このように考えて、camel-twitterの使い方を読んでいくと、今回のサンプルでは以下のようなEndpoint URIを指定すれば良いことが分かります。(twitter APIのconsumerKey, consumerSecret, accessToken, accessTokenSecretについてはマスクしています)

twitter://search?type=direct&keywords=camel&consumerKey=xxx&consumerSecret=xxx&accessToken=xxx&accessTokenSecret=xxx


また、camel-elasticsearchの使い方ページを読んでいくと、以下のようなEndpoint URIを指定すれば良いことが分かります。
(clusterName=elasticsearch, indexName=twitter, indexType=tweetとしています)

elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet

camel-twitter以外もまとめて書くと、以下のdependencyをpom.xmlに入れます。

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-twitter</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-elasticsearch</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>${camel-version}</version>
    </dependency>

実装してみましょう

さて、ここまで分かったところで、この処理をCamelで実現するコードを書いてみましょう。

  • TwitterCrowler.java
package snuffkingit.camel.example;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.dataformat.JsonLibrary;

public class TwitterCrowler {

    public static void main(String... args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("twitter://search?type=direct&keywords=camel&consumerKey=xxx&consumerSecret=xxx&accessToken=xxx&accessTokenSecret=xxx") // (1)
                .marshal().json(JsonLibrary.Jackson) // (2)
                .to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet"); // (3)
            }
        });
        context.start();
        Thread.sleep(10 * 1000);
        context.stop();
    }
}

前回解説したあたりは飛ばして、簡単に解説します。
(1) twitterからキーワード「camel」で検索します。
(2) twitterからの検索結果をJSON形式に変換します。
(3) JSON形式の文字列をelasticsearchに書き込みます。

これを動かしてみると、twitterからキーワード「camel」で検索し、JSON形式に変換し、elasticsearchに書き込むことができます。
(今回の本題ではないため、twitter APIの登録や、elasticsearchのセットアップに関する説明は省略しています。また、1回検索して終了させていますが、常駐プロセスにして定期実行、というのもできます)

まとめ

今回のコードを振り返ってみると、以下のような特長があります。

  • どのようにOSSにアクセスするのか分かっていれば、個々のOSSのAPIを直接使わなくて良い
  • OSSへのアクセス方法はEndpoint URIとして抽象化されており、細かなパラメータは違えどフォーマットは共通
  • Endpoint URIで記載することアクセスできるため、実装量が削減される

そして、Camelのコンポーネント一覧を見ると分かるように、Camelはかなり数のOSSに対応しています。この資産を使えば、OSSをつないだ処理を簡単に書くことができるようになります。

自分の仕事で使ってるOSSのコンポーネントが用意されているか、調べたくなったのではないでしょうか?
ぜひ、Camelを使ってもっと楽をしましょう!
 

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

RxJavaを使ってCallback Hellから脱出する( Java8 ラムダ編 )

id:KenichiroMurata(@ )です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

前回はVert.xのmod-rxvertxを使い、RxJavaによってCallback Hellから脱出する方法を説明しました。本記事は、せっかくなのでJava8のラムダを使ったらさらにどのようになるのか?を試してみた補足記事です。

目次は以下の通りです。

  1. 環境
  2. Java8 ラムダによってCallback Hellはいかに解決されたか
  3. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1
  • Java 1.8.0-ea(build 1.8.0-ea-b118)
  • IntelliJ IDEA 13 CE

Java8を試すために、IntelliJ IDEA 13 CEを使いました。Java8をインストールし、IDEAを使う場合は以下のように初期化してください。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew idea

2. Java8 ラムダによってCallback Hellはいかに解決されたか

まずは、通常のVert.xで書いた場合のコードのJava8 ラムダ版は次のようになります。例を分かりやすくするために、JSONメッセージの生成部などは外部メソッド化しました。

J8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample", req1,
        (Handler<Message<JsonObject>>) reply -> {
          assertEquals("ok", reply.body().getString("status"));
          assertEquals("@muraken720", map.get("name"));

          JsonObject req2 = createAddRequest("Kenichiro");

          // リクエスト2
          eventBus.send("muraken720.vertx.mod.testexample", req2,
              (Handler<Message<JsonObject>>) reply1 -> {
                assertEquals("ok", reply.body().getString("status"));
                assertEquals("Kenichiro", map.get("name"));

                JsonObject req3 = createAddRequest("Murata");

                // リクエスト3
                eventBus.send("muraken720.vertx.mod.testexample", req3,
                    (Handler<Message<JsonObject>>) reply2 -> {
                      assertEquals("ok", reply.body().getString("status"));
                      assertEquals("Murata", map.get("name"));

                      testComplete();
                    });
              });
        });
  }

ラムダ式により、無名クラスの部分がスッキリしていますね。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxJ8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send("muraken720.vertx.mod.testexample", req1);

    Observable<RxMessage<JsonObject>> obs2 = obs1.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("@muraken720", map.get("name"));

      JsonObject req2 = createAddRequest("Kenichiro");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req2);
    });

    // リクエスト2
    Observable<RxMessage<JsonObject>> obs3 = obs2.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Kenichiro", map.get("name"));

      JsonObject req3 = createAddRequest("Murata");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req3);
    });

    // リクエスト3
    obs3.subscribe(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Murata", map.get("name"));

      testComplete();
    });
  }

Java8のラムダ式を使うことにより、RxJavaのAction1、Function1といった無味乾燥なクラス名も登場しなくなり、非常にスッキリしました。コード自体も縦には長くなっていますが、ネストが深くならないことがよく分かると思います。

3 まとめ

いかかでしょうか?Java8のラムダを使うことで、Callback Hellから脱出するだけでなく、その記述方法のシンプルになることがご覧頂けたと思います。Java8の正式リリースは3月?だったと思いますが、そのJava8に向けて、RxJavaもVert.xもさらに進化していくと思います。今後どのようになるのか?を想像しながら、あれこれ試してみると面白いと思います。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

RxJavaを使ってCallback Hellから脱出する( Vert.x がいいね!第5回 )

id:KenichiroMurata(@ )です。

本記事はJava Advent Calendar 2013 - Adventarの6日目の記事です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

RxJava はNetflixが開発する Reactive ExtensionsJava版です。Reactive Extensions (Rx)はReactive Programmingを可能にするライブラリです。

私はReactive Programmingとはなんぞや?と語れるほどには詳しくないので、ここでは asynchronous で event-based なプログラムを書くのに便利なライブラリというレベルの紹介とさせて頂きます。:-)

さて、なぜ RxJava なのか?というと Vert.x を使って、asynchronous で eventbusベースのプログラムを書こうとすると、どうしてもコールバックを多段に書くことなり、ネストの深いコード(Callback Hell)になってしまうのを改善したいからです。

そこで本記事では、Vert.xとVert.xにてRxJavaを使えるようにするためのモジュールである mod-rxvertx を使って、Callback Hellからいかに脱出できるのか?を試したいと思います。

ちなみに、mod-rxvertxはVert.xのAPIをObservableを返すようにラップしてくれるライブラリで、現在では以下のAPIに対応しています。

  • EventBus
  • HttpServer/HttpClient
  • NetServer/NetClient
  • Timer

また、RxJavaそのものについては、Wikiページが充実していますので、ぜひご覧下さい。


目次は以下の通りです。

  1. 環境
  2. BusModとIntegration Testを"Rx-ify"してみる
  3. Callback Hellはいかに解決されたか
  4. RxJavaを使うとできること
  5. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1

また、今回の全ソースはこちらにありますのでcloneしてお試しください。git clone したら以下を実行することで、eclipseからもJUnitテストを動かして試すことができます。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew eclipse

2. BusModとIntegration Testを"Rx-ify"してみる

それでは、さっそくコードを書いてみましょう。題材は前回のBusModとそのIntegration Testです。

まずはVert.xで通常に書いた場合のコードです。

BusModVerticle.java

public class BusModVerticle extends Verticle {

  public void start() {
    container.logger().info("BusModVerticle start.");

    vertx.eventBus().registerHandler("muraken720.vertx.mod.testexample",
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> message) {
            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            JsonObject json = message.body();

            if ("add".equals(json.getString("action"))) {
              String key = json.getString("key");
              String value = json.getString("value");
              map.put(key, value);
            } else {
              message.reply(new JsonObject().putString("status", "error")
                  .putString("message", "unknown action."));
            }

            message.reply(new JsonObject().putString("status", "ok"));
          }
        });
  }
}

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticle.java

public class RxBusModVerticle extends Verticle {

  public void start() {
    container.logger().info("RxBusModVerticle start.");

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus
        .<JsonObject> registerHandler("muraken720.vertx.mod.testexample");

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> message) {
        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        JsonObject json = message.body();

        if ("add".equals(json.getString("action"))) {
          String key = json.getString("key");
          String value = json.getString("value");
          map.put(key, value);
        } else {
          message.reply(new JsonObject().putString("status", "error")
              .putString("message", "unknown action."));
        }

        message.reply(new JsonObject().putString("status", "ok"));
      }
    });
  }
}

EventBusをラップしたRxEventBusを使います。Integration Test側のコードも見てみましょう。

BusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    vertx.eventBus().send("muraken720.vertx.mod.testexample", request,
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            JsonObject json = reply.body();
            container.logger().info("response message: " + json);

            assertEquals("ok", json.getString("status"));

            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            assertEquals("@muraken720", map.get("name"));

            testComplete();
          }
        });
  }

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus.send(
        "muraken720.vertx.mod.testexample", request);

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        JsonObject json = reply.body();
        container.logger().info("response message: " + json);

        assertEquals("ok", json.getString("status"));

        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        assertEquals("@muraken720", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?あれ、あまり変わらない?

そうなんですよね。最初にRxJavaを使った例を見たとき、言うほどCallback Hellは解消してなくない?と私は思ってしまいました。まぁ、ただこの例があまりにも単純な例なので、その効果もよく分からない、むしろ複雑になっているではないか?とすら思います。

ただ、実際にコードを書いてみると、ネストが1段減るだけでも、書きやすくなったと感じます。

この例ではあまりにも単純なので、もう少し複雑なコードを書いてみましょう。

3. Callback Hellはいかに解決されたか

それでは、今回のBusModに対してシーケンシャルにリクエストを投げるコードを書いてみます。つまり、リクエストを投げて、レスポンスを受けたら次のリクエストを投げる。さらにレスポンスを受けたら次のリクエストを投げるという例です。

まずは、通常のVert.xで書いた場合のコードは次のようになります。

BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"),
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            eventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"),
                new Handler<Message<JsonObject>>() {
                  @Override
                  public void handle(Message<JsonObject> reply) {
                    assertEquals("ok", reply.body().getString("status"));
                    assertEquals("Kenichiro", map.get("name"));

                    // リクエスト3
                    eventBus.send(
                        "muraken720.vertx.mod.testexample",
                        new JsonObject().putString("action", "add")
                            .putString("key", "name")
                            .putString("value", "Murata"),
                        new Handler<Message<JsonObject>>() {
                          @Override
                          public void handle(Message<JsonObject> reply) {
                            assertEquals("ok", reply.body().getString("status"));
                            assertEquals("Murata", map.get("name"));

                            testComplete();
                          }
                        });
                  }
                });
          }
        });
  }

ご覧の通り、最初にリクエストを投げてレスポンスを受けるHandlerの中で、次のリクエスト投げる。さらにそのレスポンスを受けるHandlerの中で、次のリクエストを投げる・・・という具合で、どんどんネストが深くなっています。これぞ Callback Hell です。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxBusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send(
        "muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"));

    Observable<RxMessage<JsonObject>> obs2 = obs1
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"));
          }
        });

    Observable<RxMessage<JsonObject>> obs3 = obs2
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("Kenichiro", map.get("name"));

            // リクエスト3
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Murata"));
          }
        });

    obs3.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        assertEquals("ok", reply.body().getString("status"));
        assertEquals("Murata", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?深くなっていたネストが縦方向に展開されて、ネストが1段で収まっています。少しコードがゴテゴテしてますが、Java8のラムダに対応すれば、もっとスッキリとして見通しがよいコードになりそうです。

4. RxJavaを使うとできること

RxJava にはもっと便利な機能がたくさんあります。ここでmod-rxvertxのテストコード(EventBusIntegrationTest)からいくつか例を紹介します。

Observable.concat

  @Test
  public void testSimpleSerial() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final AtomicInteger totalReqs = new AtomicInteger(3);
    final AtomicInteger activeReqs = new AtomicInteger(0);
    
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-foo["+message.body()+"]");
        message.reply("pong!");
        activeReqs.incrementAndGet();
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.observeSend("foo", "ping!A");
    Observable<RxMessage<String>> obs2 = rxEventBus.observeSend("foo", "ping!B");
    Observable<RxMessage<String>> obs3 = rxEventBus.observeSend("foo", "ping!C");

    Observable<RxMessage<String>> concatenated = Observable.concat(obs1, obs2, obs3);

    concatenated.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-resp["+message.body()+"]");
        assertEquals("pong!", message.body());
        assertEquals(0,activeReqs.decrementAndGet());
        if (totalReqs.decrementAndGet()==0)
          testComplete();
      }
    });
  }

Observable.concatは2つ以上のObservableをシーケンシャルに実行するように連結してくれます。

ここまで見てくると気づくと思いますが、Observableで処理を書く場合、宣言した処理は直ぐには実行されず、subscribeを呼び出した段階になって実行(遅延実行)されます。コーディングスタイルとしては、Observableに対する処理を宣言的に記述して行き、最後にsubscribeを呼んで処理を実行、という流れになります。

Observable.merge

  @Test
  public void testGather() {

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive");
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    
    merged.takeLast(1).subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        assertEquals("pongC", message.body());
        testComplete();
      }
    });
  }

Observable.mergeはパラレルに処理を実行し、全ての処理が完了するまで待ちます。

Observable.reduce

  @Test
  public void testConcatResults() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive: " + msg.body());
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    Observable<String> result = merged.reduce("", new Func2<String, RxMessage<String>, String>() {
      @Override
      public String call(String accum, RxMessage<String> reply) {
        return accum + reply.body();
      }
    });

    result.takeFirst().subscribe(new Action1<String>() {
      @Override
      public void call(String value) {
        assertEquals("pongApongBpongC", value);
        testComplete();
      }
    });
  }

先のObservable.mergeで一括りにしたパラレル処理の処理結果をrudeceで処理してまとめる、なんてこともできます。

ObservableにはTransforming、Filtering、Combiningといった様々な機能がありますので、ぜひWikiページをご覧下さい。

5 まとめ

いかかでしょうか?Vert.xによるプログラミングは asynchronous で eventbusベースなためにネストが深くなり、Callback Hell に陥りがちです。この Callback Hell から脱出する方法の一つとして、ぜひRxJava( mod-rxvertx )を試してみてください。私もまださわり始めたばかりなのですが、いかんせん日本語の情報はまだ少ないので、こんなこともできる!という情報などある方は、このブログのコメントや、tweetで教え頂けると嬉しいです。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Apache Camel入門(その1 OSSをつなぐOSS)

こんにちは、ツカノ(@)です。
twitterのタイムラインを見ていると、新しいOSSが登場した、という情報が頻繁に流れてきます。何年か前と比べると「仕事で有用なOSS」の増加速度が早まっているように思います。その分、話題だけれどあまり知らないOSSもたくさんあると感じています。今回は、OSS時代の今にフィットしたOSSの紹介です。

様々なOSSを組み合わせて開発する時代

ここ数年、システム開発をする際に複数のOSSを組合せて利用することが一般的になってきています。このブログの最近1年くらいの記事を振り返ってみても、次のようなOSSについて取り上げています。

  • アプリケーション基盤となるフレークワークではSpringやVert.x
  • ビッグデータ関連ではHadoopやStorm
  • 通信関連ではNettyやZeroMQ
  • NoSQLのミドルウェアではInfinispan

例えば、大量のデータをリアルタイムに処理し、さらにスケールさせる必要があるシステムを開発するケースを考えてみます。
何年も前なら、頑張って独自プロダクトで構築していたかもしれません。一例ですが、現在であればOSSを組み合わせて、

  • Nettyで受信する
  • NoSQLプロダクトを参照しつつ、独自ロジックでメッセージ変換
  • Stormに流し込む

という流れで処理することができます。自分でゼロからすべてを開発せずに実現できます。
重要な事なのでもう一回言いますが、複数のOSSを組合せて利用することが一般的になっています。

一方で、各OSS毎にAPIや利用している通信プロトコルが異なっているため、それなりに学習コストもかかります。また、開発期間も短いため、処理の流れを変えることになったり、特定箇所を分散させたりといったケースでも柔軟に対応できる必要があります。

こういった課題を解決してくれるのが、Apache発のOSSであるCamelです。
f:id:acro-engineer:20131125064348p:plain

という訳で、今回は、強力なパワーを持ったプロダクトであるにも関わらず、日本ではあまり利用されていないように見えるCamelを紹介します。

Camelとは?

システム統合のベストプラクティスをパターン化したものが、Enterprise Integration Patternsとして定義されており、書籍も出版されています。

Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions (Addison-Wesley Signature Series (Fowler))

Enterprise Integration Patterns: Designing, Building, and Deploying Messaging Solutions (Addison-Wesley Signature Series (Fowler))

Camelはここで定義された処理のパターンをシンプルに実現するために開発されました。

分厚くて、お固い雰囲気の本ですが、ご心配なく。Camelで実現できるパターンは、以下のページに図と共にまとまっています(^^)
Apache Camel: Enterprise Integration Patterns

Camelを使うと、「別システムからのリクエストを受け付け、一旦キューに入れて、取り出した所で並列実行する」といったことが簡単にできます。個々のパーツを組み合わせて処理の流れを作るため、リクエストを受け付ける部分を、定期的に情報を取得しに行く処理に変更することもできます。また、並列実行する部分をラウンドロビンで割り当てたり、キーのハッシュ値で振り分けたり、なんてこともできます。よく利用する処理の流れは大抵サポートしています。

先ほど「別システムからのリクエスト」と書いたような、OSSのプロダクトにアクセスしたり通信を行う処理を、Camelでは「コンポーネント」という概念で実現しています。
Camelの素晴らしいところは、豊富なコンポーネントが用意されており、現在も日々増加中であるところです。Camelのコンポーネント一覧を見ると、100以上のコンポーネントが用意されており、冒頭で紹介したOSSに関するコンポーネントも、ほとんどは既に存在するか開発予定になっています。
FTPSSH、HTTP(S)といった昔からある通信プロトコルや、AWSヘのアクセス、ビッグデータやNoSQL関連プロダクトとの通信、FacebookTwitterへのアクセス等、様々なコンポーネントが用意されています。

Camelでは、これら処理のパターンやコンポーネントを組み合わせることで、処理の流れを簡単に定義することができます。

Hello, World!

百聞は一見にしかず。さっそく実際にCamelを使ってみましょう。
mavenをインストールしてあれば、アプリケーション開発のひな型を生成できますので、これを利用します。以下のコマンドを実行してください(groupId, artifactId, versionは作成するアプリケーションに合わせて読み換えてください)。

mvn archetype:generate -DgroupId=snuffkingit \
    -DartifactId=camel-example \
    -Dversion=1.0.0-SNAPSHOT \
    -DarchetypeGroupId=org.apache.camel.archetypes \
    -DarchetypeArtifactId=camel-archetype-java \
    -DarchetypeVersion=2.12.1

これで、camel-exampleディレクトリ配下にCamelを利用するためのpom.xml等が作成されました。
ちなみに、Camel関連で用意されているarchetypeは以下のページに一覧があります。
Apache Camel: Camel Maven Archetypes

src/main/java配下に、ひな型のjavaのコードが作られますが、それは置いといて以下のコードを書いてみましょう。

package snuffkingit.camel.example;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class HelloWorld {

    public static void main(String... args) throws Exception {
        CamelContext context = new DefaultCamelContext(); // (1)
        context.addRoutes(new RouteBuilder() { // (2)
            @Override
            public void configure() {
                from("direct:foo") // (3)
                .to("log:MyCategory"); // (4)
            }
        });
        context.start();
        context.createProducerTemplate().sendBody("direct:foo", "Hello, World!"); // (5)
    }
}

「Hello, Worldで、どうしてこんなに書かせるんだ!」と思われるかもしれませんが、怒らず最後まで聞いてください^^; もっと実際的な話は次回以降紹介しますが、まずは基本の理解です。
(1) まずは、Camelのコンテキストを初期化します。
(2) 処理の流れをCamelでは「ルート(route)」として定義し、その中を流れる「メッセージ」を処理します。この行では、コンテキストにルートを追加しています。
(3) fromとして、ルートの開始部分を指定します。ここでは開始部分に「direct:foo」という名前を付けています。(この意味の詳しい説明は次回以降行います)
(4) toとして、ルートの行き先を指定します。ここでは、メッセージをログ出力しています。
(5) ルートの開始部分「direct:foo」に"Hello, World!"というメッセージを送ります。

このクラスを実行すると、以下のログが出力されます(デフォルトのログフォーマットなので、ちょっと見づらいかもしれません)。

[                          main] DefaultCamelContext            INFO  Apache Camel 2.12.1 (CamelContext: camel-1) is starting
[                          main] ManagedManagementStrategy      INFO  JMX is enabled
[                          main] DefaultTypeConverter           INFO  Loaded 175 type converters
[                          main] DefaultCamelContext            INFO  StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
[                          main] DefaultCamelContext            INFO  Route: route1 started and consuming from: Endpoint[direct://foo]
[                          main] DefaultCamelContext            INFO  Total 1 routes, of which 1 is started.
[                          main] DefaultCamelContext            INFO  Apache Camel 2.12.1 (CamelContext: camel-1) started in 1.027 seconds
[                          main] MyCategory                     INFO  Exchange[ExchangePattern: InOnly, BodyType: String, Body: Hello, World!]
[                          main] DefaultCamelContext            INFO  Apache Camel 2.12.1 (CamelContext: camel-1) is shutting down
[                          main] DefaultShutdownStrategy        INFO  Starting to graceful shutdown 1 routes (timeout 300 seconds)
[el-1) thread #1 - ShutdownTask] DefaultShutdownStrategy        INFO  Route: route1 shutdown complete, was consuming from: Endpoint[direct://foo]
[                          main] DefaultShutdownStrategy        INFO  Graceful shutdown of 1 routes completed in 0 seconds
[                          main] DefaultCamelContext            INFO  Apache Camel 2.12.1 (CamelContext: camel-1) uptime 1.089 seconds
[                          main] DefaultCamelContext            INFO  Apache Camel 2.12.1 (CamelContext: camel-1) is shutdown in 0.017 seconds

横長になっていますので、スクロールしないと見えないかもしれませんが、下から7行目(MyCategoryのところ)の右の方に「Hello, World!」と出ていますね。
「direct:foo」からメッセージが流れ、そのメッセージがログ出力されました。

実は「direct」や「log」というのがCamelのコンポーネントです。「direct」コンポーネントの中をメッセージが流れて行き、「log」コンポーネントがログ出力していたのです。
fromやtoに指定した文字列を替えるだけで様々な処理を行えるのがCamelの大きな魅力です。このあたりは次回のお楽しみです(^^)

参考情報

日本語で読めるCamelの情報はあまり多くありませんが、以下のサイトが参考になると思います。

「Camel in Action」はManning Publicationsから発売されています。

Camel in Action

Camel in Action

  • 作者: Claus Ibsen,Jonathan Anstey,Gregor Hohpe,James Strachan
  • 出版社/メーカー: Manning Pubns Co
  • 発売日: 2010/11/28
  • メディア: ペーパーバック
  • この商品を含むブログを見る


さて、どうでしたか。次回はCamelのコンポーネント周りについて、紹介します。それではまた~
 

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ