読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Kafka+Storm+Elasticsearch+Kibanaでストリームデータ処理の可視化を行ってみた

Storm Kafka Elasticsearch Java

こんにちは。kimukimuです。


AWS re:Invent 2013Amazon Kinesis が発表されるなど、
ストリームデータ処理に対するニーズの高まりを感じますね。
(Amazon Kinesis は、Stormとも連携できるようになっているようです)。

さて、先日、Storm 0.9.0 が正式リリースされたり、Apache Kafka 0.8.0 が正式リリースされたりしたので、
それらを連携して、ストリームデータの可視化を行うプロトタイプを作ってみました。

1. はじめに

まず、「ストリームデータ」とは、連続的に発生し続けるデータのことを指します。
システムが出力するログやセンサーが発生するデータ、SNSなどで常時発生するメッセージなどが該当します。
今回は、Apacheが出力するログを、ストリームデータとして収集・可視化することを行ってみます。

1-1.やりたいこと

実現したい内容は、以下のような内容です。

  • ログをリアルタイムに収集する。
  • ログの出力状況をリアルタイムにブラウザで表示させる。
  • スケールアウトを考慮して、分散処理を行う。

1-2.利用するもの

今回利用したプロダクトは以下の通りです。

尚、Stormのインストーラは下記の場所で公開&随時更新していますので、お使いください。
acromusashi/storm-installer · GitHub

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. Kafkaが各サーバ上でログを収集する。
  2. StormのKafkaSpoutが、Kafkaに蓄積されたログを取得する。
  3. Storm内で、ElasticsearchBoltが、分散して、Elasticsearchにログを投入する。
  4. Kibana3がElasticsearchに投入されたログの統計情報を表示する。

ソースについてはElasticsearchBoltの抜粋部のみこの記事に記載しますが、
整理が完了したら後程公開しますのでお楽しみに。

ElasticsearchBoltは下記のように実装しています。
ここでのclientはElasticsearchのクライアントインスタンス、converterはTupleから投入するデータを生成するコンバータです。

/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input)
{
    String documentId = null;
    String indexName = null;
    String typeName = null;
    String document = null;

    try
    {
        documentId = this.converter.convertToId(input);
        indexName = this.converter.convertToIndex(input);
        typeName = this.converter.convertToType(input);
        document = this.converter.convertToDocument(input);

        IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource(
                document).setPercolate(this.percolate).execute().actionGet();

        if (logger.isDebugEnabled() == true)
        {
            String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}";
            logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName,
                    response.getVersion()));
        }
    }
    catch (Exception ex)
    {
        String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}";
        logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex);
    }

    getCollector().ack(input);
}

3.実際に動かしてみる

では、実際にログを流して結果をKibana 3で確認してみます。

すると・・・?
下記のような形で簡単な統計情報を表示することができました。
HTTPリクエストのレスポンスタイム平均値、リクエスト回数、アクセス元ホスト、ステータスコードといった
基本的な統計が表示できることが確認できました。
実際の画面上では、随時グラフが更新されていくので、どのような動作になっているのかが、リアルタイムにわかります。

4.何が良いのか?

今回のプロトタイプはつまりは
「ストリームデータを収集し、Stormで処理/変換を行ってElasticsearchに投入、Kibana 3で統計情報を可視化」
のプロトタイプ・・・という形になります。

このプロトタイプを応用することで、以下のようなことが実現できると考えています。

  • (異常検知)ログやイベントをリアルタイムに収集し、サーバ動作やユーザアクセスの異常検知などを行い、可視化する。
  • (M2M)センサーデータを受信し、センサーデータの統計処理を行い、可視化する。
  • (評判分析)SNSのメッセージ内容を解析し、その内容をクラスタリングし、可視化する。

色々夢は広がりますが、とりあえず今回はこのあたりで。


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