Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache Storm 1.0.0の機能を使ってみる

お久しぶりです@ です。

このブログへの登場はかなり久しぶりです。昨年10月にミャンマーから日本に帰ってきて、今は、IoTやら可視化などに関する仕事をしています

さて、TwitterよりStormが公開されて以降、分散ストリーム処理フレームワークも、FlinkSpark StreamingSamzaBeamGearpumpSensorBee等、さまざまなOSSプロダクトが公開されました。

世はまさに「大ストリーム時代」!?(ワンピース風)

そのような中、4/12にApache Storm から正式メジャーバージョンとなる、1.0.0がリリースされました。このタイミングでどのような機能が盛り込まれるのか、興味を持っていましたが、これまでの課題を解消しつつ、他プロダクトよりも一歩先に行くような内容もリリースされました。

大きな変更点は12個

以下の公式サイトでも公表されていますが、メインとなる変更点は12個のようです。
Storm 1.0.0 released

No タイトル 内容
1 Improved Performance 16倍の処理速度向上と、60%のレイテンシの減少に成功しました。
2 Pacemaker - Heartbeat Server Stormがスケールアップするにつれて生じていたZookeeperのパフォーマンスボトルネックの解消のため、インメモリのKey-Valueストアとして機能するオプションのStormデーモンPacemakerが追加されました。
3 Distributed Cache API Topology毎に共有できるデータストア空間を用意し、Blobで共有データが保持、参照できるようになりました。
4 HA Nimbus Distributed Cache APIの機能を利用することで、高可用性を備えたNimbusが実現可能になりました。
5 Native Streaming Window API Storm Native な Window APIが用意され、いわゆるCEP処理を実装しやすくなりました。
6 State Management - Stateful Bolts with Automatic Checkpointing 自動チェック機構を備えたステートフルなBoltの利用により、状態管理が可能になりました。
7 Automatic Backpressure 上限値、下限値の設定による自動バックプレッシャーが可能になりました。
8 Resource Aware Scheduler Topology毎のリソース(メモリ/CPU)を考慮したタスクスケジューラが実現可能になりました。
9 Dynamic Log Levels Storm UIから動的に出力ログレベルの変更が可能になりました。
10 Tuple Sampling and Debugging Storm UI上でTupleのサンプリングとデバッグが可能になりました。
11 Distributed Log Search Worker毎に分散されてしまうログの検索がStorm UI上で実施可能になりました。
12 Dynamic Worker Profiling WorkerプロセスのプロファイリングがStorm UI上で可能になりました。


今回は上記の変更点の中から、特に面白いだろうと思われる以下の3点を調べてみました。他のものは、なんとなくタイトルから想像できますよね。

  1. Distributed Cache API
  2. Native Streaming Window API
  3. Automatic Backpressure

Distributed Cache APIを使ってみる

前バージョンでは、デプロイしたTopology上で何かファイルのデータなどを使いたい場合、Topologyと一緒にデプロイする必要がありました。そのため、大きなデータをTopology起動後に利用したい場合は、デプロイそのものに時間がかかることがありました。
また、各サーバに共有データを置いたり、データ共有のためにKVSなどのStormとは別のプロダクトを利用するのは、実現したいことに対して重く感じます。
しかし今回のバージョンアップで、Topology上で使いたいファイルを、Stormが持っているデータストアに保持し、Topologyからそのデータを参照することが可能になりました。共有データ保存場所が存在し、そこにデータを配置することでデプロイ時間の削減を可能にした機能です。共有データのサイズが大きければ大きいほど、その恩恵を受けることができます。本家サイトでは「位置情報」や「辞書データ」を保持するとよい、と言われています。

Distributed Cache APIの仕組み

Stormのサイトに素敵な解説図があるので、転載させてもらいます。BlobStoreというインタフェースがあり、このインタフェースを実装したLocalFsBlobStoreとHdfsBlobStoreが提供されています。どちらのStore実装も処理の流れはほぼ同じです。仕組みとしては、Supervisor起動時にBlobStoreのMapを取得し、その後MapにしたがってMap情報(共有データ)を取得する流れのようです。

[LocalFsBlobStore]
f:id:acro-engineer:20160425224701p:plain

[HdfsBlobStore]
f:id:acro-engineer:20160425224721p:plain

使ってみる

早速使ってみます。

  1. 共有データの登録
  2. Topologyの起動

が手順になります。確認のため、Topologyは2つ動作させます。

共有データの登録

README.markdownの登録をします。

# ./bin/storm blobstore create --file README.markdown --acl o::rwa --replication-factor 4 key1

共有用のデータは「storm.local.dir/storm-local/blobs/」に配置されていました。

Topologyの起動

Topologyを2つ起動し、どちらも登録したREADME.markdownをダウンロードすることをログから確認したいと思います。本来はTopologyの中で利用されているところを確認したいのですが、サンプルに適当なものがなかったため、ひとまず起動時に登録した共有データが読み込まれることを確認したいと思います。

# ./bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"}}'

test_repoというTopologyを作成し、key1というキーに対して登録したBlobファイルの中身を解凍オプションなしで参照、実行しています。Blobファイルの読み込みに成功すると、以下のようなログが確認できるはずです。

2016-04-23 14:48:05.782 o.a.s.d.supervisor [INFO] Downloading code for storm id test_topo-4-1461390482
2016-04-23 14:48:06.279 o.a.s.d.supervisor [INFO] Successfully downloaded blob resources for storm-id test_topo-4-1461390482
2016-04-23 14:48:06.280 o.a.s.d.supervisor [INFO] Finished downloading code for storm id test_topo-4-1461390482
:
2016-04-23 14:48:06.285 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 to its port artifacts directory
2016-04-23 14:48:06.286 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 for files(2): ("resources" "blob_file")

