Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache Storm 1.0.0の機能を使ってみる

お久しぶりです@ です。

このブログへの登場はかなり久しぶりです。昨年10月にミャンマーから日本に帰ってきて、今は、IoTやら可視化などに関する仕事をしています

さて、TwitterよりStormが公開されて以降、分散ストリーム処理フレームワークも、FlinkSpark StreamingSamzaBeamGearpumpSensorBee等、さまざまなOSSプロダクトが公開されました。

世はまさに「大ストリーム時代」!?(ワンピース風)

そのような中、4/12にApache Storm から正式メジャーバージョンとなる、1.0.0がリリースされました。このタイミングでどのような機能が盛り込まれるのか、興味を持っていましたが、これまでの課題を解消しつつ、他プロダクトよりも一歩先に行くような内容もリリースされました。

大きな変更点は12個

以下の公式サイトでも公表されていますが、メインとなる変更点は12個のようです。
Storm 1.0.0 released

No タイトル 内容
1 Improved Performance 16倍の処理速度向上と、60%のレイテンシの減少に成功しました。
2 Pacemaker - Heartbeat Server Stormがスケールアップするにつれて生じていたZookeeperのパフォーマンスボトルネックの解消のため、インメモリのKey-Valueストアとして機能するオプションのStormデーモンPacemakerが追加されました。
3 Distributed Cache API Topology毎に共有できるデータストア空間を用意し、Blobで共有データが保持、参照できるようになりました。
4 HA Nimbus Distributed Cache APIの機能を利用することで、高可用性を備えたNimbusが実現可能になりました。
5 Native Streaming Window API Storm Native な Window APIが用意され、いわゆるCEP処理を実装しやすくなりました。
6 State Management - Stateful Bolts with Automatic Checkpointing 自動チェック機構を備えたステートフルなBoltの利用により、状態管理が可能になりました。
7 Automatic Backpressure 上限値、下限値の設定による自動バックプレッシャーが可能になりました。
8 Resource Aware Scheduler Topology毎のリソース(メモリ/CPU)を考慮したタスクスケジューラが実現可能になりました。
9 Dynamic Log Levels Storm UIから動的に出力ログレベルの変更が可能になりました。
10 Tuple Sampling and Debugging Storm UI上でTupleのサンプリングとデバッグが可能になりました。
11 Distributed Log Search Worker毎に分散されてしまうログの検索がStorm UI上で実施可能になりました。
12 Dynamic Worker Profiling WorkerプロセスのプロファイリングがStorm UI上で可能になりました。


今回は上記の変更点の中から、特に面白いだろうと思われる以下の3点を調べてみました。他のものは、なんとなくタイトルから想像できますよね。

  1. Distributed Cache API
  2. Native Streaming Window API
  3. Automatic Backpressure

Distributed Cache APIを使ってみる

前バージョンでは、デプロイしたTopology上で何かファイルのデータなどを使いたい場合、Topologyと一緒にデプロイする必要がありました。そのため、大きなデータをTopology起動後に利用したい場合は、デプロイそのものに時間がかかることがありました。
また、各サーバに共有データを置いたり、データ共有のためにKVSなどのStormとは別のプロダクトを利用するのは、実現したいことに対して重く感じます。
しかし今回のバージョンアップで、Topology上で使いたいファイルを、Stormが持っているデータストアに保持し、Topologyからそのデータを参照することが可能になりました。共有データ保存場所が存在し、そこにデータを配置することでデプロイ時間の削減を可能にした機能です。共有データのサイズが大きければ大きいほど、その恩恵を受けることができます。本家サイトでは「位置情報」や「辞書データ」を保持するとよい、と言われています。

Distributed Cache APIの仕組み

Stormのサイトに素敵な解説図があるので、転載させてもらいます。BlobStoreというインタフェースがあり、このインタフェースを実装したLocalFsBlobStoreとHdfsBlobStoreが提供されています。どちらのStore実装も処理の流れはほぼ同じです。仕組みとしては、Supervisor起動時にBlobStoreのMapを取得し、その後MapにしたがってMap情報(共有データ)を取得する流れのようです。

[LocalFsBlobStore]
f:id:acro-engineer:20160425224701p:plain

[HdfsBlobStore]
f:id:acro-engineer:20160425224721p:plain

使ってみる

早速使ってみます。

  1. 共有データの登録
  2. Topologyの起動

が手順になります。確認のため、Topologyは2つ動作させます。

共有データの登録

README.markdownの登録をします。

# ./bin/storm blobstore create --file README.markdown --acl o::rwa --replication-factor 4 key1

