AWS re:Invent 2013 で Amazon Kinesis が発表されるなど、
ストリームデータ処理に対するニーズの高まりを感じますね。
(Amazon Kinesis は、Stormとも連携できるようになっているようです)。
さて、先日、Storm 0.9.0 が正式リリースされたり、Apache Kafka 0.8.0 が正式リリースされたりしたので、
それらを連携して、ストリームデータの可視化を行うプロトタイプを作ってみました。
1. はじめに
まず、「ストリームデータ」とは、連続的に発生し続けるデータのことを指します。
システムが出力するログやセンサーが発生するデータ、SNSなどで常時発生するメッセージなどが該当します。
今回は、Apacheが出力するログを、ストリームデータとして収集・可視化することを行ってみます。
1-1.やりたいこと
実現したい内容は、以下のような内容です。
- ログをリアルタイムに収集する。
- ログの出力状況をリアルタイムにブラウザで表示させる。
- スケールアウトを考慮して、分散処理を行う。
1-2.利用するもの
今回利用したプロダクトは以下の通りです。
尚、Stormのインストーラは下記の場所で公開&随時更新していますので、お使いください。
acromusashi/storm-installer · GitHub
2.どんな構成になるのか?
2-2.各要素の役割
実際の処理の流れとしては下記のようになります。
- Kafkaが各サーバ上でログを収集する。
- StormのKafkaSpoutが、Kafkaに蓄積されたログを取得する。
- Storm内で、ElasticsearchBoltが、分散して、Elasticsearchにログを投入する。
- Kibana3がElasticsearchに投入されたログの統計情報を表示する。
ソースについてはElasticsearchBoltの抜粋部のみこの記事に記載しますが、
整理が完了したら後程公開しますのでお楽しみに。
ElasticsearchBoltは下記のように実装しています。
ここでのclientはElasticsearchのクライアントインスタンス、converterはTupleから投入するデータを生成するコンバータです。
/** * {@inheritDoc} */ @Override public void execute(Tuple input) { String documentId = null; String indexName = null; String typeName = null; String document = null; try { documentId = this.converter.convertToId(input); indexName = this.converter.convertToIndex(input); typeName = this.converter.convertToType(input); document = this.converter.convertToDocument(input); IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource( document).setPercolate(this.percolate).execute().actionGet(); if (logger.isDebugEnabled() == true) { String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}"; logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName, response.getVersion())); } } catch (Exception ex) { String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}"; logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex); } getCollector().ack(input); }
3.実際に動かしてみる
では、実際にログを流して結果をKibana 3で確認してみます。
すると・・・?
下記のような形で簡単な統計情報を表示することができました。
HTTPリクエストのレスポンスタイム平均値、リクエスト回数、アクセス元ホスト、ステータスコードといった
基本的な統計が表示できることが確認できました。
実際の画面上では、随時グラフが更新されていくので、どのような動作になっているのかが、リアルタイムにわかります。
4.何が良いのか?
今回のプロトタイプはつまりは
「ストリームデータを収集し、Stormで処理/変換を行ってElasticsearchに投入、Kibana 3で統計情報を可視化」
のプロトタイプ・・・という形になります。
このプロトタイプを応用することで、以下のようなことが実現できると考えています。
- (異常検知)ログやイベントをリアルタイムに収集し、サーバ動作やユーザアクセスの異常検知などを行い、可視化する。
- (M2M)センサーデータを受信し、センサーデータの統計処理を行い、可視化する。
- (評判分析)SNSのメッセージ内容を解析し、その内容をクラスタリングし、可視化する。
色々夢は広がりますが、とりあえず今回はこのあたりで。
Acroquest Technologyでは、キャリア採用を行っています。
- 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
- 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
- OSSの開発に携わりたい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
キャリア採用ページ