同じ操作で別名のTopologyを作成してみてください。上記と同じデータダウンロードが正常に完了するログが確認できるはずです。これでDistributed Cache APIを試すことができました。

Native Streaming Window APIを使ってみる

Stormのネイティブな機能として、スライディングウィンドウが追加されました。どこまでの内容までがStormネイティブとして対応しているのか、サンプルをベースに確認してみたいと思います。まずはメインパートであるSlidingWindowTopology.javaのソースを確認してみます。

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("integer");
        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
                .shuffleGrouping("slidingsum");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(40000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

これだけ見ても、以下の3つのBoltが存在します。

  1. SlidingWindowSumBolt
  2. TumblingWindowAvgBolt
  3. PrinterBolt

これらが何をしているのか、またどのように動くのかを確認したいと思います。

[SlidingWindowSumBolt]
とても単純に、受信したTupleの中身を加算していることがわかります。一応ウィンドウから外れたTupleの値は減算するようにも記述されているので、スライディングウィンドウの条件を満たしていることも確認できます。

    @Override
    public void execute(TupleWindow inputWindow) {
            /*
             * The inputWindow gives a view of
             * (a) all the events in the window
             * (b) events that expired since last activation of the window
             * (c) events that newly arrived since last activation of the window
             */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
            /*
             * Instead of iterating over all the tuples in the window to compute
             * the sum, the values for the new events are added and old events are
             * subtracted. Similar optimizations might be possible in other
             * windowing computations.
             */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

[TumblingWindowAvgBolt]
設定したWindowのサイズで合計値を除算している、シンプルなつくりでした。SlidingWindowTopologyに内包されていますね。「いくつ溜まったら平均値を計算する」という引数には「3」が設定されています。

        @Override
        public void execute(TupleWindow inputWindow) {
            int sum = 0;
            List<Tuple> tuplesInWindow = inputWindow.get();
            LOG.debug("Events in current window: " + tuplesInWindow.size());
            if (tuplesInWindow.size() > 0) {
                /*
                * Since this is a tumbling window calculation,
                * we use all the tuples in the window to compute the avg.
                */
                for (Tuple tuple : tuplesInWindow) {
                    sum += (int) tuple.getValue(0);
                }
                collector.emit(new Values(sum / tuplesInWindow.size()));
            }
        }

[PrinterBolt]
驚くほどシンプルですね。出力するだけ。

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

まとめると、以下の通りに動くと予想できます。

  1. ランダムに0~999の整数を、合計用のBoltに送付する。
  2. 合計用Boltはデータを受信時、メモリに保持しているSum値に加算していく。
  3. 受信したTuple数が10になったところで、平均計算用Boltに合計値を送付する。
  4. 1~3を、平均計算用Boltの保持Tuple数が3になったら、平均値を計算する。
  5. 1~4を40秒間繰り返す。


これらを基に、動かした際のログを見てみましょう。ソースコードの通り、起動後に40秒で終了するようなので、以下のコマンドを実行してしばらく待ってみます。

# bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology

[合計用Boltに対する出力ログ]
以下のような感じで合計用Boltにはログが出力されていました。

17562 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {4017617819859316672=-3880300761259985782}, [899, 1461542140369, 1]
17562 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {4017617819859316672=-3880300761259985782}, [899, 1461542140369, 1] TASK: 4 DELTA: 
17674 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {-2465951973599725012=3371764614267379312}, [888, 1461542140475, 2]
17674 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {-2465951973599725012=3371764614267379312}, [888, 1461542140475, 2] TASK: 4 DELTA: 

で、10個たまったところで平均計算用Boltに送付しています。

18516 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [6053]
18516 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-1356407922762824927=-4912942846543592390, 4017617819859316672=-8950777703384980140, -7679610850762262585=5600559632140381686, -217168626838496871=6670643717321357413, -8433729321932816312=-1512990481045386819, -695350376461229364=-6915299522591467528, -8604773776820158944=2085240823323939478, 4818452273885227082=1055563511177261421, -4383830359476279213=-2430226558792731842, -2465951973599725012=-3111554498250395772}, [6053]]
:
:
19527 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [11182]
19527 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {4017617819859316672=1956648145851480265, -7679610850762262585=-8990022321548325348, -217168626838496871=8356349476653175499, 6556174365450512594=-2956830898901769282, 4973296703617984132=630324356173502412, -8433729321932816312=7530781138220324522, 8041484834072108391=-1100584463729972475, -695350376461229364=5513770714708145606, -8604773776820158944=-7212706120285088590, -4383830359476279213=5439461521018447939, -1641897178290464600=-510250118691366334, 6730277299577429107=6208397095766677293, -8115189405407159227=1214364586718890587, -1356407922762824927=3843071132908231388, 7588127658633797238=-3035483582895424875, -1600730095770316997=7644364465767360178, -5977653414665598802=6443496393438179244, -4289645355525492039=-7771435519918529374, 4818452273885227082=2145248154554053428, -2465951973599725012=3719500247592761581}, [11182]]
:
:
20536 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [16102]
20536 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-217168626838496871=-2098183848111262192, 6694109964234120893=-2919364166006116209, -490688806946141211=-115866302962242997, 6556174365450512594=3417840899423850921, 4973296703617984132=-2075234239029720284, -8433729321932816312=3438734146522000751, -695350376461229364=5479035516364205453, -1356407922762824927=536244103058094589, -8759945507516006916=-7691606294364721504, -1600730095770316997=5198643672682538868, 1928584781574233931=3233801634595403530, 4818452273885227082=3613752122075445564, 4017617819859316672=-5965638492780984127, -7679610850762262585=8746099621132998817, 3628468721856542322=3234989506915660599, 8041484834072108391=2473403470958482418, -8604773776820158944=4291163489101357389, 5275805877791886609=2224008364377626542, -4383830359476279213=-1613810029185700041, -1641897178290464600=-7937957462653577021, 6730277299577429107=-5906850224979170611, -8115189405407159227=-6807612857900762546, -492827708169915833=-3985992390535713144, 7588127658633797238=7922452426043726560, -3771417496478433111=-1133769369307004904, -5977653414665598802=2226449212304201933, -4289645355525492039=8279576780572003648, 2457695339746807997=-943386263403830945, 7278045793299574088=-3628544844039353718, -2465951973599725012=5019064850597613642}, [16102]]

