Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Amazon Kinesis + Storm を連携させて、ストリームデータ処理を行ってみた

こんにちは。kimukimuです。


昨日はクリスマスイブでしたが、皆さんはどのように過ごされたでしょうか?
私はケーキは買う派ではなく、自分で作る派です。……まぁ、食べるのも自分なのですが……

クリスマスはさておき、今日はKinesisとStormに関する内容です。

AWS re:Invent 2013 で 発表されたAmazon Kinesis が12/17にPublic Betaになり一般公開されましたね!
以下のように、KinesisはStormのコネクタを含んでいる、ということなので、さっそく試してみましょう。

Kinesis does include a connector for porting data to Storm, which AWS General Manager
for Data Science Matt Wood said is a possibility in cases where existing Storm users want to keep using it
for processing data while automating the collection with Kinesis.

Amazon’s streaming data service, Kinesis, is now available — Tech News and Analysis

手始めに、Stormと連携し、Kinesisに投入された文章中の単語出現数をリアルタイムに集計するWordCountのサンプルを作成してみました。

今回使用したソースは全てkimutansk/storm-example-wordcount · GitHubにアップしています。

1.Amazon Kinesis とは?(おさらい)

Amazon Kinesisはストリームデータのリアルタイム処理を行うプラットフォームです。
詳細は下記のページなどで紹介されているため、参照してください。

Amazon Kinesisはストリームデータを処理するキューとして下記のような特徴を持っています。
• 取得した時点でキュー上のメッセージは削除されないため、後で繰り返し取得可能
• あるキューに対して複数のProducer(メッセージ生産者)、Consumer(メッセージ消費者)を紐づけることが可能

加えて、AWSのサービスであるためスケール可能です。
大容量、スケール可能な高速なキューをサービスとして利用でき、
かついくらでも他のコンポーネントから独立してスケール可能というのは非常に大きいと思います。

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. KinesisPutterが文章をAmazon Kinesisに投入する。
  2. StormのKinesisSpoutが、Amazon Kinesisに蓄積された文章を取得する。
  3. Storm内で、SplitSentenceが、文章を単語単位に分割する。単語ごとにグルーピングを行い、次のBoltに流す。
  4. Storm内で、WordCountBoltが単語をカウントする。

3.Amazon Kinesisの設定を行ってみる

3-1.Kinesis用のユーザ権限確認

では、まずAmazon Kinesisの設定を行います。
Amazon Kinesisのページから申し込みを行ったうえでAWS Consoleにログインすると
下記のようにKinesisのサービスが表示されるようになります。

加えて、その状態ですとIAMの設定画面からAmazon Kinesis用の権限テンプレートも
追加されていますので、KinesisにフルアクセスできるGroupを作成し、ユーザに割り振っておきます。

KinesisにフルアクセスできるユーザのaccessKeyとsecretKeyを用いてKinesisのアプリケーション開発を行います。

3-2.Kinesis Streamの作成

次にKinesisのStream(1個のデータストリームを示す)の作成を行います。
Kinesisの画面を表示し、Streamを作成します。

Streamの名前とShard(Stream中のデータパーティション。この数でStreamの性能が決まる)の数も入力します。
Shardの数に応じて性能が決まるので、性能の値もきちんと確認しておきましょう。

その上でCreateボタンを押下すると、Streamの初期化が始まります。
Createボタンを押下してしばらくは「CREATING」という初期化中のステータスとなります。

初期化が完了するとステータスが「ACTIVE」となり、使用可能になります。

4.作成したソースコード

では、実際に作成したソースコードを示します。
開発を行う場合にはMavenに以下の定義を追加しておく必要がありますので、その前提で進めます。

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.0.0</version>
</dependency>

4-1.KinesisPutter

Kinesisに文章を投入するプログラムです。
KinesisClientを初期化した後Partition用のキーを指定して投入するだけなので、投入側は単純な構成ですね。

  • KinesisWordPutter.java
/**
 * プログラムエントリポイント<br/>
 * <ul>
 * <li>起動引数:arg[0] KinesisStream名称(例:TestStream)</li>
 * <li>起動引数:arg[1] 投入メッセージ数</li>
 * <li>起動引数:arg[2] 送信メッセージ間隔(ms)</li>
 * </ul>
 * 
 * @param args
 *            起動引数
 * @throws InterruptedException 割り込み発生時
 * @throws UnsupportedEncodingException 文字コード不正時
 */
public static void main(String... args) throws InterruptedException, UnsupportedEncodingException
{
    // KinesisClient初期化
    AmazonKinesisClient client = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());

    String streamName = args[0];
    int putCount = Integer.valueOf(args[1]);
    long interval = Long.valueOf(args[2]);

    Random random = new Random();
    int sentenceNum = SENTENCES.length;

    // 送信メッセージ数の数だけランダムで文章一覧から取得し、Kinesisに送信
    for (int count = 0; count < putCount; count++)
    {
        int sentenceIndex = random.nextInt(sentenceNum);
        String putSentence = SENTENCES[sentenceIndex];
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(ByteBuffer.wrap(String.format(putSentence).getBytes("UTF-8")));
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", count));
        PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
        System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey() + ", ShardID : " + putRecordResult.getShardId());

        TimeUnit.MILLISECONDS.sleep(interval); 
    }
}

4-2.KinesisSpout

Kinesisから文章を取得するStormSpoutです。
こちらは多少複雑で、初めにStream/Shardの情報を取得した上でShardのIteratorを取得し、
Iteratorを用いて文章を取得する形になります。

また、Kinesisは先ほどのコンソールでわかったかと思いましたがReadのリクエスト処理数が少ない(秒間5リクエスト)ため、
1リクエストでまとめてデータを取得し、処理する必要が出てきます。
その関係上、Stormの処理モデル(nextTupleメソッド1回呼び出しあたり1Tuple1Emit)に合わせるため、
Kinesisから取得したレコードを一度リストに蓄積しておき、リストから1Tupleずつ取り出し、Boltに流す構成になっています。

  • KinesisWordPutter.java
/**
 * {@inheritDoc}
 */
@SuppressWarnings({ "rawtypes" })
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;
    this.taskIndex = context.getThisTaskIndex();

    this.kinesisClient = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());

    DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    describeStreamRequest.setStreamName(this.streamName);
    describeStreamRequest.setLimit(this.maxShardCount);
    DescribeStreamResult describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest);
    this.shards = describeStreamResult.getStreamDescription().getShards();

    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
    getShardIteratorRequest.setStreamName(this.streamName);
    getShardIteratorRequest.setShardId(shards.get(this.taskIndex).getShardId());
    getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

    GetShardIteratorResult getShardIteratorResult = this.kinesisClient.getShardIterator(getShardIteratorRequest);
    this.shardIterator = getShardIteratorResult.getShardIterator();

    this.decoder = Charset.forName("UTF-8").newDecoder();
    this.gettedRecords = Lists.newArrayList();
}