共有用のデータは「storm.local.dir/storm-local/blobs/」に配置されていました。

Topologyの起動

Topologyを2つ起動し、どちらも登録したREADME.markdownをダウンロードすることをログから確認したいと思います。本来はTopologyの中で利用されているところを確認したいのですが、サンプルに適当なものがなかったため、ひとまず起動時に登録した共有データが読み込まれることを確認したいと思います。

# ./bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.clj.word_count test_topo -c topology.blobstore.map='{"key1":{"localname":"blob_file", "uncompress":"false"}}'

test_repoというTopologyを作成し、key1というキーに対して登録したBlobファイルの中身を解凍オプションなしで参照、実行しています。Blobファイルの読み込みに成功すると、以下のようなログが確認できるはずです。

2016-04-23 14:48:05.782 o.a.s.d.supervisor [INFO] Downloading code for storm id test_topo-4-1461390482
2016-04-23 14:48:06.279 o.a.s.d.supervisor [INFO] Successfully downloaded blob resources for storm-id test_topo-4-1461390482
2016-04-23 14:48:06.280 o.a.s.d.supervisor [INFO] Finished downloading code for storm id test_topo-4-1461390482
:
2016-04-23 14:48:06.285 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 to its port artifacts directory
2016-04-23 14:48:06.286 o.a.s.d.supervisor [INFO] Creating symlinks for worker-id: 6d23e0f9-9aa1-43c6-a475-773b0537bdfb storm-id: test_topo-4-1461390482 for files(2): ("resources" "blob_file")

同じ操作で別名のTopologyを作成してみてください。上記と同じデータダウンロードが正常に完了するログが確認できるはずです。これでDistributed Cache APIを試すことができました。

Native Streaming Window APIを使ってみる

Stormのネイティブな機能として、スライディングウィンドウが追加されました。どこまでの内容までがStormネイティブとして対応しているのか、サンプルをベースに確認してみたいと思います。まずはメインパートであるSlidingWindowTopology.javaのソースを確認してみます。

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("integer", new RandomIntegerSpout(), 1);
        builder.setBolt("slidingsum", new SlidingWindowSumBolt().withWindow(new Count(30), new Count(10)), 1)
                .shuffleGrouping("integer");
        builder.setBolt("tumblingavg", new TumblingWindowAvgBolt().withTumblingWindow(new Count(3)), 1)
                .shuffleGrouping("slidingsum");
        builder.setBolt("printer", new PrinterBolt(), 1).shuffleGrouping("tumblingavg");
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(1);
            StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
        } else {
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("test", conf, builder.createTopology());
            Utils.sleep(40000);
            cluster.killTopology("test");
            cluster.shutdown();
        }
    }

これだけ見ても、以下の3つのBoltが存在します。

  1. SlidingWindowSumBolt
  2. TumblingWindowAvgBolt
  3. PrinterBolt

これらが何をしているのか、またどのように動くのかを確認したいと思います。

[SlidingWindowSumBolt]
とても単純に、受信したTupleの中身を加算していることがわかります。一応ウィンドウから外れたTupleの値は減算するようにも記述されているので、スライディングウィンドウの条件を満たしていることも確認できます。

    @Override
    public void execute(TupleWindow inputWindow) {
            /*
             * The inputWindow gives a view of
             * (a) all the events in the window
             * (b) events that expired since last activation of the window
             * (c) events that newly arrived since last activation of the window
             */
        List<Tuple> tuplesInWindow = inputWindow.get();
        List<Tuple> newTuples = inputWindow.getNew();
        List<Tuple> expiredTuples = inputWindow.getExpired();

        LOG.debug("Events in current window: " + tuplesInWindow.size());
            /*
             * Instead of iterating over all the tuples in the window to compute
             * the sum, the values for the new events are added and old events are
             * subtracted. Similar optimizations might be possible in other
             * windowing computations.
             */
        for (Tuple tuple : newTuples) {
            sum += (int) tuple.getValue(0);
        }
        for (Tuple tuple : expiredTuples) {
            sum -= (int) tuple.getValue(0);
        }
        collector.emit(new Values(sum));
    }