最後に、3個合計Tupleが溜まったところで平均値を計算して出力しています。

20540 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.task - Emitting: tumblingavg default [11112]

この後PrinterBoltにデータ送付しているのですが、PrinterBoltは非常にシンプルなので、ここで触れるのは割愛したいと思います。
さて、上記のような感じでスライディングウィンドウも使えました。設定した閾値に伴い指定された動作をしてくれるので、簡単な仕組みであれば、Stormだけで動作させられそうです。
具体的な関数の整備はこれからのようですが、SlidingWindowSumBoltやTumblingWindowAvgBoltを見ればわかるように、「BaseWindowedBoltを実装してexecuteで計算させる」というシンプルでオーソドックスなAPIになっています。EsperWSO2 CEP(Siddhi)といったCEPプロダクトの関数を実装したことがある人なら、難なく実装できると思います。

Automatic Backpressure

個人的に一番気になっているところです。そもそもBackpressureって何ぞや、というところですが。

Backpressureとは何ぞや?

もともとは半二重接続のハブやスイッチで用いられるフロー制御方式の一つです。機器内の通信バッファがあふれてフローが止まってしまう前に、データ送付側に通知を投げて、送ってくるデータを止めたり、量を調整したりする仕組みのことを言います。

なぜBackpressureがStormに必要か?

Stormがリリースされた際にずっと言われていた問題点として、「Spoutの処理性能がBoltの処理性能を上回っている場合、キューに処理が溜まり続けてTopologyが止まってしまう/遅くなってしまう」という考慮すべき点がありました。Storm自体の処理性能は良いのですが、「データストア用のプロダクトに書き込むBoltの性能が上がらず、キューに溜まる」という事象は「Stormあるある」と言ってよいくらい見かけます。
そのため、Stormの環境を構築する際には、SpoutとBoltの処理性能に気を付ける必要があったわけです。

しかし、今回のBackpressure機構を利用すれば、この問題点を緩和させることが可能になります。今回のBackpressureはTopology単位で以下の設定が可能です。

  1. high-watermarkとlow-watermarkの指定が可能。
  2. キュー内のメッセージ量がhigh-watermarkで指定した比率を上回ったら、Backpressure機能が発生し、Spoutの処理を自動的に遅くする。
  3. キュー内のメッセージ量がlow-watermarkで指定した比率を下回ったら、通常のSpoutの処理に戻る。

こちらに検討中のBackpressureの図が載っているのですが、閾値を検知した時点でZookeeperに通知を飛ばし、Spoutの処理を抑えるような制御をするようです。ただし下記の図は検討中なので、ここからおそらく何らかの変更が加わっているとは思います。公式の発表待ちですね。
https://github.com/apache/storm/pull/700

ただし、解決するパターンと解決できないパターンがきちんと言及されています。

  • 解決するケース:Boltの処理が遅い
  • 解決できないケース:外部システムにアクセスするBoltで、外部システムが止まった場合(「遅い」ではなく、そもそも「処理できない」。こういうケースは、HystrixのようなCircuit Breakerが欲しくなりますね、とStormのissueでも話が出ているようです)

Backpressure利用のための設定値

Automatic Backpressureに関する具体的な設定値は以下で指定可能です。

topology.backpressure.enable: true
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4

まとめ

いくつか特徴的なStormの変更点を確認してきましたが、DevOps・運用面にかなり注目が集まっている時代の中で、Stormもついにそちらに目を向け始めたように見えました。ますます便利になっていくので、目が離せません!


ストリーム王に俺はなる!


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、AWSHadoop、Storm、NoSQL、Elasticsearch、SpringBoot、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 
データ分析で国内に新規市場を生み出す新サービス開発者WANTED! - Acroquest Technology株式会社の新卒・インターンシップ - Wantedlywww.wantedly.com

Apache版Storm初回リリースの新機能の使い方

こんにちは。kimukimuです。
f:id:acro-engineer:20140709095247p:plain

夏なのか梅雨なのか微妙な気候になっているような感覚を覚える今日この頃です。
いきなり暑くなってきているので、バテないよう気をつける必要がありますね。

さて、前回Apache版Stormの新機能の概要について紹介しましたが、
今回は実際に新機能がどういう風に使えるのか、について確認してみようと思います。

尚、Storm-0.9.2-incubatingもリリースされていますが、それは次回に回すとして、
今回はStorm-0.9.1-incubatingの新機能です。

・・・といっても、ビルドツールの変更などは確認してもあまり嬉しいことはないため、
下記の2つの機能に絞って確認を行ってみることにします。

  1. Storm-UIの各項目にツールチップで解説を表示
  2. NimbusにTopologyをSubmitする際、設定に対するバリデーションが追記

1.Storm-UIの各項目にツールチップで解説を表示

Storm-UIで表示される各種項目に対してマウスオーバーした際にツールチップで解説が表示されるようになった」という新機能です。

Storm-UIを使ってさえいれば常時有効となります。
これは実際にStorm-UIで見てみた方が早いため、実際どんな内容が表示されるかを見てみましょう。

