読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Stormの新機能、Trident APIって何?(その1

Storm

こんにちは。kimukimuです。

前回はStorm環境構築用のRPMをまとめよう・・・
と考えていましたが、Stormの0.8.0が2012/08/03に正式リリースとなり、
Trident APIという新しいAPI群が追加されちゃいました^^;

なので、今回以降はTrident APIについて調べて書いていこうと思います。

1.Trident とは?

TridentはStormのリアルタイムコンピューティングの新たな抽象概念です。
Tridentを用いることにより、高スループット(百万イベント/秒オーバー)な処理と状態を持つストリーム処理を融合できます。

言うなれば、TridentはStorm上におけるPig or Cascadingと同じ位置づけで、コンセプトも共通しています。
Tridentはジョイン、統合、グルーピング、フィルタリング等をサポートします。
加えて、Tridentは基本的な状態管理の仕組みを提供し、
データベース等の永続化層と同期したインクリメンタルな処理も可能とします。

Tridentは既存のStormとは異なるTridentTopologyを提供します。

これだけ見ると凄い夢が見れそうな内容ですよね。
では、続きを見てみます。

2.Tridentの適用例

Tridentの適用サンプルとしてワードカウントを行うクエリが載っていましたので、
そちらを確認します。

まず、サンプルでは下記のように4種類の文章を無限に生成し続けるSpoutを定義しています。

FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
               new Values("the cow jumped over the moon"),
               new Values("the man went to the store and bought some candy"),
               new Values("four score and seven years ago"),
               new Values("how many apples can you eat"),
spout.setCycle(true);

上記のSpoutを処理するTopology構成が下記になります。

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

これでやっていることが何とな〜く、わかりますね。
ちなみに、実運用においてはSpoutは当然ながら単に文を生成するのではなく、
KestrelやKafkaのようなキューから取得し続ける・・・という形になります。

Tridentは各インプットごとのメタデータをZooKeeperに保存する形で構成を管理するようです。
上記の例ですと、「spout1」というキーに紐づける形でSpoutを管理します。

TridentはSpoutから取得したデータを1つ1つ処理するのではなく、
下記のようにいくつかのデータをまとめてバッチとして処理する形を取るようです。

ちなみに上記の図では2〜5個のデータを1バッチとして扱っていますが、
実際は数千〜数百万のデータをまとめて1バッチとして処理するようです。
そのため、これまでのStormのように小さいデータ各々を高速に処理するというものとは
使い方が微妙に変わってくるように見えますね。

その上で、Tridentは数千〜数百万データのバッチに対して
HadoopのPigやCascadingのような抽象バッチ機能を提供します。

・・・と言いましたが、当然ながらバッチ間を跨いだ情報管理も可能となっており、
Memory、Memcached、Cassandra等に情報を保持して全バッチの集計も可能です。

実例に戻ります。
ワードカウント例においては、Spoutから取得した文章を下記のようにSplitという形で分割します。

public class Split extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
       String sentence = tuple.getString(0);
       for(String word: sentence.split(" ")) {
           collector.emit(new Values(word));                
       }
   }
}

内容は取得した文章(sentence)を単語ごとに分割し、1単語につき1イベントという形に変換する形になります。
実際コードは非常にシンプルですね。

その後、Topologyの下記の記述によって分割された単語はグルーピングされます。

       .groupBy(new Fields("word"))

それ以降のTopologyの記述は状態を保存するための記述となります。
word毎にグルーピングされた結果がCountによって統合され保存されます。
ここにあるpersistentAggregateはデータソースと更新方法を保持し、統合を行います。

統合した結果は下記のように「new MemoryMapState.Factory()」の記述を変更し、
serverLocations(Memcachedのホスト/ポート)を指定することで他データストアに保存することも可能です。

.persistentAggregate(MemcachedState.transactional(serverLocations), new Count(), new Fields("count"))        
MemcachedState.transactional()

Tridentは上記の機能に加えて、耐障害性と1回だけ処理されることの保障を提供するようです。
「1回は必ず処理することを保障する」というStormの基本のTopologyとは異なり、
LinearDRPCTopologyやTransactionalTopologyに近い位置づけのようですね。

・・・と、こんな感じに実現例が記述されていました。

次回はまずはこの基本例を実際にコードとして実現し、どう動くのかを確認してみますね。