Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormの新機能、Trident APIって何?(その2

こんにちは。kimukimuです。

では今回は実際にTridentのサンプルをコードに落として動作を見て、
その上で内容を見ていきます。

1.まずはサンプルを動作させてみます

storm-starterプロジェクトからTridentWordCountを落としてきて動かしてみます。
実際に動かしたソースは下記の通りです。

■TridentWordCount.java

public class TridentWordCount {
  public static class Split extends BaseFunction {
    @Override
    public void execute(TridentTuple tuple, TridentCollector collector) {
      String sentence = tuple.getString(0);
      for (String word : sentence.split(" ")) {
        collector.emit(new Values(word));
      }
    }
  }

  public static StormTopology buildTopology(LocalDRPC drpc) {
    FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
        new Values("the cow jumped over the moon"), new Values(
            "the man went to the store and bought some candy"),
        new Values("four score and seven years ago"), new Values(
            "how many apples can you eat"), new Values(
            "to be or not to be the person"));
    spout.setCycle(true);

    TridentTopology topology = new TridentTopology();
    TridentState wordCounts = topology
        .newStream("spout1", spout)
        .parallelismHint(16)
        .each(new Fields("sentence"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .persistentAggregate(new MemoryMapState.Factory(), new Count(),
            new Fields("count")).parallelismHint(16);

    topology.newDRPCStream("words", drpc)
        .each(new Fields("args"), new Split(), new Fields("word"))
        .groupBy(new Fields("word"))
        .stateQuery(wordCounts, new Fields("word"), new MapGet(),
            new Fields("count"))
        .each(new Fields("count"), new FilterNull())
        .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

    return topology.build();
  }

  public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.setMaxSpoutPending(20);
    if (args.length == 0) {
      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("wordCounter", conf, buildTopology(drpc));
      for (int i = 0; i < 100; i++) {
        System.out.println("DRPC RESULT: "
            + drpc.execute("words", "cat the dog jumped"));
        Thread.sleep(1000);
      }
    } else {
      conf.setNumWorkers(3);
      StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
  }
}

で、動かした結果は下記のようになりました。

DRPC RESULT: [[24]]
DRPC RESULT: [[138]]
DRPC RESULT: [[366]]
DRPC RESULT: [[383]]
DRPC RESULT: [[540]]
DRPC RESULT: [[636]]
DRPC RESULT: [[725]]
DRPC RESULT: [[941]]
DRPC RESULT: [[1139]]
DRPC RESULT: [[1308]]
(以下省略)

徐々にカウントが増えていっているのがわかりますね。

2.サンプルのコードを解読します(Spout)

では、実際にサンプルとなっているコードがどんな動作となっているかを解読します。
まずは、文章を生成するSpoutから。関連ソース部分は下記です。
■FixedBatchSpout(初期化部)

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
    new Values("the cow jumped over the moon"), new Values(
        "the man went to the store and bought some candy"),
    new Values("four score and seven years ago"), new Values(
        "how many apples can you eat"), new Values(
        "to be or not to be the person"));
spout.setCycle(true);

■FixedBatchSpout.java(動作制御部)

public FixedBatchSpout(Fields fields, int maxBatchSize, List<Object>... outputs) {
    this.fields = fields;
    this.outputs = outputs;
    this.maxBatchSize = maxBatchSize;
}

// バッチ呼び出し部。1回バッチを読みだす際に3個の文章Valueを取得
@Override
public void emitBatch(long batchId, TridentCollector collector) {
    if(index>=outputs.length && cycle) {
     // cycleが設定されている場合、outputsを最後まで読んだら再度最初から読み直す
        index = 0;
    }

    for(int i=0; index < outputs.length && i < maxBatchSize; index++, i++) {
    // maxBatchSizeを満たすか、outputsを読みきるまでデータを読み、emit
        collector.emit(outputs[index]);
    }
}

・・・つまり、1回バッチ処理が走るごとに3個の文章を保持するメッセージを流し、
バッチ処理が繰り返されることでカウントが進んでいく・・・という形になるようです。

と、この先はTridentとTopologyとDRPCが混ざってくるので一度切ります。
実際のStream化/集計処理の内部実装確認は次回に。