まずはTop画面のバージョンから。こういった形で項目にマウスオーバーすることで項目の説明が表示されます。
f:id:acro-engineer:20140519072554j:plain
同じように、Topology Summaryの画面でも下記のような項目の説明が表示されます。
Capasityをはじめとした「生データから算出される項目」についてはどのように値が算出されているかも記述されています。
f:id:acro-engineer:20140519072902j:plain
f:id:acro-engineer:20140519072906j:plain
Executor SummaryにおいてもHostNameの算出の方法について記述されるなど、かゆい所にも手が届きます。
f:id:acro-engineer:20140519072909j:plain

Storm-UIは基本的にStormがZooKeeper上に保持している性能情報を
Nimbusから取得して表にしているだけのため、パっと見はわかりにくい画面なのですが、
今回各項目に解説が表示されるようになったため、使いやすくなったとは思います。

2.NimbusにTopologyをSubmitする際、設定に対するバリデーションが追記

では、次は「TopologyをSubmitする際、設定に対する定型的なバリデーションを行えるようになった」についてです。

これは内容としては、TopologySubmit時に型として不正な値が設定されている設定値を検出してバリデーションを行うものです。
バリデーション対象となるのはStorm自体が動作するために必要な設定値です。
Topology固有の設定値についてはこれまでと同じくTopologyValidatorを自前で作成し、チェックを行う必要があります。

では、実際にどういう場面で使われるのかを確認してみましょう。

今回ベースとするのはincubator-storm/examples/storm-starter at master · apache/incubator-storm · GitHubです。
この中で一番単純なExclamationTopologyを例にとります。
ExclamationTopologyをStormクラスタにSubmitする際のConfigオブジェクトに
ZooKeeperのポート設定を文字列("Test")として詰めて起動してみます。
当然ながら、ポート設定のため本来数値で設定されている必要があります。

  • ExclamationTopology
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("word", new TestWordSpout(), 10);
    builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
    builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");

    Config conf = new Config();
    conf.put(Config.STORM_ZOOKEEPER_PORT, "Test"); // 本来数値でないと動作しない設定に文字列を設定

    if (args != null && args.length > 0) {
      conf.setNumWorkers(3);

      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    }
    else {

      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();
    }
  }

この状態でStormクラスタにSubmitを行うと・・?

# bin/storm jar storm-starter-0.9.1-incubating-jar-with-dependencies.jar storm.starter.ExclamationTopology ExclamationTopology-3
(省略)
354  [main] INFO  backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar...
359  [main] INFO  backtype.storm.StormSubmitter - Uploading topology jar storm-starter-0.9.1-incubating-jar-with-dependencies.jar to assigned location: /opt/storm/nimbus/inbox/stormjar-06306ca5-a1d6-4991-a47a-98b87126186b.jar
409  [main] INFO  backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /opt/storm/nimbus/inbox/stormjar-06306ca5-a1d6-4991-a47a-98b87126186b.jar
409  [main] INFO  backtype.storm.StormSubmitter - Submitting topology ExclamationTopology-3 in distributed mode with conf {"topology.workers":3,"storm.zookeeper.port":"Test"}
415  [main] WARN  backtype.storm.StormSubmitter - Topology submission exception: field storm.zookeeper.port 'Test' must be a 'java.lang.Number'
Exception in thread "main" InvalidTopologyException(msg:field storm.zookeeper.port 'Test' must be a 'java.lang.Number')
        at backtype.storm.generated.Nimbus$submitTopology_result.read(Nimbus.java:2466)
        at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:78)
        at backtype.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:162)
        at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:146)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:98)
        at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:58)
        at storm.starter.ExclamationTopology.main(ExclamationTopology.java:76)

このように、「storm.zookeeper.port」が"Test"という設定になっており、NumberではないからSubmit出来ない、
クラスタに投入する前にはじくことができました。

これは今までだと設定が誤っていることに気付かずにStormクラスタにSubmitしてしまい、
クラスタで起動する際にWorkerプロセスが起動して死ぬを繰り返す・・・という厄介な状態に陥っていました。

特に、実際に使っている方だと
JSONYamlといったファイルに設定値を外だしして読み込ませる方も多いと思いますが、
設定ファイルに記述していた内容が誤っていた場合に予め検出してくれるのでかなり便利に使えると思います。

3.Apache版Stormの初回リリース機能についてのまとめ

  1. Storm-UIに解説が加わり、各画面の項目の意味がわかりやすくなりました。
  2. TopologySubmit時に明らかに誤った設定は事前にはじけるようになりました。

特に新しい機能が追加された・・・というわけではないのですが、使いやすさが確実に増すバージョンアップだったと思います。

尚、別の投稿で紹介しますが、Storm-0.9.2-incubatingでリリースした内容は
使いやすくなる、ではなく実際に使える機能が追加されたリリースになっています。
Apacheに移り、今後も期待していけるStormになった、と言えるでしょう。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Apache 初回リリースのStorm0.9.1のリリース内容は?

こんにちは。kimukimuです。

徐々に暖かくなってきていますね。春は目前です。

さて、つい先日Apache Stormとして初リリースとなる「0.9.1-incubating」がリリースされました!
#ちなみに前回リリースは「0.9.0.1」です。

今回リリース内容は大きく分けて、Apache Incubatorに移行したため発生したものと、
機能追加/不具合対応によるものの2つがあります。

節目のリリースですので両方についてリリースノートより主要な項目を紹介しますね。

1. Apache Incubatorとしてのリリース内容

ライセンスの変更

ライセンスが「Eclipse Public License - v 1.0」から「Apache License Version 2.0」に変更になってます。
今回の変更で今まで使っていた方が使えなくなる・・・ということはないのですが、
きちんとライセンスは確認した上で使うようにしましょう。

ビルドツールの変更

