読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormの新機能、Trident APIって何?(その3

Storm

こんにちは。kimukimuです。

前回に引き続き、Tridentを用いたコードの内容を見ていきます。

前回はSpout部分の内容を確認したので、次はTridentStateの定義部ですね。

3.サンプルのコードを解読します(TridentState)

まず、サンプル内での定義部は下記の通りです。
1行1行何を行っているかを見ていきましょう。

■FixedBatchSpout(初期化部)

TridentState wordCounts = topology
    .newStream("spout1", spout) //★1
    .parallelismHint(16)        //★2
    .each(new Fields("sentence"), new Split(), new Fields("word"))   //★3
    .groupBy(new Fields("word"))                                     //★4
    .persistentAggregate(new MemoryMapState.Factory(), new Count(),  //★5
        new Fields("count")).parallelismHint(16);                    

まず、★1部分は「指定したSpoutを用いてSpout1という名前のストリームを生成」となります。
StreamIDだけ補足されてますが、後はSpoutのフィールド定義などを用いて生成されていますね。
■TridentTopology.java

public Stream newStream(String txId, IBatchSpout spout) {
    Node n = new SpoutNode(getUniqueStreamId(), spout.getOutputFields(), txId, spout, SpoutNode.SpoutType.BATCH);
    return addNode(n);
}

次の★2部は並列度を見ている定義を設定していると思われますが、
残念ながらJava側からではSpoutComponentという構成定義に設定しているまでで、
実際に「並列度の分スレッドが生成されるのか?または分離されて処理されるだけか?」
はわかりませんでした。

★3部は
「各々の[sentence]というフィールドにSplit処理(下記)を適用し、
 結果を[word]というフィールドを持つTupleとして流す」という動作となります。
Splitの定義は下記のため、そのまんまではありますね。

public static class Split extends BaseFunction {
  @Override
  public void execute(TridentTuple tuple, TridentCollector collector) {
    String sentence = tuple.getString(0);
    for (String word : sentence.split(" ")) {
      collector.emit(new Values(word));
    }
  }
}

★4部は「[work]というフィールドをグループ化する」という動作となっています。
こちらも実際にグループ化を行っている処理は階層が深くて追えていません。

★5部は「メモリ上で[word]をCountする」という処理になっています。
実際の定義は下記にありました。
どうやら、1個Tupleが流れてくるごとに生成され、どんどんマージされる処理となっているようですね。
■Count.java

public class Count implements CombinerAggregator<Long> {
    @Override
    public Long init(TridentTuple tuple) {
        return 1L;
    }
    @Override
    public Long combine(Long val1, Long val2) {
        return val1 + val2;
    }
    @Override
    public Long zero() {
        return 0L;
    }
}

総合して、「文章を単語単位に分割して、それをカウントした結果を保存する」となっているようです。
最終的な動作はコードの見た通りではあるのですが^^;

3.サンプルのコードを解読します(DRPCStream)

では、次は保存した情報を利用する側の処理を見てみましょう。

// 定義部
topology.newDRPCStream("words", drpc)
    .each(new Fields("args"), new Split(), new Fields("word"))      //★1
    .groupBy(new Fields("word"))                                    //★2
    .stateQuery(wordCounts, new Fields("word"), new MapGet(),       //★3
        new Fields("count"))                                       
    .each(new Fields("count"), new FilterNull())                    //★4
    .aggregate(new Fields("count"), new Sum(), new Fields("sum"));  //★5

// 実行部
drpc.execute("words", "cat the dog jumped"));                       //★6

見たところ、「★1〜★5で定義した処理部に★6で引数を渡して実行」という構成になっています。

こちらも1行ずつ何を行っているかを見ていきましょう。

★1と★2はTridentStateと同じのため省略します。

★3は、
「保持したwordCountsに対してフィールド[word]を指定してMapGetを行い、
 結果をフィールド[count]に格納して返す」という動作になっているようです。
 実際コードを見てもその通りの内容になっていますね。
■MapGet.java

public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
    @Override
    public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
        return map.multiGet((List) keys);
    }    
    
    @Override
    public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
        collector.emit(new Values(result));
    }    
}

  
★4は「フィールド[count]に対して、FilterNullというフィルタをかける」という動作です。
実際の動作を見ても、「Tupleにnullが含まれていた場合はkeepしない」となっていますね。
■FilterNull.java

public class FilterNull extends BaseFilter {
    @Override
    public boolean isKeep(TridentTuple tuple) {
        for(Object o: tuple) {
            if(o==null) return false;
        }
        return true;
    }
}

★5は「フィールド[count]に対してSum処理を実行して[sum]として返す」動作です。
Sumクラスも名前が示す通りの動作となっています。
■Sum.java

public class Sum implements CombinerAggregator<Number> {
    @Override
    public Number init(TridentTuple tuple) {
        return (Number) tuple.getValue(0);
    }

    @Override
    public Number combine(Number val1, Number val2) {
        return Numbers.add(val1, val2);
    }

    @Override
    public Number zero() {
        return 0;
    }
}

総合して、「引数に指定した文章を単語単位に分割し、単語の出現回数を合計して返す」
となっているようです。

結果が、下記のようになる・・というわけですね。

DRPC RESULT: [[24]]
DRPC RESULT: [[138]]
DRPC RESULT: [[366]]
DRPC RESULT: [[383]]
DRPC RESULT: [[540]]
DRPC RESULT: [[636]]
DRPC RESULT: [[725]]
DRPC RESULT: [[941]]
DRPC RESULT: [[1139]]
DRPC RESULT: [[1308]]
(以下省略)

尚、文章定義から、
「2回バッチを実行すると、theが3個、jumpedが1個取得=4カウント」となります。
10秒間蓄積した結果1308カウント=654バッチとなっていることから、
1秒あたりのバッチ処理数は65.4回。

これを早いと見るか遅いと見るかは・・・アプリケーション次第ですね。
ただ、バッチ1つの単位をもっと大きくすれば早くなりそうではあります。

4.何がうれしいの?

Trident APIを用いることでストリームに対して一律で加工を行っていくことが
可能となった、ということに尽きると思います。

これまでStormはTuple単位の処理しかできないため、
個々のTupleに対してコードを記述する必要がありました。

それがまとめてクエリを書くように処理が可能となったのは大きいと思います。