[TumblingWindowAvgBolt]
設定したWindowのサイズで合計値を除算している、シンプルなつくりでした。SlidingWindowTopologyに内包されていますね。「いくつ溜まったら平均値を計算する」という引数には「3」が設定されています。

        @Override
        public void execute(TupleWindow inputWindow) {
            int sum = 0;
            List<Tuple> tuplesInWindow = inputWindow.get();
            LOG.debug("Events in current window: " + tuplesInWindow.size());
            if (tuplesInWindow.size() > 0) {
                /*
                * Since this is a tumbling window calculation,
                * we use all the tuples in the window to compute the avg.
                */
                for (Tuple tuple : tuplesInWindow) {
                    sum += (int) tuple.getValue(0);
                }
                collector.emit(new Values(sum / tuplesInWindow.size()));
            }
        }

[PrinterBolt]
驚くほどシンプルですね。出力するだけ。

  @Override
  public void execute(Tuple tuple, BasicOutputCollector collector) {
    System.out.println(tuple);
  }

まとめると、以下の通りに動くと予想できます。

  1. ランダムに0~999の整数を、合計用のBoltに送付する。
  2. 合計用Boltはデータを受信時、メモリに保持しているSum値に加算していく。
  3. 受信したTuple数が10になったところで、平均計算用Boltに合計値を送付する。
  4. 1~3を、平均計算用Boltの保持Tuple数が3になったら、平均値を計算する。
  5. 1~4を40秒間繰り返す。


これらを基に、動かした際のログを見てみましょう。ソースコードの通り、起動後に40秒で終了するようなので、以下のコマンドを実行してしばらく待ってみます。

# bin/storm jar examples/storm-starter/storm-starter-topologies-1.0.0.jar org.apache.storm.starter.SlidingWindowTopology

[合計用Boltに対する出力ログ]
以下のような感じで合計用Boltにはログが出力されていました。

17562 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {4017617819859316672=-3880300761259985782}, [899, 1461542140369, 1]
17562 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {4017617819859316672=-3880300761259985782}, [899, 1461542140369, 1] TASK: 4 DELTA: 
17674 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Processing received message FOR 4 TUPLE: source: integer:2, stream: default, id: {-2465951973599725012=3371764614267379312}, [888, 1461542140475, 2]
17674 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - Execute done TUPLE source: integer:2, stream: default, id: {-2465951973599725012=3371764614267379312}, [888, 1461542140475, 2] TASK: 4 DELTA: 

で、10個たまったところで平均計算用Boltに送付しています。

18516 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [6053]
18516 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-1356407922762824927=-4912942846543592390, 4017617819859316672=-8950777703384980140, -7679610850762262585=5600559632140381686, -217168626838496871=6670643717321357413, -8433729321932816312=-1512990481045386819, -695350376461229364=-6915299522591467528, -8604773776820158944=2085240823323939478, 4818452273885227082=1055563511177261421, -4383830359476279213=-2430226558792731842, -2465951973599725012=-3111554498250395772}, [6053]]
:
:
19527 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [11182]
19527 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {4017617819859316672=1956648145851480265, -7679610850762262585=-8990022321548325348, -217168626838496871=8356349476653175499, 6556174365450512594=-2956830898901769282, 4973296703617984132=630324356173502412, -8433729321932816312=7530781138220324522, 8041484834072108391=-1100584463729972475, -695350376461229364=5513770714708145606, -8604773776820158944=-7212706120285088590, -4383830359476279213=5439461521018447939, -1641897178290464600=-510250118691366334, 6730277299577429107=6208397095766677293, -8115189405407159227=1214364586718890587, -1356407922762824927=3843071132908231388, 7588127658633797238=-3035483582895424875, -1600730095770316997=7644364465767360178, -5977653414665598802=6443496393438179244, -4289645355525492039=-7771435519918529374, 4818452273885227082=2145248154554053428, -2465951973599725012=3719500247592761581}, [11182]]
:
:
20536 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.task - Emitting: slidingsum default [16102]
20536 [Thread-25-slidingsum-executor[4 4]] INFO  o.a.s.d.executor - TRANSFERING tuple [dest: 5 tuple: source: slidingsum:4, stream: default, id: {-217168626838496871=-2098183848111262192, 6694109964234120893=-2919364166006116209, -490688806946141211=-115866302962242997, 6556174365450512594=3417840899423850921, 4973296703617984132=-2075234239029720284, -8433729321932816312=3438734146522000751, -695350376461229364=5479035516364205453, -1356407922762824927=536244103058094589, -8759945507516006916=-7691606294364721504, -1600730095770316997=5198643672682538868, 1928584781574233931=3233801634595403530, 4818452273885227082=3613752122075445564, 4017617819859316672=-5965638492780984127, -7679610850762262585=8746099621132998817, 3628468721856542322=3234989506915660599, 8041484834072108391=2473403470958482418, -8604773776820158944=4291163489101357389, 5275805877791886609=2224008364377626542, -4383830359476279213=-1613810029185700041, -1641897178290464600=-7937957462653577021, 6730277299577429107=-5906850224979170611, -8115189405407159227=-6807612857900762546, -492827708169915833=-3985992390535713144, 7588127658633797238=7922452426043726560, -3771417496478433111=-1133769369307004904, -5977653414665598802=2226449212304201933, -4289645355525492039=8279576780572003648, 2457695339746807997=-943386263403830945, 7278045793299574088=-3628544844039353718, -2465951973599725012=5019064850597613642}, [16102]]