ビルドツールがLeiningen > Mavenに変更になりました。
これも使う上で何かが変わるというわけではありませんが、
Mavenになったことによって、JavaをやっているエンジニアがStormのソースを修正してビルドを行う・・・
ということがやりやすくなったとは思います。
ただ、Stormのコア部分がClojureで記述されていることには変わりありませんので、あしからず。

Mavenリポジトリ上のGroupIdの修正

今回のリリースによって、GroupIdが「storm」 > 「org.apache.storm」に変更になっています。
バージョンアップを行う時には気をつけましょう。

2. 機能追加/不具合対応によるリリース内容

通信モジュールのデフォルトがZeroMQからNettyに変更

今回のバージョンから通信モジュールのデフォルトがZeroMQからNettyに変更されました。
Making Storm fly with Netty | Yahoo Engineeringを見るに、パフォーマンス自体もNettyの方が高いようです。
加えて、依存性も少なくなるため、デフォルトになったのは納得がいく結果ではありますね。

Storm-UIの各項目にツールチップで解説を表示

Storm-UIで表示される各種項目に対してマウスオーバーした際にツールチップで解説が表示されるようになりました。
Storm-UIの動きはこれまでは実際にしばらく使ってみないといまいちわからないことも多かったため、
これは有難い機能追加ですね。

NimbusにTopologyをSubmitする際、設定に対するバリデーションが追記

NimbusにTopologyをSubmitする際に設定値に対する定型的なバリデーションが行われるようになりました。
TopologyにSubmitする際にはじかれない場合、クラスタ上でTopologyが走り出してからWorkerプロセスが
エラーで落ちて、復活してを繰り返す・・・ということが発生するため、意図しない動作を防止してくれますね。

「storm jar」コマンドによるTopology起動がWindows上で動作しない問題に対応

Stormは0.9.0の時点でZeroMQが必須では無くなったことによってWindows上でも動作するようになっていました。
ですが、一部のコマンドが動作しない個所があったため、問題が対応され、Windows上でもStormのフル機能が使用できるようになりました。
主流では無いとは思いますが、Windows上でもそれなりに使われているようですね。

3.今回のリリースのまとめ

今回のリリースはApacheに移行したことによるメンテナンスと、後は使い勝手に関わる箇所のリリースで、
何か大きな新機能が追加されたということはありませんでした。

ですが、今回のリリースでApacheへの移行が正式に完了したということにはなります。
以後はApacheとしてのリリースが続いていくはずですので、この先に期待、ですね。
それでは。
f:id:acro-engineer:20140305072059j:plain

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Storm上で動作するオンライン機械学習ライブラリをOSSとして公開しました!

id:KenichiroMurata (@muraken720)です。

昨日、AcroquestはStorm上で動作するオンライン機械学習ライブラリをOSSとして公開しました!
ここでも簡単に紹介させて頂きます。

AcroMUSASHI Stream-ML(Machine Learning Library)

「AcroMUSASHI Stream-ML」はStorm上で動作し、連続的に発生し続けるストリームデータに対して、動的に学習データを更新しながら、リアルタイムの分析をすることができるライブラリです。

https://github.com/acromusashi/acromusashi-stream-ml

Storm をベースとした、ストリームデータの分散処理プラットフォームである「AcroMUSASHI Stream」と組み合わせることで、「データの収集~分析~結果の出力」までをシームレスに結合することができ、機械学習を行うためのシステム開発を迅速に行うことができます。

現在は以下のアルゴリズムが利用できます。

今後もニーズに合わせて、順次対応を広げていく予定です。
ぜひ、興味を持たれた方はご覧になってください。

参考


f:id:acro-engineer:20131211084154j:plain:w200

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Storm関連本とオススメ電子書籍

新年あけましておめでとうございます。ツカノ(@)です。
今年最初のT3ブログですね。よろしくお願いします。

私はリアルタイム分散処理の仕事に携わっているため、昨年、こんな話題に興味を惹かれました。

  • CDHによるSparkのサポート
  • AmazonからKinesisが登場
  • ImpalaPrestoといった分散クエリの盛り上がり

こういった傾向から、リアルタイム分散処理に関する機運の高まりを感じます。この調子だと、2014年はリアルタイム分散処理でアドベントカレンダーを立ち上げられるくらいの年になるのではと、期待しています。

さて、リアルタイム分散処理を行うプロダクトはSparkSplunkStorm等、様々あります。中でもStormは、このブログで特に取り上げてきました。そこで、今回はStorm関連本についてまとめてみました。
あっ、私が知らないだけで、他にもオススメの本があれば是非教えてください^^

Getting Started With Storm

Getting Started With Storm

  • 作者: Jonathan Leibiusky,Gabriel Eisbruch,Dario Simonassi
  • 出版社/メーカー: Oreilly & Associates Inc
  • 発売日: 2012/09/17
  • メディア: ペーパーバック
  • クリック: 1回
  • この商品を含むブログを見る
92ページとコンパクトですが、Topology、Spout、Boltといった概念を図やコード付きで一通り説明しています。また、後半では、Node.jsやRedisと連携してECサイトを構築する例が載っています。ただし、Stormは0.7.1を使っていて、現在ではAPIが変更されている箇所があるため注意が必要です。

f:id:acro-engineer:20140109230121g:plain
「Getting Started with Storm」の日本語訳です。Ebookのみ販売されています。

Storm Real-time Processing Cookbook

Storm Real-time Processing Cookbook

ログファイルを集めてStormで処理した結果をElasticsearch+Kibanaで処理したり、機械学習したり、AWSに載せたり、様々なことにチャレンジしています。
この本を出版しているPackt Publishingは新しい技術の電子書籍を素早く、積極的に出版しています。また、年末に電子書籍の一律$5セールを行ったり、随分お世話になっています。書籍の種類も多いため、気になる方はPackt Publishingのサイトをチェックしてみてはいかがでしょうか。

