読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Twitter Stormのデバッグをするには?(その2

Storm


こんばんは。kimukimuです。

少し間が空いてしまいましたが、
Stormのデバッグと、実際に動くソースの確認をしてみましょう。

・・・Storm0.70がリリースされ、Storm-Startarプロジェクトも更新されたので、
今投稿しておかないとずれるという事情はさておきましょう^^;

1.デバッグ実行方法

既に前回環境を構築し、ビルドパスも通してしまったため、
後は実際にソースを実行するだけです。

「storm.starter.ExclamationTopology」クラスを実行しましょう。

すると、下記のように実行ログがコンソールに表示されます。

ただ、1点注意点が。
StormでZooKeeperのログファイルを起動時に出力し、終了時に終了するのですが、
何故か削除だけ失敗します。
そのため、テンポラリディレクトリ配下に1回実行するたびに65535KBのログが残っていきます・・・

一通り実行して満足したら、後でファイルは削除しておいてください。

Eclipseから実行できるため、ブレークポイントを仕掛けておけば、
当然デバッグ実行も可能となります。

こんな風に。

2.ソースコードを読んでみます

とりあえず実行とデバッグが出来たので、コードの中で何をやっているか読んでみましょう。
まずは、ExclamationTopologyクラスのmainメソッドです。

■ExclamationTopology#main(String[] args)

  public static void main(String[] args) throws Exception 
{
    TopologyBuilder builder = new TopologyBuilder();
    
    // Spoutを並列度10で設定
    builder.setSpout("word", new TestWordSpout(), 10);
    // ExclamationBoltを"word"から情報を受け取る形で設定
    builder.setBolt("exclaim1", new ExclamationBolt(), 3)
        .shuffleGrouping("word");
    // ExclamationBoltを"exclaim1"から情報を受け取る形で設定
    builder.setBolt("exclaim2", new ExclamationBolt(), 2)
        .shuffleGrouping("exclaim1");
        
    Config conf = new Config();
    conf.setDebug(true);
    
    if(args!=null && args.length > 0) {
      conf.setNumWorkers(3);
      
      StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
    } else {
    
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("test", conf, builder.createTopology());
      Utils.sleep(10000);
      cluster.killTopology("test");
      cluster.shutdown();  
    }
  }

つまりは、下記のTopologyを作り、10秒だけローカルクラスタ
実行する旨を宣言しているわけですね。

では、次はデータを生成するSpoutです。
backtype.storm.testing.TestWordSpoutを見てみましょう。
対象メソッドは、実際にデータを生成する「nextTuple」メソッドです。

■TestWordSpout#nextTuple()

public void nextTuple() 
{
    Utils.sleep(100);
    final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
    final Random rand = new Random();
    final String word = words[rand.nextInt(words.length)];
    // データを次のBoltに渡す
    _collector.emit(new Values(word));
}

やっていることは、100msごとに"nathan", "mike", "jackson", "golda", "bertels"という文字列を
ランダムに生成し、次のBoltに渡すのみ、ですね。

最後に、データを処理するBoltです。
ExclamationTopologyの内部クラスExclamationBoltを確認してみましょう。
対象メソッドは実際にデータを処理する「execute(Tuple tuple)」メソッドです。

■ExclamationBolt#execute(Tuple tuple)

@Override
public void execute(Tuple tuple) 
{
    // データの末尾に「!!!」を付けて次のBoltに渡す
    _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
    // データの処理が成功した旨を通知
    _collector.ack(tuple);
}

やっていることは更に単純で、下記の2点です。

  • データの末尾に「!!!」を付けて次のBoltに渡す
  • データの処理が成功した旨を通知

『上記のSpout→Boltという流れをTopologyで組んで、実行する』
がStormの基本的な流れになります。

基本的な流れは分散システム上で実行しても変わりません。

これで、まずはローカルで実行してみて動作を確認し、
OKならばStormクラスタに対して実行してみる・・・
という流れでStormの開発も可能となりました。

分散環境を構築するのは大変ですが、興味がある方は是非どうぞ^^;