最後に、3個合計Tupleが溜まったところで平均値を計算して出力しています。

20540 [Thread-23-tumblingavg-executor[5 5]] INFO  o.a.s.d.task - Emitting: tumblingavg default [11112]

この後PrinterBoltにデータ送付しているのですが、PrinterBoltは非常にシンプルなので、ここで触れるのは割愛したいと思います。
さて、上記のような感じでスライディングウィンドウも使えました。設定した閾値に伴い指定された動作をしてくれるので、簡単な仕組みであれば、Stormだけで動作させられそうです。
具体的な関数の整備はこれからのようですが、SlidingWindowSumBoltやTumblingWindowAvgBoltを見ればわかるように、「BaseWindowedBoltを実装してexecuteで計算させる」というシンプルでオーソドックスなAPIになっています。EsperWSO2 CEP(Siddhi)といったCEPプロダクトの関数を実装したことがある人なら、難なく実装できると思います。

Automatic Backpressure

個人的に一番気になっているところです。そもそもBackpressureって何ぞや、というところですが。

Backpressureとは何ぞや?

もともとは半二重接続のハブやスイッチで用いられるフロー制御方式の一つです。機器内の通信バッファがあふれてフローが止まってしまう前に、データ送付側に通知を投げて、送ってくるデータを止めたり、量を調整したりする仕組みのことを言います。

なぜBackpressureがStormに必要か?

Stormがリリースされた際にずっと言われていた問題点として、「Spoutの処理性能がBoltの処理性能を上回っている場合、キューに処理が溜まり続けてTopologyが止まってしまう/遅くなってしまう」という考慮すべき点がありました。Storm自体の処理性能は良いのですが、「データストア用のプロダクトに書き込むBoltの性能が上がらず、キューに溜まる」という事象は「Stormあるある」と言ってよいくらい見かけます。
そのため、Stormの環境を構築する際には、SpoutとBoltの処理性能に気を付ける必要があったわけです。

しかし、今回のBackpressure機構を利用すれば、この問題点を緩和させることが可能になります。今回のBackpressureはTopology単位で以下の設定が可能です。

  1. high-watermarkとlow-watermarkの指定が可能。
  2. キュー内のメッセージ量がhigh-watermarkで指定した比率を上回ったら、Backpressure機能が発生し、Spoutの処理を自動的に遅くする。
  3. キュー内のメッセージ量がlow-watermarkで指定した比率を下回ったら、通常のSpoutの処理に戻る。

こちらに検討中のBackpressureの図が載っているのですが、閾値を検知した時点でZookeeperに通知を飛ばし、Spoutの処理を抑えるような制御をするようです。ただし下記の図は検討中なので、ここからおそらく何らかの変更が加わっているとは思います。公式の発表待ちですね。
https://github.com/apache/storm/pull/700

ただし、解決するパターンと解決できないパターンがきちんと言及されています。

  • 解決するケース:Boltの処理が遅い
  • 解決できないケース:外部システムにアクセスするBoltで、外部システムが止まった場合(「遅い」ではなく、そもそも「処理できない」。こういうケースは、HystrixのようなCircuit Breakerが欲しくなりますね、とStormのissueでも話が出ているようです)

Backpressure利用のための設定値

Automatic Backpressureに関する具体的な設定値は以下で指定可能です。

topology.backpressure.enable: true
backpressure.disruptor.high.watermark: 0.9
backpressure.disruptor.low.watermark: 0.4

まとめ

いくつか特徴的なStormの変更点を確認してきましたが、DevOps・運用面にかなり注目が集まっている時代の中で、Stormもついにそちらに目を向け始めたように見えました。ますます便利になっていくので、目が離せません!


ストリーム王に俺はなる!


Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、AWSHadoop、Storm、NoSQL、Elasticsearch、SpringBoot、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 
データ分析で国内に新規市場を生み出す新サービス開発者WANTED! - Acroquest Technology株式会社の新卒・インターンシップ - Wantedlywww.wantedly.com