f:id:acro-engineer:20131212194541j:plain
Manning PublicationsMEAP(Manning Early Access Program)です。正式出版は先ですが、書いたところから公開されています。2014年夏に正式出版予定です。まだ書かれていませんが、目次を見るとチューニングトラブルシュートの章があるので、期待しています。
Manning Publicationsは「~~ in Action」シリーズを出版している会社で、中でもMEAPは新しい技術についてすぐに読むことができるので、素敵な企画だと思います。また、一年中、日替わりで電子書籍の半額セールを行っているので、欲しい本がある人は半額になるタイミングを待つのがオススメです!

f:id:acro-engineer:20140110001915j:plain
これもMEAPです。Stormの作者であるNathan Marzさんが著者のひとりです。2014年3月に正式出版予定です。twitterハッシュタグをリアリタイム分析するために生まれた「ラムダ・アーキテクチャ」(このアーキテクチャにStormが組み込まれています)の説明などがあり、Stormに限らず、ビッグデータ周りのアーキテクチャおさえる上でも良いと思います。以下のページでは、ラムダ・アーキテクチャの日本語での解説が読めます。非常にありがたいですね。
  ハッシュタグのリアルタイム分析のためのラムダ・アーキテクチャー

The Storm Book

The Storm Book

書名を見て思わず買ってしまった、その名も「The Storm book」。32ページと手軽なサイズ。また、本全体の半分は絵だけのページとなっており、
小学生でも読めるStorm関連本(^^)となっています。
Amazonのカスタマーレビューでは★★★★★のコメントがいくつも寄せられているので、小さなお子さんがいる方は、Stormの英才教育をすべく、寝る前に読み聞かせてみましょう!

……はい、どう見てもただの絵本です。本当にありがとうございました^^


さて、いかがだったでしょうか?
最後の本はともかく、今年は一層盛り上がる(と思っています!)リアルタイム分散処理、中でもStormを学ぶ際に参考になればと思います。

ちなみに、Stormの入門には以下のSlideShareがオススメです^^


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Amazon Kinesis + Storm を連携させて、ストリームデータ処理を行ってみた

こんにちは。kimukimuです。


昨日はクリスマスイブでしたが、皆さんはどのように過ごされたでしょうか?
私はケーキは買う派ではなく、自分で作る派です。……まぁ、食べるのも自分なのですが……

クリスマスはさておき、今日はKinesisとStormに関する内容です。

AWS re:Invent 2013 で 発表されたAmazon Kinesis が12/17にPublic Betaになり一般公開されましたね!
以下のように、KinesisはStormのコネクタを含んでいる、ということなので、さっそく試してみましょう。

Kinesis does include a connector for porting data to Storm, which AWS General Manager
for Data Science Matt Wood said is a possibility in cases where existing Storm users want to keep using it
for processing data while automating the collection with Kinesis.

Amazon’s streaming data service, Kinesis, is now available — Tech News and Analysis

手始めに、Stormと連携し、Kinesisに投入された文章中の単語出現数をリアルタイムに集計するWordCountのサンプルを作成してみました。

今回使用したソースは全てkimutansk/storm-example-wordcount · GitHubにアップしています。

1.Amazon Kinesis とは?(おさらい)

Amazon Kinesisはストリームデータのリアルタイム処理を行うプラットフォームです。
詳細は下記のページなどで紹介されているため、参照してください。

Amazon Kinesisはストリームデータを処理するキューとして下記のような特徴を持っています。
• 取得した時点でキュー上のメッセージは削除されないため、後で繰り返し取得可能
• あるキューに対して複数のProducer(メッセージ生産者)、Consumer(メッセージ消費者)を紐づけることが可能

加えて、AWSのサービスであるためスケール可能です。
大容量、スケール可能な高速なキューをサービスとして利用でき、
かついくらでも他のコンポーネントから独立してスケール可能というのは非常に大きいと思います。

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. KinesisPutterが文章をAmazon Kinesisに投入する。
  2. StormのKinesisSpoutが、Amazon Kinesisに蓄積された文章を取得する。
  3. Storm内で、SplitSentenceが、文章を単語単位に分割する。単語ごとにグルーピングを行い、次のBoltに流す。
  4. Storm内で、WordCountBoltが単語をカウントする。

3.Amazon Kinesisの設定を行ってみる

3-1.Kinesis用のユーザ権限確認

では、まずAmazon Kinesisの設定を行います。
Amazon Kinesisのページから申し込みを行ったうえでAWS Consoleにログインすると
下記のようにKinesisのサービスが表示されるようになります。

加えて、その状態ですとIAMの設定画面からAmazon Kinesis用の権限テンプレートも
追加されていますので、KinesisにフルアクセスできるGroupを作成し、ユーザに割り振っておきます。

KinesisにフルアクセスできるユーザのaccessKeyとsecretKeyを用いてKinesisのアプリケーション開発を行います。

3-2.Kinesis Streamの作成

次にKinesisのStream(1個のデータストリームを示す)の作成を行います。
Kinesisの画面を表示し、Streamを作成します。

Streamの名前とShard(Stream中のデータパーティション。この数でStreamの性能が決まる)の数も入力します。
Shardの数に応じて性能が決まるので、性能の値もきちんと確認しておきましょう。

その上でCreateボタンを押下すると、Streamの初期化が始まります。
Createボタンを押下してしばらくは「CREATING」という初期化中のステータスとなります。

初期化が完了するとステータスが「ACTIVE」となり、使用可能になります。

4.作成したソースコード

では、実際に作成したソースコードを示します。
開発を行う場合にはMavenに以下の定義を追加しておく必要がありますので、その前提で進めます。

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.0.0</version>
</dependency>

