読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormって条件でBoltを分岐させることってできるの?

Storm

こんにちは。kimukimuです。

Stormの次バージョン(0.8.2)の開発を楽しみに待っていますが、
いつの間にか0.8.2の開発版のビルド番号が16まで行っていました(ーー;

0.8.2はGUIもパワーアップしているので楽しみです。
・・・そんなわけで、どこかこのあたりで一度リリースしてくれることを期待する今日この頃です^^;

とまぁ、それはさておき、今日のお題です。

1.StormのWikiに書いてありそうで書いていない肝心なこと

これまで何回かStormについて投稿してきましたが、
肝心なことを書いていなかったことに気付きました。

それは・・・
「Stormって条件でTupleを送信するBoltを分岐できるの?」
です。

StormのWikiを見てみても、下のようなSpoutとBoltの接続図は書いてありますが、
条件に関する記述はありません。

下の図からわかることは下記の2点であり、分岐できるか否かについてはわからないんですよね。

1.Spoutは複数種類のBoltにTupleを送信できる(Boltもできそう)
2.Boltは複数のSpout/BoltからTupleを受信することができる

もし、分岐可能なのであれば1つのTopologyで複数のイベント種別をハンドリング可能となります。
1か所で一気に複数のイベントを受信して、対応するBoltに分配して処理・・・ができるわけですね。

各々別のTopologyを定義すればいいじゃないか、という突っ込みはあります。
ですが、その場合1つのメッセージ取得元に複数のイベントが混ざるケースであれば
事前に分ける必要があるなどして、それはそれで面倒ですので。

というわけで、
下記のようなBoltの条件分岐が可能なのか、
実際のコードを元に試してみます!

2.実際に分岐が出来るのかやってみる

結論がわかっていないとこの先進みにくいため書いてしまいますが、
実際に分岐させることは可能でした。

実現方法は一言で言うと、
「Boltから条件に応じて異なるStreamにTupleを流す」です。

StormではStreamというTupleフローを定義し、
TupleフローをBoltが読みこむことでBoltがTupleを受信します。

Streamは単なるフローのため、Stream上で処理を分岐させることはできません。
ならば、Streamを複数作ったら分岐もできるだろう・・・ということで、
Bolt側で複数のStreamを定義し、イベントの送付分けを行う形で分岐が可能でした。

実際のコードの一部と、実行結果を示しますね。

■JudgeBoltの分岐処理
@Override
public void execute(Tuple tuple)
{
    String word = tuple.getString(0);

    if (word.length() <= 5)
    {
        System.out.println("JudgeBolt_" + this.index_ + "ToShortWord:"
                + word);
        // 短い単語用のStreamに流す
        this.collector_.emit("ShortWord", new Values(word));
    }
    else
    {
        System.out.println("JudgeBolt_" + this.index_ + "ToLongWord:"
                + word);
        // 長い単語用のStreamに流す
        this.collector_.emit("LongWord", new Values(word));
    }

    this.collector_.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    // 分岐用の2個のStremを定義
    declarer.declareStream("ShortWord", new Fields("word"));
    declarer.declareStream("LongWord", new Fields("word"));
}
■Topology構築時の定義
builder.setSpout("WordSpout", new TestWordSpout(), 2);
builder.setBolt("JudgeBolt", new JudgeBolt(), 5).fieldsGrouping(
        "WordSpout", new Fields("word"));

// ShortWordのStreamを読み込むよう定義
builder.setBolt("ShortWord", new ShortWordBolt(), 5).fieldsGrouping(
        "JudgeBolt", "ShortWord", new Fields("word"));

// LongWordのStreamを読み込むよう定義
builder.setBolt("LongWord", new LongWordBolt(), 5).fieldsGrouping(
        "JudgeBolt", "LongWord", new Fields("word"));
■実行結果(JudgeBolt)
JudgeBolt_3ToShortWord:mike
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_4ToLongWord:nathan
JudgeBolt_3ToLongWord:jackson
JudgeBolt_0ToShortWord:golda
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_0ToShortWord:golda
JudgeBolt_3ToLongWord:jackson
■実行結果(ShortWordBolt)
ShortWordBolt_3:mike
ShortWordBolt_0:golda
ShortWordBolt_0:golda
■実行結果(LongWordBolt)
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_4:nathan
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:jackson

きちんとShort側では短い単語だけ割り振られ、
Long側には長い単語だけ割り振られていますね。
かつ、同じ単語は同じBoltで処理されているため、グルーピングも適用されているようです。

というわけで、
下記の2点ができることがわかりました。

1.Tupleの内容に応じて次のBoltを分岐させることが可能なこと
2.Boltを分岐させた場合でもFieldGroupingは適用可能なこと

尚、全コードについては(DecisionTestTopology)を参照してください。

3.で、わかって何が嬉しいんだっけ?

StormがTupleを分岐して流すことが分かったことで、
Stormが取得したイベントやデータに応じて処理を分けられることがわかりました。

1でも書いたとおり、
1Topologyで複数のイベントに対して異なる処理を適用できるわけですね。

そんなわけで、Stormを使ってみようかと考えている皆さん、
条件分岐は出来るよ、という前提で検討を進めてみてください。

それでは。