Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

メッセージを処理する度に共通処理を行う方法は?

こんにちは。kimukimuです。

少し宣伝を。
2013年5月18日発売のSoftware Design 2013年6月号に、
「春の嵐吹く,リアルタイム分散処理Storm」という形でStormの記事が掲載されています。

Stormの概要や実際に外部のプロセスと組み合わせてTopologyを動かす方法が載っていますので、
興味がある方はぜひ読んでみてください。

というわけで(?)、本題に入ります。

1.StormのTuple処理に対して共通的に処理を行わせることは出来ないの?

Stormを実際に動かしていて大変なこととして、「どのイベントがどう流れたかがわからない」というものがあります。

一応、Topologyを起動するときにパラメータを指定すればデバッグモードで起動して、
個々のイベントについてもログ出力がされます。
・・・なのですが、Spout/Boltで処理した全イベントに対して一律でログが出力されるため、
量が多すぎて追うこと自体が非常に大変な状態になってしまいます。

元々、Storm自体がストリーム処理ということで秒間数千数万のイベントを処理することを前提としていますので、
全イベントに対して一律ログ出力した場合の量は推して知るべし・・・というわけですね。

そんな困ったことに応えるための機構として、StormにはTaskhookという機構があります。
読んでみると、Tupleを処理するごとにイベントを呼び出すことができるというものの模様。

そのため、まずは今回はTaskhookを動かしてみて動作を確認します。

2.Taskhookの作り方は?

実装方法は比較的単純で、BaseTaskHookクラスを継承して行いたい処理を記述するのみです。

そのため、まずはTuple処理時の内容を全てログ出力するTaskhookを下記のように作成しました。

public class PrintTaskHook extends BaseTaskHook
{
    /** ロガー */
    private static final Logger logger = Logger.getLogger(PrintTaskHook.class);

    /**
     * パラメータを指定せずにインスタンスを生成する。
     */
    public PrintTaskHook()
    {}

    @Override
    public void prepare(Map conf, TopologyContext context)
    {
        logger.warn("TopologyContext=" + context.toJSONString());
    }

    @Override
    public void cleanup()
    {
        logger.warn("cleanuped");
    }

    @Override
    public void emit(EmitInfo info)
    {
        logger.warn("EmitInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }

    @Override
    public void spoutAck(SpoutAckInfo info)
    {
        logger.warn("SpoutAckInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }

    @Override
    public void spoutFail(SpoutFailInfo info)
    {
        logger.warn("SpoutFailInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }

    @Override
    public void boltAck(BoltAckInfo info)
    {
        logger.warn("BoltAckInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }

    @Override
    public void boltFail(BoltFailInfo info)
    {
        logger.warn("BoltFailInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }

    @Override
    public void boltExecute(BoltExecuteInfo info)
    {
        logger.warn("BoltExecuteInfo=" + ToStringBuilder.reflectionToString(info, ToStringStyle.SIMPLE_STYLE));
    }
}

見ての通り、イベントが発生した際にその時の情報を出力する・・・というものです。
実はTaskhook作成に必要な内容はこれだけです。

後は実際にTopologyに組み込んで動作を確認してみます。

3.Taskhookの設定方法は?

Taskhookを実際に起動するには下記2つの手順が必要です。

  1. TopologyのJarファイルにTaskhookを含める
  2. Stormの設定でTaskhookを登録するよう設定する

1の方はTopologyと同じプロジェクトに入れてビルドを行えば、JarファイルにTaskhookも含まれます。
そのため、2の説明を行いますね。

Stormの設定でTaskhookを登録するにはyamlファイルに下記のように設定を記述します。

topology.auto.task.hooks:
    - "storm.sample.taskhook.PrintTaskHook"

これで、準備は完了。
実際にTopologyを動作させて試してみましょう。

4.Taskhookを実際に動作させてみる

まず、3の設定を行った状態でTopologyを起動します。

すると・・・・Stormのログに下記のように各イベント発生時のログが出力されていることがわかります。

2013-05-20 15:43:36 PrintTaskHook [WARN] TopologyContext={"task->component":{"1":"JudgeBolt","2":"JudgeBolt","3":"JudgeBolt","4":"JudgeBolt","5":"LongWord","6":"LongWord","7":"LongWord","8":"LongWord","9":"ShortWord","10":"ShortWord","11":"ShortWord","12":"ShortWord","13":"WordSpout","14":"WordSpout","15":"WordSpout","17":"__acker","16":"WordSpout"}}
2013-05-20 15:45:22 PrintTaskHook [WARN] EmitInfo=[mike],ShortWord,2,[10]
2013-05-20 15:45:22 PrintTaskHook [WARN] BoltExecuteInfo=source: JudgeBolt:2, stream: ShortWord, id: {}, [mike],10,<null>
2013-05-20 15:45:22 PrintTaskHook [WARN] BoltAckInfo=source: JudgeBolt:2, stream: ShortWord, id: {}, [mike],10,<null>

これでまずTaskhookが起動可能なことと、概要の動作について把握することができました。

5.Taskhookを使うと何が嬉しいの?

今回の結果だけだと単にログを出力するのみで終わりますが、下記のような利点が考えられると思います。

  • 1.デバッグモードと異なり、「特定の条件を満たした場合にログ出力」が可能になる

一律で全出力のデバッグモードと異なり、必要な場合にのみログ出力を行うということが可能になります。
それによってログ出力の負荷を抑制し、必要な情報を出力できるというのが利点としてあげられると思います。

  • 2.ログ出力だけでなくデータの集計/出力も可能となる

ログ出力という固定の処理だけでなく、イベントの結果をどこかに保存しておいて集計したり、加工した上での出力も可能となります。

もし、デバッグモードで使いにくい点があると感じている方は是非使ってみてください。
それでは。