4-1.KinesisPutter

Kinesisに文章を投入するプログラムです。
KinesisClientを初期化した後Partition用のキーを指定して投入するだけなので、投入側は単純な構成ですね。

  • KinesisWordPutter.java
/**
 * プログラムエントリポイント<br/>
 * <ul>
 * <li>起動引数:arg[0] KinesisStream名称(例:TestStream)</li>
 * <li>起動引数:arg[1] 投入メッセージ数</li>
 * <li>起動引数:arg[2] 送信メッセージ間隔(ms)</li>
 * </ul>
 * 
 * @param args
 *            起動引数
 * @throws InterruptedException 割り込み発生時
 * @throws UnsupportedEncodingException 文字コード不正時
 */
public static void main(String... args) throws InterruptedException, UnsupportedEncodingException
{
    // KinesisClient初期化
    AmazonKinesisClient client = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());

    String streamName = args[0];
    int putCount = Integer.valueOf(args[1]);
    long interval = Long.valueOf(args[2]);

    Random random = new Random();
    int sentenceNum = SENTENCES.length;

    // 送信メッセージ数の数だけランダムで文章一覧から取得し、Kinesisに送信
    for (int count = 0; count < putCount; count++)
    {
        int sentenceIndex = random.nextInt(sentenceNum);
        String putSentence = SENTENCES[sentenceIndex];
        PutRecordRequest putRecordRequest = new PutRecordRequest();
        putRecordRequest.setStreamName(streamName);
        putRecordRequest.setData(ByteBuffer.wrap(String.format(putSentence).getBytes("UTF-8")));
        putRecordRequest.setPartitionKey(String.format("partitionKey-%d", count));
        PutRecordResult putRecordResult = client.putRecord(putRecordRequest);
        System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey() + ", ShardID : " + putRecordResult.getShardId());

        TimeUnit.MILLISECONDS.sleep(interval); 
    }
}

4-2.KinesisSpout

Kinesisから文章を取得するStormSpoutです。
こちらは多少複雑で、初めにStream/Shardの情報を取得した上でShardのIteratorを取得し、
Iteratorを用いて文章を取得する形になります。

また、Kinesisは先ほどのコンソールでわかったかと思いましたがReadのリクエスト処理数が少ない(秒間5リクエスト)ため、
1リクエストでまとめてデータを取得し、処理する必要が出てきます。
その関係上、Stormの処理モデル(nextTupleメソッド1回呼び出しあたり1Tuple1Emit)に合わせるため、
Kinesisから取得したレコードを一度リストに蓄積しておき、リストから1Tupleずつ取り出し、Boltに流す構成になっています。

  • KinesisWordPutter.java
/**
 * {@inheritDoc}
 */
@SuppressWarnings({ "rawtypes" })
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector)
{
    this.collector = collector;
    this.taskIndex = context.getThisTaskIndex();

    this.kinesisClient = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider());

    DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest();
    describeStreamRequest.setStreamName(this.streamName);
    describeStreamRequest.setLimit(this.maxShardCount);
    DescribeStreamResult describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest);
    this.shards = describeStreamResult.getStreamDescription().getShards();

    GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
    getShardIteratorRequest.setStreamName(this.streamName);
    getShardIteratorRequest.setShardId(shards.get(this.taskIndex).getShardId());
    getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

    GetShardIteratorResult getShardIteratorResult = this.kinesisClient.getShardIterator(getShardIteratorRequest);
    this.shardIterator = getShardIteratorResult.getShardIterator();

    this.decoder = Charset.forName("UTF-8").newDecoder();
    this.gettedRecords = Lists.newArrayList();
}

/**
 * {@inheritDoc}
 */
@Override
public void nextTuple()
{
    if (this.gettedRecords.isEmpty())
    {
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(this.shardIterator);
        getRecordsRequest.setLimit(this.maxGetRecordNum);

        GetRecordsResult getRecordsResult = this.kinesisClient.getRecords(getRecordsRequest);
        this.gettedRecords.addAll(getRecordsResult.getRecords());
    }

    if (this.gettedRecords.isEmpty())
    {
        return;
    }

    Record emitRecord = this.gettedRecords.remove(0);
    this.sequence = emitRecord.getSequenceNumber();

    String sentence = null;

    try
    {
        sentence = this.decoder.decode(emitRecord.getData()).toString();
    }
    catch (CharacterCodingException ex)
    {
        throw new RuntimeException(ex);
    }

    this.collector.emit(new Values(sentence), emitRecord.getSequenceNumber());
}

5.実際に動かしてみる

5-1.KinesisPutter

KinesisPutterを動かすと、下記のようにPutしたメッセージが表示されます。
PartitionKeyに合わせてShardIdが分散される構成になっていますね。

(省略)
Successfully putrecord, partition key : partitionKey-1299, ShardID : shardId-000000000001
Successfully putrecord, partition key : partitionKey-1300, ShardID : shardId-000000000000
Successfully putrecord, partition key : partitionKey-1301, ShardID : shardId-000000000001
Successfully putrecord, partition key : partitionKey-1302, ShardID : shardId-000000000001
(省略)

5-2.WordCountTopology

TopologyのDEBUGモードをONにし、Tupleの内容を確認すると単語がカウントされていることがわかります。