/**
 * {@inheritDoc}
 */
@Override
public void nextTuple()
{
    if (this.gettedRecords.isEmpty())
    {
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(this.shardIterator);
        getRecordsRequest.setLimit(this.maxGetRecordNum);

        GetRecordsResult getRecordsResult = this.kinesisClient.getRecords(getRecordsRequest);
        this.gettedRecords.addAll(getRecordsResult.getRecords());
    }

    if (this.gettedRecords.isEmpty())
    {
        return;
    }

    Record emitRecord = this.gettedRecords.remove(0);
    this.sequence = emitRecord.getSequenceNumber();

    String sentence = null;

    try
    {
        sentence = this.decoder.decode(emitRecord.getData()).toString();
    }
    catch (CharacterCodingException ex)
    {
        throw new RuntimeException(ex);
    }

    this.collector.emit(new Values(sentence), emitRecord.getSequenceNumber());
}

5.実際に動かしてみる

5-1.KinesisPutter

KinesisPutterを動かすと、下記のようにPutしたメッセージが表示されます。
PartitionKeyに合わせてShardIdが分散される構成になっていますね。

(省略)
Successfully putrecord, partition key : partitionKey-1299, ShardID : shardId-000000000001
Successfully putrecord, partition key : partitionKey-1300, ShardID : shardId-000000000000
Successfully putrecord, partition key : partitionKey-1301, ShardID : shardId-000000000001
Successfully putrecord, partition key : partitionKey-1302, ShardID : shardId-000000000001
(省略)

5-2.WordCountTopology

TopologyのDEBUGモードをONにし、Tupleの内容を確認すると単語がカウントされていることがわかります。

