Stormの次バージョン(0.8.2)の開発を楽しみに待っていますが、
いつの間にか0.8.2の開発版のビルド番号が16まで行っていました(ーー;
0.8.2はGUIもパワーアップしているので楽しみです。
・・・そんなわけで、どこかこのあたりで一度リリースしてくれることを期待する今日この頃です^^;
とまぁ、それはさておき、今日のお題です。
1.StormのWikiに書いてありそうで書いていない肝心なこと
これまで何回かStormについて投稿してきましたが、
肝心なことを書いていなかったことに気付きました。
それは・・・
「Stormって条件でTupleを送信するBoltを分岐できるの?」
です。
StormのWikiを見てみても、下のようなSpoutとBoltの接続図は書いてありますが、
条件に関する記述はありません。
下の図からわかることは下記の2点であり、分岐できるか否かについてはわからないんですよね。
1.Spoutは複数種類のBoltにTupleを送信できる(Boltもできそう)
2.実際に分岐が出来るのかやってみる
結論がわかっていないとこの先進みにくいため書いてしまいますが、
実際に分岐させることは可能でした。
実現方法は一言で言うと、
「Boltから条件に応じて異なるStreamにTupleを流す」です。
StormではStreamというTupleフローを定義し、
TupleフローをBoltが読みこむことでBoltがTupleを受信します。
Streamは単なるフローのため、Stream上で処理を分岐させることはできません。
ならば、Streamを複数作ったら分岐もできるだろう・・・ということで、
Bolt側で複数のStreamを定義し、イベントの送付分けを行う形で分岐が可能でした。
実際のコードの一部と、実行結果を示しますね。
■JudgeBoltの分岐処理
@Override public void execute(Tuple tuple) { String word = tuple.getString(0); if (word.length() <= 5) { System.out.println("JudgeBolt_" + this.index_ + "ToShortWord:" + word); // 短い単語用のStreamに流す this.collector_.emit("ShortWord", new Values(word)); } else { System.out.println("JudgeBolt_" + this.index_ + "ToLongWord:" + word); // 長い単語用のStreamに流す this.collector_.emit("LongWord", new Values(word)); } this.collector_.ack(tuple); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { // 分岐用の2個のStremを定義 declarer.declareStream("ShortWord", new Fields("word")); declarer.declareStream("LongWord", new Fields("word")); }
■Topology構築時の定義
builder.setSpout("WordSpout", new TestWordSpout(), 2); builder.setBolt("JudgeBolt", new JudgeBolt(), 5).fieldsGrouping( "WordSpout", new Fields("word")); // ShortWordのStreamを読み込むよう定義 builder.setBolt("ShortWord", new ShortWordBolt(), 5).fieldsGrouping( "JudgeBolt", "ShortWord", new Fields("word")); // LongWordのStreamを読み込むよう定義 builder.setBolt("LongWord", new LongWordBolt(), 5).fieldsGrouping( "JudgeBolt", "LongWord", new Fields("word"));
■実行結果(JudgeBolt)
JudgeBolt_3ToShortWord:mike JudgeBolt_3ToLongWord:bertels JudgeBolt_3ToLongWord:jackson JudgeBolt_4ToLongWord:nathan JudgeBolt_3ToLongWord:jackson JudgeBolt_0ToShortWord:golda JudgeBolt_3ToLongWord:bertels JudgeBolt_3ToLongWord:bertels JudgeBolt_3ToLongWord:jackson JudgeBolt_3ToLongWord:bertels JudgeBolt_3ToLongWord:jackson JudgeBolt_3ToLongWord:bertels JudgeBolt_3ToLongWord:jackson JudgeBolt_0ToShortWord:golda JudgeBolt_3ToLongWord:jackson
■実行結果(ShortWordBolt)
ShortWordBolt_3:mike ShortWordBolt_0:golda ShortWordBolt_0:golda
■実行結果(LongWordBolt)
LongWordBolt_3:bertels LongWordBolt_3:jackson LongWordBolt_4:nathan LongWordBolt_3:jackson LongWordBolt_3:bertels LongWordBolt_3:bertels LongWordBolt_3:jackson LongWordBolt_3:bertels LongWordBolt_3:jackson LongWordBolt_3:bertels LongWordBolt_3:jackson LongWordBolt_3:jackson
きちんとShort側では短い単語だけ割り振られ、
Long側には長い単語だけ割り振られていますね。
かつ、同じ単語は同じBoltで処理されているため、グルーピングも適用されているようです。
というわけで、
下記の2点ができることがわかりました。
1.Tupleの内容に応じて次のBoltを分岐させることが可能なこと
2.Boltを分岐させた場合でもFieldGroupingは適用可能なこと
尚、全コードについては(DecisionTestTopology)を参照してください。
3.で、わかって何が嬉しいんだっけ?
StormがTupleを分岐して流すことが分かったことで、
Stormが取得したイベントやデータに応じて処理を分けられることがわかりました。
1でも書いたとおり、
1Topologyで複数のイベントに対して異なる処理を適用できるわけですね。
そんなわけで、Stormを使ってみようかと考えている皆さん、
条件分岐は出来るよ、という前提で検討を進めてみてください。
それでは。