(省略)
[Thread-25-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [the, 1280]
[Thread-33-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [inch, 800]
[Thread-25-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [square, 533]
[Thread-33-WordCount] INFO  backtype.storm.daemon.task - Emitting: WordCount default [inch, 801]
(省略)

ということで、KinesisとStormの連携が実現できました。

…ん?Stormのコネクタってどこ?

KinesisSDKには含まれていないのでしょうかね?

amazon-kinesis-clientやawslabsのサンプルを見ましたが、Stormと関連するようなコンポーネントは見つからず…
まぁ、ひとまず連携はできたので、今回はOKとしましょう。

6.KinesisとStormを連携してみた感想

実際にKinesisとStormを連携させた結果は、以下のような感じでした。

  • 基本的な動作は非常に簡単に実現可能
    • 単に1Shardを保持するStreamを作成し、Spoutで取得して処理するだけであれば、ドキュメント&サンプルが充実していることもあり、非常に簡単に実現可能です。
    • ドキュメントのコードをコピーして微修正するだけで実現できます。
  • 異常系処理等がクラウドによって隠蔽されているため、シンプル
    • この手の分散システムを実装する際には必要となる、構成変更時や異常系のハンドリングがクラウドというレイヤを通すことで隠蔽されているため、シンプルに実装出来ます。


なお、KinesisとStormの役割の違いは以下のようになると思います。

  • Kinesis
    • 分散してデータを収集したり、分配したりする。
  • Storm
    • ちょっと複雑な演算処理やストリーム処理とバッチ処理の統合を行ったりする。加えて失敗検知、再送管理を行う。


もう年内も残すところ約1週間ですね。
皆さん、残りの仕事を片付けて、すっきりと来年を迎えられるようにしましょう!(はい、自分自身がガンバリマス)

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Kafka+Storm+Elasticsearch+Kibanaでストリームデータ処理の可視化を行ってみた

こんにちは。kimukimuです。


AWS re:Invent 2013Amazon Kinesis が発表されるなど、
ストリームデータ処理に対するニーズの高まりを感じますね。
(Amazon Kinesis は、Stormとも連携できるようになっているようです)。

さて、先日、Storm 0.9.0 が正式リリースされたり、Apache Kafka 0.8.0 が正式リリースされたりしたので、
それらを連携して、ストリームデータの可視化を行うプロトタイプを作ってみました。

1. はじめに

まず、「ストリームデータ」とは、連続的に発生し続けるデータのことを指します。
システムが出力するログやセンサーが発生するデータ、SNSなどで常時発生するメッセージなどが該当します。
今回は、Apacheが出力するログを、ストリームデータとして収集・可視化することを行ってみます。

1-1.やりたいこと

実現したい内容は、以下のような内容です。

  • ログをリアルタイムに収集する。
  • ログの出力状況をリアルタイムにブラウザで表示させる。
  • スケールアウトを考慮して、分散処理を行う。

1-2.利用するもの

今回利用したプロダクトは以下の通りです。

尚、Stormのインストーラは下記の場所で公開&随時更新していますので、お使いください。
acromusashi/storm-installer · GitHub

2.どんな構成になるのか?

2-1.構成図

今回作った構成は下記のようになります。

2-2.各要素の役割

実際の処理の流れとしては下記のようになります。

  1. Kafkaが各サーバ上でログを収集する。
  2. StormのKafkaSpoutが、Kafkaに蓄積されたログを取得する。
  3. Storm内で、ElasticsearchBoltが、分散して、Elasticsearchにログを投入する。
  4. Kibana3がElasticsearchに投入されたログの統計情報を表示する。

ソースについてはElasticsearchBoltの抜粋部のみこの記事に記載しますが、
整理が完了したら後程公開しますのでお楽しみに。

ElasticsearchBoltは下記のように実装しています。
ここでのclientはElasticsearchのクライアントインスタンス、converterはTupleから投入するデータを生成するコンバータです。

/**
 * {@inheritDoc}
 */
@Override
public void execute(Tuple input)
{
    String documentId = null;
    String indexName = null;
    String typeName = null;
    String document = null;

    try
    {
        documentId = this.converter.convertToId(input);
        indexName = this.converter.convertToIndex(input);
        typeName = this.converter.convertToType(input);
        document = this.converter.convertToDocument(input);

        IndexResponse response = this.client.prepareIndex(indexName, typeName, documentId).setSource(
                document).setPercolate(this.percolate).execute().actionGet();

        if (logger.isDebugEnabled() == true)
        {
            String logFormat = "Document Indexed. Id={0}, Type={1}, Index={2}, Version={3}";
            logger.debug(MessageFormat.format(logFormat, response.getId(), typeName, indexName,
                    response.getVersion()));
        }
    }
    catch (Exception ex)
    {
        String logFormat = "Document Index failed. Dispose Tuple. Id={0}, Type={1}, Index={2}";
        logger.warn(MessageFormat.format(logFormat, documentId, typeName, indexName), ex);
    }

    getCollector().ack(input);
}

3.実際に動かしてみる

では、実際にログを流して結果をKibana 3で確認してみます。

すると・・・?
下記のような形で簡単な統計情報を表示することができました。
HTTPリクエストのレスポンスタイム平均値、リクエスト回数、アクセス元ホスト、ステータスコードといった
基本的な統計が表示できることが確認できました。
実際の画面上では、随時グラフが更新されていくので、どのような動作になっているのかが、リアルタイムにわかります。

4.何が良いのか?

今回のプロトタイプはつまりは
「ストリームデータを収集し、Stormで処理/変換を行ってElasticsearchに投入、Kibana 3で統計情報を可視化」
のプロトタイプ・・・という形になります。

このプロトタイプを応用することで、以下のようなことが実現できると考えています。

  • (異常検知)ログやイベントをリアルタイムに収集し、サーバ動作やユーザアクセスの異常検知などを行い、可視化する。
  • (M2M)センサーデータを受信し、センサーデータの統計処理を行い、可視化する。
  • (評判分析)SNSのメッセージ内容を解析し、その内容をクラスタリングし、可視化する。

色々夢は広がりますが、とりあえず今回はこのあたりで。


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