(省略)
[Thread-25-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [the, 1280]
[Thread-33-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [inch, 800]
[Thread-25-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [square, 533]
[Thread-33-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [inch, 801]
(省略)

ということで、KinesisとStormの連携が実現できました。

…ん?Stormのコネクタってどこ?

KinesisSDKには含まれていないのでしょうかね?

amazon-kinesis-clientやawslabsのサンプルを見ましたが、Stormと関連するようなコンポーネントは見つからず…
まぁ、ひとまず連携はできたので、今回はOKとしましょう。

6.KinesisとStormを連携してみた感想

実際にKinesisとStormを連携させた結果は、以下のような感じでした。

  • 基本的な動作は非常に簡単に実現可能
    • 単に1Shardを保持するStreamを作成し、Spoutで取得して処理するだけであれば、ドキュメント&サンプルが充実していることもあり、非常に簡単に実現可能です。
    • ドキュメントのコードをコピーして微修正するだけで実現できます。
  • 異常系処理等がクラウドによって隠蔽されているため、シンプル
    • この手の分散システムを実装する際には必要となる、構成変更時や異常系のハンドリングがクラウドというレイヤを通すことで隠蔽されているため、シンプルに実装出来ます。


なお、KinesisとStormの役割の違いは以下のようになると思います。

  • Kinesis
    • 分散してデータを収集したり、分配したりする。
  • Storm
    • ちょっと複雑な演算処理やストリーム処理とバッチ処理の統合を行ったりする。加えて失敗検知、再送管理を行う。


もう年内も残すところ約1週間ですね。
皆さん、残りの仕事を片付けて、すっきりと来年を迎えられるようにしましょう!(はい、自分自身がガンバリマス)

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

日経ソフトウェアのJava8連載の第2回が出ました!

どうも、かっち です。
最近は随分と寒くなってきて、
みかんとこたつが恋しい季節になってきました。

先取り! Java SE 8

さて、以前 @cero_t がブログに書いていた、
日経ソフトウェアでのJava8連載。
本日2月号が無事に出版されました!!

http://itpro.nikkeibp.co.jp/NSW
http://itpro.nikkeibp.co.jp/NSW

第二回は「ストリームAPI

連載の第二回は「ストリームAPI」を取り上げました。

ストリームAPIの記法に慣れると、これまでのやり方に比べ、
簡単明瞭にデータを処理できることにきっと皆さんも驚きを覚えると思います。
また、これまた簡単に並行処理が実現できてしまうことにも驚きです。
しかし、その使い方には注意があります・・・


そんな内容について記事を書いています。

なお、Acroquestのサイトから2ページ分のPDFを閲覧できますので、こちらも参考にしてください。
http://www.acroquest.co.jp/company/press/2013/1224p1.html

今年の年末年始は、まとまった時間を確保できるまたとないチャンス。
日経ソフトウェアとみかんを片手に、こたつで勉強しましょう!!

以上

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Spring XMLでCamelを書いて、twitterとelasticsearchを連携(Apache Camel入門 その3 Spring XMLについて)

こんにちは、もっと多くのJava技術者がCamelで楽できるはずと信じているツカノ(@)です。

前回は、発達したJavaのエコシステムの恩恵にあずかり、実質数行のJavaコードで、twitterとelasticsearchを連携させることができました。
今回は、Spring XMLでCamelを記述することで、twitterとelasticsearchを連携させてみましょう。

f:id:acro-engineer:20131125064348p:plain

「Camelって何?」って人は、前回までの内容を確認しておきましょう。

CamelのDSLについて

これまで説明してきたCamelのサンプルは、Javaのコードでした。実はこれはCamelの一面でしかありません。Camelは様々なDSLで表現することができ、Java DSLはその中のひとつです。CamelのDSLページによると、今のところ、以下のようなDSLを使うことができます。

プロジェクトの特性などに合わせて好きなDSLを使えるのは、Camelの良いところです。今後、Clojureとか、JRubyのDSLが出てきたら面白いですね。
さて、Springを使っているプロジェクトは多いと思いますので、Spring XMLを使ってみましょう。今までJavaのコードで表現していたCamelをSpringのXMLで表現します。このDSLを使うと、Camelのoption設定等をハードコードせずにCamelを利用することができます。また、Springの資産(DI等)を利用することができます。今回の例では登場しませんが、Springのscopeを利用すれば、グローバルなオブジェクトを利用することも、スレッド単位で異なるオブジェクトを利用することもできます。

Spring XMLで実装してみましょう

mavenをインストールしてあれば、アプリケーション開発のひな型を生成できます。以下のコマンドを実行してください(groupId, artifactId, versionは作成するアプリケーションに合わせて読み換えてください)。

mvn archetype:generate -DgroupId=snuffkingit \
    -DartifactId=camel-example-spring \
    -Dversion=1.0.0-SNAPSHOT \
    -DarchetypeGroupId=org.apache.camel.archetypes \
    -DarchetypeArtifactId=camel-archetype-spring \
    -DarchetypeVersion=2.12.1

このシリーズの1回目で似たようなこと行いましたね。そこではarchetypeArtifactIdとして「camel-archetype-java」を指定していました。それを「camel-archetype-spring」に変更しただけです。

さて、mavenでひな型を生成ましたが、今回はJavaコードは生成されていません。代わりに生成される、以下の設定ファイルを変更します。

src/main/resources/META-INF/spring/camel-context.xml

前回作成したTwitterCrowler.javaをcamel-context.xmlに置き換えると以下の通りです。(twitter APIのconsumerKey, consumerSecret, accessToken, accessTokenSecretについてはマスクしています)

<?xml version="1.0" encoding="UTF-8"?>
<!-- Configures the Camel Context-->

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:camel="http://camel.apache.org/schema/spring"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">

  <camel:camelContext>
    <camel:route>
      <camel:from uri="twitter://search?type=direct&amp;keywords=camel&amp;consumerKey=xxx&amp;consumerSecret=xxx&amp;accessToken=xxx&amp;accessTokenSecret=xxx"/>
      <camel:marshal>
        <camel:json library="Jackson"/>
      </camel:marshal>
      <camel:to uri="elasticsearch://elasticsearch?operation=INDEX&amp;indexName=twitter&amp;indexType=tweet"/>
    </camel:route>
  </camel:camelContext>

</beans>

直観的にイメージは伝わると思いますが、詳細はSpring XMLのページを確認してください。また、前回のコンポーネント説明でcamel-twittercamel-elasticsearchの使い方ページを確認した際に気がつかれたかもしれませんが、Camelの公式サイトでは多くのページでJava DSLとSpring XMLの両方で解説してあります。

Spring XMLを使ったCamelの概要は以下の通りです。

  • camelContextタグに囲った中にCamelの処理を記述します。このタグの外側は通常のSpringのXMLとして使えます。
  • camelContextタグの中にrouteタグを書き、その中にrouteの構成を記述します。
  • fromやtoはタグ名に置き換えて、uriは属性として記述します。
  • 注意点としては、Javaでは文字列に「&」を使えますが、XMLの属性には「&」は使えません。「&」はエスケープが必要なため、「&」→「&amp;」に置き換えます。

また、JSONに変換する部分はCamelでのJSONを説明したページが参考になります。

前回同様、pom.xmlに以下のdependencyを入れることも必要です。

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-twitter</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-elasticsearch</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>${camel-version}</version>
    </dependency>

ちなみに、archetypeArtifactIdにcamel-archetype-springを指定してgenerateしているため、生成されたpom.xmlのdependencyにはcamel-springが予め記述されています。

Spring XMLを実行してみましょう

では、これを実行してみましょう。
手っ取り早く実行するには、mavenで実行して下さい。

mvn camel:run

Javaコマンドから実行するには、以下のように実行してください。(Camelから環境変数を参照している訳ではないため、${CLASSPATH}は直接記述しても問題ありません)

java -cp ${CLASSPATH} org.apache.camel.spring.Main

デフォルトでは「${CLASSPATH}/META-INF/spring/*.xml」というファイルを探し、それを設定ファイルとして利用します。具体的にファイルを指定する場合は、引数「-fa」で指定します。(Camelから環境変数を参照している訳ではないため、${CLASSPATH}、${CONTEXT_PATH}は直接記述しても問題ありません)

java -cp ${CLASSPATH} org.apache.camel.spring.Main -fa ${CONTEXT_PATH}/camel-context.xml

実行結果をkibanaで見てみると以下のようになりました。
f:id:acro-engineer:20131211081825j:plain

と言う訳で、Spring XMLを使って、twitterとelasticsearchを連携させることができました。

まとめ

3回にわたりApache Camelの入り口について説明してきましたが、その強力さの一端は伝わったでしょうか。
このようなことができる背景には以下のような要因があります。

  • Java製のOSSは非常に多いこと(今回の例ではtwitter、elasticsearchへのアクセス)
  • OSS同士をCamelで連携できること
  • Spring XMLを使えばハードコーディングせずCamelを使ったアプリケーションを書けること

Camelのコンポーネント一覧を見ると、twitterやelasticsearchだけでなく、多数のOSSに関するコンポーネントが提供されていることが分かります。また、対応しているOSSは日々増えています。「Camelを積極的に使い、使いづらければCamelのコンポーネントを修正してコミュニティにフィードバック」という流れができると、より使いやすさが増すのではないかと思います。

このシリーズで説明したCamelの機能は本当に触りの部分です。Camelでは多くの処理を実現できます。分岐させたり、データ変換したり、特定部分をマルススレッド化したり、、、気になった方は、是非、Camelを利用して今使っているOSSにアクセスしてみてください。
Camelを使って楽をしましょう!
 

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

ラムダ禁止について本気出して考えてみた - 9つのパターンで見るStream API

こんにちは @ です。

今日のテーマは・・・ラピュタ禁止令!

バルス!

いや違う。ラムダ禁止令、です。


さて、なかなかの滑り出しですが、今日はただのラムダの紹介ではなく、禁止令に主眼を置いて語ります。

このエントリーは、Java Advent Calendar 2013の12/16分の投稿です。
http://www.adventar.org/calendars/145

前日は @sugarlife さんの JDK 8 新機能ダイジェスト (JDK 8 Features) です。
翌日は @setoazusa さんです。

ラムダ禁止令はあり得るのか?

勉強会やその懇親会などで、たびたび「ラムダ禁止令が出るのではないか」が話題に上ることがあります。
「そりゃ禁止する組織もあるでしょうね」というのがお決まりの答えなのですが、ただそれに従うだけでは面白くありませんし、要素技術の発展も滞ってしまうでしょう。そもそも新しい技術を食わず嫌いせず、必要に応じて利用できる文化にしたいですよね。

そんな事を考えながらも、ただStream APIについてはちょっと考えないといけないな、と思わされたのが、Java Advent Calendar 5日目の記事でした。

ToIntFunction<Map.Entry<Car, List<Sale>>> toSize = e -> e.getValue().size();
Optional<Car> mostBought;
mostBought = sales.collect(Collectors.groupingBy(Sale::getCar))
        .entrySet()
        .stream()
        .sorted(Comparator.comparingInt(toSize).reversed())
        .map(Map.Entry::getKey)
        .findFirst();
if(mostBought.isPresent()) {
    System.out.println(mostBought.get());
}

Java Advent Calendar 2013 5日目 - Java 8 Stream APIを学ぶ - kagamihogeの日記

ここで取り上げられている例は、特にStream APIに慣れていないうちは、パッと見ても何をしたいのかがよく分かりません。これを見て「可読性が低い」と捉える向きがいてもおかしくありません。

こういう事がエスカレートすると・・・

若者が複雑なStream APIを書く
 ↓
先輩がレビューができない or テスト不十分で見落としが起きる
 ↓
問題が発生する
 ↓
なぜなぜ分析でStream APIの可読性が槍玉にあがる
 ↓
Stream API全面禁止
 ↓
ついでにラムダも全面禁止

_人人人人人人人人人人_
> 突然のラムダ禁止 <
 ̄Y^Y^Y^Y^Y^Y^Y^Y^Y ̄

こんな事態が起きるかも知れない、と思いました。特にStream APIが何なのかをよく分かっていない人たちが、Stream APIラムダ式を混同して「ラムダ式禁止」と言いだすことは十分にあり得ます。

今日はこの辺りの話、つまり「業務でStream APIをどのぐらいまでなら使えるか」「使うための注意点は何か」ということを本気出して考えたいと思います。

ラムダ式禁止」だけは抵抗しよう

まず最初に言っておくと「ラムダ式」の禁止はあり得ません。言ってしまえばほら、ラムダ式はただの匿名クラス(無名クラス)のシンタックスシュガーみたいなものであって、何も難しいところはないからです。

簡単なおさらいとして、Comparatorの例を見てみましょう。

Comparator<Emp> comp1 = new Comparator<Emp>() {
    @Override
    public int compare(Emp emp1, Emp emp2) {
        return emp1.getSal() - emp2.getSal();
    }
};

Comparator<Emp> comp2 = (emp1, emp2) -> emp1.getSal() - emp2.getSal();

Comparator<Emp> comp3 = Comparator.comparingInt(Emp::getSal);

このcomp1、comp2、comp3はいずれも同じ処理をするComparatorです。comp3はやや見慣れない形だとしても、comp2までなら昔ながらのエンジニアにもまだ理解できる記述だと思います。

そのため、匿名クラス(無名クラス)自体を禁止している組織ならまだしも、そうでない組織でラムダ式自体が禁止されることは賛同できません。それこそ「食わず嫌い」の典型なので、もし皆さんの組織(の標準化グループ)がラムダ式禁止だと判断しそうなら、ぜひ抵抗してください。

あるいは、ラムダ式禁止を逆手に取って、Stream APIを匿名クラスまみれで書くことで、抵抗しても良いかも知れません(笑)
# 半年後、そこには元気に走り回るStream API禁止令の姿が!

ただし、単に禁止に反対するだけでなく、相手の懸念している「可読性が下がって困る」というところに応えるためにも、「ここまでなら使っても問題は起きないでしょう」という提案も同時に行なうべきだと思います。

その辺りが、今日のテーマになります。

Stream APIは、どこまで許されるのか?

さて、繰り返しになりますが、今日の本題はラムダ式ではなく、Stream APIです。
ラムダ式自体は簡単なので禁止にする理由がないと書きましたが、Stream APIの方は簡単ではなく、これをむやみやたらに使って炎上すると、Stream API自体を禁止とする組織が出てきてもおかしくありません。

では、どこまでならサクサクと読めるのか、考えてみましょう。

今回は、特に「あまりStream APIに慣れていない人」をターゲットにして書きますので、「自分、streamの数珠つなぎ200行ぐらい普通に読めるんで」のようなマサカリを装備する必要ありません。


なお、今回紹介する機能や処理の「禁止度」について、以下のようにレベルづけをします。

C : 業務で使っても全く問題ないレベル。
B : やや疑問を呈されるけど、積極的に使いたいレベル。
A : 業務では使わない方が良いレベル。
S : 積極的に禁止したいっていうか、使ったら書き直させるレベル。

特に禁止度Bあたりは、古豪のエンジニアから「読めないから使うな」と言われかねないところなので、組織のレベルを見ながら利用するようにすべきです。

1. 同じオブジェクトに対する連続した操作(禁止度:C)

まずStream APIを使った典型的な例が、同じオブジェクトに対してフィルタやソートなどの連続した操作を行なうというものです。

List<Emp> filterAndSort1(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .sorted((emp1, emp2) -> emp2.getSal() - emp1.getSal())
            .collect(Collectors.toList());
}

このぐらいであれば、「給料が1000より大きい」ものを抽出して「降順にソート」していることを読み取ることは容易です。問題ありませんね。

2. Comparatorのstaticメソッドを使ったソート(禁止度:B)

上に書いたソースを少し修正して、ソートのところをComparator.comparingIntにすることもできます。

List<Emp> filterAndSort2(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .sorted(Comparator.comparingInt(Emp::getSal).reversed())
            .collect(Collectors.toList());
}

IntelliJ IDEA 12.1.6を使っていると、Emp::getSalのところで「cyclic inference」というエラーを出してくるのですが、実際には問題なく動作します。IDEがエラーを出してくる辺りに、少し不吉なにおいを感じますね。そういう背景もあって禁止度を一つ上げてBにしました。


さらに、OpenJDK8 build111では、以下のような記述は正常にコンパイルされるのですが、

.sorted(Comparator.comparingInt(emp -> emp.sal))
.sorted(Comparator.comparingInt(emp -> emp.getSal()))
.sorted(Comparator.comparingInt(Emp::getSal).reversed())

以下のようにすると、コンパイルエラーになってしまいます。

.sorted(Comparator.comparingInt(emp -> emp.sal).reversed())
.sorted(Comparator.comparingInt(emp -> emp.getSal()).reversed())

どうもreversedをつけると、comparingIntに渡すToIntFunctionの型解決ができなくなってしまうようです。

開発途中とは言え、コンパイラーですら理解できなくなるのだから、人間にも理解しにくい構文なのでしょうか。
昔ながらのJavaエンジニアに配慮するなら、Comparator.comparing*などは使わず、最初の例に挙げたように、

.sorted((emp1, emp2) -> emp2.getSal() - emp1.getSal())

こう書くのが、一番分かりやすいでしょう。

3. mapとreduceを使った操作(禁止度:B)

続いて、特定の項目のみを抽出して集計するような処理を考えます。
下の例は、1000より大きい給料のうち、その2倍の値の平均を取るという、やや謎の処理です。

Double averageSal(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .mapToInt(emp -> emp.getSal() * 2)
            .average()
            .getAsDouble();
}

これも「map*」メソッドの役割さえきちんと理解してもらえれば、前からスラスラ読むことができます。

続いて、reduceを使う例も見てみましょう。下の例は、1000より大きい給料のうち、給与から1000を引いたものを合計するという、これまたやや謎の処理です。もちろんsum()でも合計はできますが、あえてreduceを使ってみました。

int someCalc(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.getSal() > 1000)
            .mapToInt(emp -> emp.getSal() - 1000)
            .reduce(0, (x, y) -> x + y);
}

これも「reduce」メソッドの役割さえきちんと理解すれば、スラスラ読んでもらえるものでしょう。

しかしながら、いわゆる関数型プログラミングに慣れていないエンジニアには、mapやreduceという処理には親しみがないため、これも読みにくいと言われかねません。

この辺りを標準的に使いたいのであれば、社内勉強会などを開催して、まずは「filter」「sorted」「map」「reduce」あたりを説明すると良いのではないかと思います。ここまで使えれば、だいたい勝てます(何に?)

4. groupingByやtoMapを使ったMapへの変換(禁止度:B)

Stream APIを使っていて、便利だなと感じるのがこのgroupingByによる集計処理。SQLのgroup byと同じようなものです。
これは型を変えてしまうだけに処理がやや分かりにくくなるのですが、やはりこれも積極的に使いたいですね。2例続けて見てみましょう。

Map<Dept, List<Emp>> groupByDept(List<Emp> list) {
    return list.stream()
            .sorted((emp1, emp2) -> emp2.sal - emp1.sal)
            .collect(Collectors.groupingBy(emp -> emp.dept));
}
Map<Integer, List<Emp>> rankedBySal(List<Emp> list) {
    return list.stream()
            .sorted((emp1, emp2) -> emp2.sal - emp1.sal)
            .collect(Collectors.groupingBy(emp -> {
                if (emp.sal > 4000) return 1;
                if (emp.sal > 2500) return 2;
                if (emp.sal > 1000) return 3;
                return 4;
            }));
}

前者は部署(dept)によるグループ化を行い、後者は給与水準によるグループ化を行なっています。groupingByがCollectionからMapに変換するための処理であることさえ把握していれば、このソースもサクサクと読むことができます。


また、groupingByでは、SQLと同じように値に集計結果を入れることもできます。

Map<Dept, Long> groupByDeptAndAve(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.averagingInt(Emp::getSal)));
}

この例では部署ごとに平均給与を計算しています。とてもありそうな処理ですね。


しかしながら、SQLにおいても「集計関数がよく分からない勢」が一定数存在することが現在までに確認されています。
数学の勉強をしてから出直してこいと思うのですが 集計関数によく習熟したメンバでチームを作り、お互いにレビューをできるような体制を作れば、事故ることを減らせると思います。

5. groupingByからのentrySet.stream大作戦(禁止度:A)

続いて、部署(dept)を、人数の少ない順に並べるという処理です。

List<Dept> sortDept1(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()))
            .entrySet()
            .stream()
            .sorted(Comparator.comparingLong(Entry::getValue))
            .map(Entry::getKey)
            .collect(Collectors.toList());
    }
}

Mapのstream処理自体が分かりにくいという事情はあるにせよ、もともとListであったものをMapにして、さらにentrySetを取り出してstream処理を続けるというのは、型が分かりにくくなり、可読性が落ちると言わざるを得ません。というか、私のこの日本語の説明自体も、よく分かりません

しかもこの辺りから、IntelliJ IDEAの自動補完はほぼ効かなくなりますし、エラーもたくさん出るようになります(でも実際にはコンパイルが通って実行できるので、余計に厄介)

こうなってくると、禁止度はさらに上がってAクラスになるでしょう。


では、このソースのどこが可読性を落としているのでしょうか。ポイントは「entrySet().stream()」にあると私は思います。ここで、せめてcollectした後にローカル変数に代入すれば、まだしも読みやすくなるのではないでしょうか。

List<Dept> sortDept2(List<Emp> list) {
    Map<Dept, Long> map = list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()));

    return map.entrySet()
            .stream()
            .sorted(Comparator.comparingLong(Entry::getValue))
            .map(Entry::getKey)
            .collect(Collectors.toList());
}

部署ごとの人数を一度Mapに代入してから、人数でソートしてkeyのみ(部署のみ)取り出しています。このようにすれば、禁止度はBクラスまで落とせるように思います。

「collectした後は、一時変数に代入する」という新しい鉄則を作るという案、どうでしょうかね。


それにしても、Mapの(entrySetの)Streamは分かりづらいですよね。実は2012年前半の時点ではMapStreamというクラスがあり、分かりやすくMapのstream処理を記述することができました。
なぜ消えたのか、背景や理由はよく把握していませんが、いずれにせよMapStreamが消えたせいで、いまのようなstream処理しかできなくなり、可読性が低くなっているというのが現状です。

6. ネストしたstream(禁止度:A)

Mapの集計を考えると、streamをネストさせたくなることがあります。これも2つ例を続けて掲載します。

Map<Dept, Long> groupByDeptAndFilter1(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.collectingAndThen(Collectors.toList(),
                    emps -> emps.stream()
                            .filter(e -> e.sal > 1000)
                            .count())));
}
Map<Dept, Long> groupByDeptAndFilter2(List<Emp> list) {
    return list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept))
            .entrySet()
            .stream()
            .collect(Collectors.toMap(entry -> entry.getKey(),
                    entry -> entry.getValue()
                            .stream()
                            .filter(emp -> emp.sal > 1000)
                            .count()));
}

ソースコードの意味、分かりますか?

部署をキーにしたMapを作ったうえで、Mapの値のほうには給与が1000を超える社員の人数を入れています。フィルタ処理を入れたいがために、ただの集計処理が使えず、Streamを利用しています。
ここまで来るとかなり可読性が下がり、事故の原因にもなります。

※2013/12/20 修正 - 初出時のソースに誤りがあり、訂正しました。
Streamをネストせざるを得ない時は、たとえば昔ながらのfor文も併用するというのも、ひとつ読みやすくするための手段になると思います。

Map<Dept, Long> groupByDeptAndFilter3(List<Emp> list) {
    Map<Dept, List<Emp>> map = list.stream()
            .collect(Collectors.groupingBy(emp -> emp.dept));

    Map<Dept, Long> result = new HashMap<>();
    for (Entry<Dept, List<Emp>> entry : map.entrySet()) {
        long count = entry.getValue()
                .stream()
                .filter(emp -> emp.sal > 1000)
                .count();
        result.put(entry.getKey(), count);
    }

    return result;
}

ちょっと微妙? ただ、Java8時代には意外とこんなコードが出てくるかも知れません。


もしかすると、業務要件を考えて「あれ、事前にフィルタリングすれば良いだけじゃないの?」と思った方もいるかも知れません。

Map<Dept, Long> groupByDeptAndFilter4(List<Emp> list) {
    return list.stream()
            .filter(emp -> emp.sal > 1000)
            .collect(Collectors.groupingBy(emp -> emp.dept, Collectors.counting()));
}

事前にフィルタリングすれば、シンプルに記述することができます。しかしこうしてしまうと、処理の結果が少し変わってしまうのです。
「給与が1000未満の人しかいない部署」があった場合、groupByDeptAndFilter3までは「0人」として結果を取得できますが、groupByDeptAndFilter4ではそもそも結果のMapに当該の部署は表れません。
今回の例ではそれでも良いかも知れませんが、より複雑な業務処理になると、そのような差分が問題になることも多々あるでしょう。

このように、集計結果が分かりづらくなってくるところも、SQLとよく似ていますね。SQLのエキスパートが必要なことと同様に、Stream APIのエキスパートも必要だと思いますね。

7. streamの外に結果を残す(禁止度:A)

給与の平均と合計を同時に算出したい、という場合です。

void averageAndSum1(List<Emp> list) {
    final Map<String, Integer> dummy = new HashMap<>();
    dummy.put("RESULT", 0);
    double ave = list.stream()
            .mapToInt(emp -> {
                int sal = emp.getSal();
                dummy.put("RESULT", dummy.get("RESULT") + sal);
                return sal;
            })
            .average()
            .getAsDouble();
    System.out.println("ave=" + ave + ",sum=" + dummy.get("RESULT"));
}

ラムダ式がアクセスできる対象はfinalのみなので、ここではdummyというfinalのオブジェクトを定義して、そのオブジェクトに対して値の出し入れをしています。
特にparallelStreamで並列化した時には、確実に問題が起きることも踏まえると(たとえConcurrentHashMapにしていてもです)この記述は避けるべきでしょう。

ちなみに合計と平均の両方を計算したいだけであれば、summaryStatisticsメソッドを利用することで、代表的な集計結果をできます

void averageAndSum2(List<Emp> list) {
    IntSummaryStatistics statistics = list.stream()
            .mapToInt(Emp::getSal)
            .summaryStatistics();
    System.out.println("ave=" + statistics.getAverage() + ",sum=" + statistics.getSum());
}

IntSummaryStatisticsから集計結果を取得することができるのです。

このクラスから取得できないような独自の計算をしたい場合は、大人しくfor文で書くか、自分でCollectorを作るという方法がありそうです。自分でCollectorを作る方法は、また改めて紹介します。

8. stream中に元のオブジェクトを操作(禁止度:S)

やる人がいそうなので、念のため。

Double someOperation(List<Emp> list) {
    return list.stream()
            .filter(emp -> {
                list.remove(emp);
                return emp.getSal() > 1000;
            })
            .mapToInt(Emp::getSal)
            .average()
            .getAsDouble();
}

listのstream処理中に、listに対して要素の追加や削除をするというものです。この例のlist.remove(emp)は恣意的なものなので業務的な意味はありませんが、このように元のオブジェクトに対して操作をするという人は必ず出てくると思います。

元のListがArrayListなのかCopyOnWriteArrayListなのかで動きも変わりますし、ただでさえ動きの掴みづらいstream処理で、このような危険な実装は避けるべきでしょう。
どうしても元のオブジェクトに手を入れながら処理するのであれば(最適なstream処理が思いつかないなら)大人しくfor文で書くべきでしょう。

9. parallelStreamを使ってDB/Webアクセス(禁止度:S)

parallelStreamの効果を試すために、Webにアクセスをするようなサンプルもありますが、実案件ではそのような実装は避けましょう。
マルチスレッドでDB/Webにアクセスをしたいのであれば、スレッド数や、各スレッドの状態をきちんと管理・把握できる、ExecutorService使うべきです。

っていうか、parallelStreamを使ってDBやWebにアクセスなんかしたら絶対に問題が起きるし、むしろ運用中に発現して大問題になってStream API禁止令の引き金になるから、ホントやめて!(><)

まとめ

1. ラムダ式は禁止される理由がない!
2. filter/sort/map/reduce/collect/groupingByあたりの勉強会を行うべし!
3. Comparator.comparing* の使用は少し控えよう!
4. collectをしたら一度ローカル変数に入れよう!
5. Stream APIのエキスパートが近くにいたほうがいい!
6. streamからのstreamとか、ネストしたstreamとかは避けよう!
7. streamの親オブジェクトや、外の変数を触るんじゃない!
8. DBとかWebにアクセスするんじゃない!

まめ知識

「ラムダ」で画像検索したらラピュタのロボット兵が出てきたので、なにごとかと思って調べてみたら、ルパン三世「さらば愛しきルパンよ」に、ロボット兵が「ラムダ」という名前で登場していたようですね。
http://ja.wikipedia.org/wiki/%E3%81%95%E3%82%89%E3%81%B0%E6%84%9B%E3%81%97%E3%81%8D%E3%83%AB%E3%83%91%E3%83%B3%E3%82%88

そう、最初に敢えて滑ったのは、この伏線だったのですよ。敢えての、滑りだったんですよ。敢えての(←しつこい)

それでは皆さん、良いラムダライフを! バルス!

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Kafka+Storm+Elasticsearch+Kibanaでストリームデータ処理の可視化を行ってみた

こんにちは。kimukimuです。


AWS re:Invent 2013Amazon Kinesis が発表されるなど、
ストリームデータ処理に対するニーズの高まりを感じますね。
(Amazon Kinesis は、Stormとも連携できるようになっているようです)。

さて、先日、Storm 0.9.0 が正式リリースされたり、Apache Kafka 0.8.0 が正式リリースされたりしたので、
それらを連携して、ストリームデータの可視化を行うプロトタイプを作ってみました。

1. はじめに

まず、「ストリームデータ」とは、連続的に発生し続けるデータのことを指します。
システムが出力するログやセンサーが発生するデータ、SNSなどで常時発生するメッセージなどが該当します。
今回は、Apacheが出力するログを、ストリームデータとして収集・可視化することを行ってみます。

1-1.やりたいこと

実現したい内容は、以下のような内容です。

  • ログをリアルタイムに収集する。
  • ログの出力状況をリアルタイムにブラウザで表示させる。
  • スケールアウトを考慮して、分散処理を行う。

1-2.利用するもの

今回利用したプロダクトは以下の通りです。

尚、Stormのインストーラは下記の場所で公開&随時更新していますので、お使いください。
acromusashi/storm-installer · GitHub

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. Kafkaが各サーバ上でログを収集する。
  2. StormのKafkaSpoutが、Kafkaに蓄積されたログを取得する。
  3. Storm内で、ElasticsearchBoltが、分散して、Elasticsearchにログを投入する。
  4. Kibana3がElasticsearchに投入されたログの統計情報を表示する。

ソースについてはElasticsearchBoltの抜粋部のみこの記事に記載しますが、
整理が完了したら後程公開しますのでお楽しみに。

ElasticsearchBoltは下記のように実装しています。
ここでのclientはElasticsearchのクライアントインスタンス、converterはTupleから投入するデータを生成するコンバータです。

/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input)
{
    String documentId = null;
    String indexName = null;
    String typeName = null;
    String document = null;

    try
    {
        documentId = this.converter.convertToId(input);
        indexName = this.converter.convertToIndex(input);
        typeName = this.converter.convertToType(input);
        document = this.converter.convertToDocument(input);

        IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource(
                document).setPercolate(this.percolate).execute().actionGet();

        if (logger.isDebugEnabled() == true)
        {
            String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}";
            logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName,
                    response.getVersion()));
        }
    }
    catch (Exception ex)
    {
        String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}";
        logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex);
    }

    getCollector().ack(input);
}

3.実際に動かしてみる

では、実際にログを流して結果をKibana 3で確認してみます。

すると・・・?
下記のような形で簡単な統計情報を表示することができました。
HTTPリクエストのレスポンスタイム平均値、リクエスト回数、アクセス元ホスト、ステータスコードといった
基本的な統計が表示できることが確認できました。
実際の画面上では、随時グラフが更新されていくので、どのような動作になっているのかが、リアルタイムにわかります。

4.何が良いのか?

今回のプロトタイプはつまりは
「ストリームデータを収集し、Stormで処理/変換を行ってElasticsearchに投入、Kibana 3で統計情報を可視化」
のプロトタイプ・・・という形になります。

このプロトタイプを応用することで、以下のようなことが実現できると考えています。

  • (異常検知)ログやイベントをリアルタイムに収集し、サーバ動作やユーザアクセスの異常検知などを行い、可視化する。
  • (M2M)センサーデータを受信し、センサーデータの統計処理を行い、可視化する。
  • (評判分析)SNSのメッセージ内容を解析し、その内容をクラスタリングし、可視化する。

色々夢は広がりますが、とりあえず今回はこのあたりで。


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

twitterとelasticsearchを簡単に連携させる(Apache Camel入門 その2 コンポーネントについて)

こんにちは、もっと多くのJava技術者がCamelで楽できるはずと信じているツカノ(@)です。

よくJavaで書くと冗長だと言われます。確かにそうだし、欠点もいろいろありますが、誇れる点も多くあります。そのひとつが、発達したJavaのエコシステムです。Javaで書かれたOSSは非常に多く、GitHub上に登録されている言語で最も多いのはJavaです。
自分に必要なOSSを使いこなせれば、実はそんなに書かなくて良いことが多いはずです。そのサポートを行ってくれるのがCamelです。

f:id:acro-engineer:20131125064348p:plain

ちなみに、今回はJava Advent Calendar 2013の10日目の投稿でもあります。9日目は櫻庭さんによる、以下の投稿でした。

11日目は高橋徹さんによる、以下の投稿です。

さて、前回はいかがでしたか? ぶっちゃけ、「Hello, World!」程度じゃCamelのパワーは伝わりきらなかったと思います。前回は導入ということで、雰囲気を伝えることに注力しましたが、今回はもっと面白いサンプルを使って解説します。この記事を読めば、「自分の仕事で使えないか調べたくなる」くらいCamelのパワーが伝わるのではないかと思います。

コンポーネントの話

それでは、本題に入っていきましょう。今回はCamelのコンポーネント周りについて、紹介します。
Camelのコンポーネントを使いこなすことによって、発達したJavaエコシステムの恩恵を受けることができます。
そのあたりを詳しく説明します。

今回は、次のようなプログラムを作りながら説明します。

  • twitterからキーワード「camel」で検索する
  • 検索結果をJSONに変換する
  • elasticsearchに書き込む

この目的なら通常はTwitter River Pluginを使うのかもしれませんが^^ ここでは、Camelを使います。

コンポーネントの調べ方

twitterやelasticsearchにアクセスする部分は、世の中的にはtwitter4jやelasticsearchのclientがあるはずです。こういう「OSSにアクセスする部分」をCamelではコンポーネントとして提供しています。

では、どのようなコンポーネントがあるのでしょうか? それは、Camelのコンポーネント一覧を見ると分かります。ここでは、Camelで提供しているコンポーネントがアルファベット順に並んでいます。
これを見て行くと、、、ありますね。「camel-elasticsearch」や「camel-twitter」があります。

OSSの名称の部分は、個々のコンポーネントの使い方が書かれたページにリンクしています。
ひとまず、camel-twitterの使い方を見てみましょう。

まずは、Mavenのdependencyにコンポーネントを登録するよう、書いてあります。

<dependency>
    <groupId>org.apache.camel</groupId>
    <artifactId>camel-twitter</artifactId>
    <version>${camel-version}</version>
</dependency>

次にURI formatの説明として、以下のように書いてあります。

twitter://endpoint[?options]

この文脈で「URIって何?」と思うかもしれません。説明します。
前回のサンプルでfromやtoの引数に指定した文字列を、Camelでは「Endpoint URI」と言います。これは以下のようなフォーマットです。
f:id:acro-engineer:20131210080622p:plain
※図はCamel in Actionから引用しました。

SchemeとはCamelのコンポーネントを表しています。今回のサンプルでは、twitterやelasticsearchのコンポーネントにあたります。
Context Pathコンポーネントによっても違いますが、概ねどの機能を呼び出すかを表しています。
Optionsではそのコンポーネントを利用するための詳細なパラメータを指定します。

このように考えて、camel-twitterの使い方を読んでいくと、今回のサンプルでは以下のようなEndpoint URIを指定すれば良いことが分かります。(twitter APIのconsumerKey, consumerSecret, accessToken, accessTokenSecretについてはマスクしています)

twitter://search?type=direct&keywords=camel&consumerKey=xxx&consumerSecret=xxx&accessToken=xxx&accessTokenSecret=xxx


また、camel-elasticsearchの使い方ページを読んでいくと、以下のようなEndpoint URIを指定すれば良いことが分かります。
(clusterName=elasticsearch, indexName=twitter, indexType=tweetとしています)

elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet

camel-twitter以外もまとめて書くと、以下のdependencyをpom.xmlに入れます。

    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-twitter</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-elasticsearch</artifactId>
      <version>${camel-version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.camel</groupId>
      <artifactId>camel-jackson</artifactId>
      <version>${camel-version}</version>
    </dependency>

実装してみましょう

さて、ここまで分かったところで、この処理をCamelで実現するコードを書いてみましょう。

  • TwitterCrowler.java
package snuffkingit.camel.example;

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.model.dataformat.JsonLibrary;

public class TwitterCrowler {

    public static void main(String... args) throws Exception {
        CamelContext context = new DefaultCamelContext();
        context.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from("twitter://search?type=direct&keywords=camel&consumerKey=xxx&consumerSecret=xxx&accessToken=xxx&accessTokenSecret=xxx") // (1)
                .marshal().json(JsonLibrary.Jackson) // (2)
                .to("elasticsearch://elasticsearch?operation=INDEX&indexName=twitter&indexType=tweet"); // (3)
            }
        });
        context.start();
        Thread.sleep(10 * 1000);
        context.stop();
    }
}

前回解説したあたりは飛ばして、簡単に解説します。
(1) twitterからキーワード「camel」で検索します。
(2) twitterからの検索結果をJSON形式に変換します。
(3) JSON形式の文字列をelasticsearchに書き込みます。

これを動かしてみると、twitterからキーワード「camel」で検索し、JSON形式に変換し、elasticsearchに書き込むことができます。
(今回の本題ではないため、twitter APIの登録や、elasticsearchのセットアップに関する説明は省略しています。また、1回検索して終了させていますが、常駐プロセスにして定期実行、というのもできます)

まとめ

今回のコードを振り返ってみると、以下のような特長があります。

  • どのようにOSSにアクセスするのか分かっていれば、個々のOSSのAPIを直接使わなくて良い
  • OSSへのアクセス方法はEndpoint URIとして抽象化されており、細かなパラメータは違えどフォーマットは共通
  • Endpoint URIで記載することアクセスできるため、実装量が削減される

そして、Camelのコンポーネント一覧を見ると分かるように、Camelはかなり数のOSSに対応しています。この資産を使えば、OSSをつないだ処理を簡単に書くことができるようになります。

自分の仕事で使ってるOSSのコンポーネントが用意されているか、調べたくなったのではないでしょうか?
ぜひ、Camelを使ってもっと楽をしましょう!
 

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

RxJavaを使ってCallback Hellから脱出する( Java8 ラムダ編 )

id:KenichiroMurata(@ )です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

前回はVert.xのmod-rxvertxを使い、RxJavaによってCallback Hellから脱出する方法を説明しました。本記事は、せっかくなのでJava8のラムダを使ったらさらにどのようになるのか?を試してみた補足記事です。

目次は以下の通りです。

  1. 環境
  2. Java8 ラムダによってCallback Hellはいかに解決されたか
  3. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1
  • Java 1.8.0-ea(build 1.8.0-ea-b118)
  • IntelliJ IDEA 13 CE

Java8を試すために、IntelliJ IDEA 13 CEを使いました。Java8をインストールし、IDEAを使う場合は以下のように初期化してください。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew idea

2. Java8 ラムダによってCallback Hellはいかに解決されたか

まずは、通常のVert.xで書いた場合のコードのJava8 ラムダ版は次のようになります。例を分かりやすくするために、JSONメッセージの生成部などは外部メソッド化しました。

J8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample", req1,
        (Handler<Message<JsonObject>>) reply -> {
          assertEquals("ok", reply.body().getString("status"));
          assertEquals("@muraken720", map.get("name"));

          JsonObject req2 = createAddRequest("Kenichiro");

          // リクエスト2
          eventBus.send("muraken720.vertx.mod.testexample", req2,
              (Handler<Message<JsonObject>>) reply1 -> {
                assertEquals("ok", reply.body().getString("status"));
                assertEquals("Kenichiro", map.get("name"));

                JsonObject req3 = createAddRequest("Murata");

                // リクエスト3
                eventBus.send("muraken720.vertx.mod.testexample", req3,
                    (Handler<Message<JsonObject>>) reply2 -> {
                      assertEquals("ok", reply.body().getString("status"));
                      assertEquals("Murata", map.get("name"));

                      testComplete();
                    });
              });
        });
  }

ラムダ式により、無名クラスの部分がスッキリしていますね。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxJ8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send("muraken720.vertx.mod.testexample", req1);

    Observable<RxMessage<JsonObject>> obs2 = obs1.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("@muraken720", map.get("name"));

      JsonObject req2 = createAddRequest("Kenichiro");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req2);
    });

    // リクエスト2
    Observable<RxMessage<JsonObject>> obs3 = obs2.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Kenichiro", map.get("name"));

      JsonObject req3 = createAddRequest("Murata");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req3);
    });

    // リクエスト3
    obs3.subscribe(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Murata", map.get("name"));

      testComplete();
    });
  }

Java8のラムダ式を使うことにより、RxJavaのAction1、Function1といった無味乾燥なクラス名も登場しなくなり、非常にスッキリしました。コード自体も縦には長くなっていますが、ネストが深くならないことがよく分かると思います。

3 まとめ

いかかでしょうか?Java8のラムダを使うことで、Callback Hellから脱出するだけでなく、その記述方法のシンプルになることがご覧頂けたと思います。Java8の正式リリースは3月?だったと思いますが、そのJava8に向けて、RxJavaもVert.xもさらに進化していくと思います。今後どのようになるのか?を想像しながら、あれこれ試してみると面白いと思います。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