少し間が空いてしまいましたが、
Stormのデバッグと、実際に動くソースの確認をしてみましょう。
・・・Storm0.70がリリースされ、Storm-Startarプロジェクトも更新されたので、
今投稿しておかないとずれるという事情はさておきましょう^^;
1.デバッグ実行方法
既に前回環境を構築し、ビルドパスも通してしまったため、
後は実際にソースを実行するだけです。
「storm.starter.ExclamationTopology」クラスを実行しましょう。
ただ、1点注意点が。
StormでZooKeeperのログファイルを起動時に出力し、終了時に終了するのですが、
何故か削除だけ失敗します。
そのため、テンポラリディレクトリ配下に1回実行するたびに65535KBのログが残っていきます・・・
一通り実行して満足したら、後でファイルは削除しておいてください。
2.ソースコードを読んでみます
とりあえず実行とデバッグが出来たので、コードの中で何をやっているか読んでみましょう。
まずは、ExclamationTopologyクラスのmainメソッドです。
■ExclamationTopology#main(String[] args)
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); // Spoutを並列度10で設定 builder.setSpout("word", new TestWordSpout(), 10); // ExclamationBoltを"word"から情報を受け取る形で設定 builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("word"); // ExclamationBoltを"exclaim1"から情報を受け取る形で設定 builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); Config conf = new Config(); conf.setDebug(true); if(args!=null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); } }
つまりは、下記のTopologyを作り、10秒だけローカルクラスタで
実行する旨を宣言しているわけですね。
では、次はデータを生成するSpoutです。
backtype.storm.testing.TestWordSpoutを見てみましょう。
対象メソッドは、実際にデータを生成する「nextTuple」メソッドです。
■TestWordSpout#nextTuple()
public void nextTuple() { Utils.sleep(100); final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"}; final Random rand = new Random(); final String word = words[rand.nextInt(words.length)]; // データを次のBoltに渡す _collector.emit(new Values(word)); }
やっていることは、100msごとに"nathan", "mike", "jackson", "golda", "bertels"という文字列を
ランダムに生成し、次のBoltに渡すのみ、ですね。
最後に、データを処理するBoltです。
ExclamationTopologyの内部クラスExclamationBoltを確認してみましょう。
対象メソッドは実際にデータを処理する「execute(Tuple tuple)」メソッドです。
■ExclamationBolt#execute(Tuple tuple)
@Override public void execute(Tuple tuple) { // データの末尾に「!!!」を付けて次のBoltに渡す _collector.emit(tuple, new Values(tuple.getString(0) + "!!!")); // データの処理が成功した旨を通知 _collector.ack(tuple); }
やっていることは更に単純で、下記の2点です。
- データの末尾に「!!!」を付けて次のBoltに渡す
- データの処理が成功した旨を通知
『上記のSpout→Boltという流れをTopologyで組んで、実行する』
がStormの基本的な流れになります。
基本的な流れは分散システム上で実行しても変わりません。
これで、まずはローカルで実行してみて動作を確認し、
OKならばStormクラスタに対して実行してみる・・・
という流れでStormの開発も可能となりました。
分散環境を構築するのは大変ですが、興味がある方は是非どうぞ^^;