読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

StormUnitを使ってJUnitでテスト(その2

Storm

こんにちは。kimukimuです。

寒さもピークを過ぎ、徐々に暖かくなってきていると感じる日々ですね。

え?まだピークのまま?
・・・寒い日々が続いているので、その辺麻痺しているのかもしれませんね^^;

と、それはさておき、前回に引き続き、StormUnitを用いてどんな検証ができるかを試してみます。

1.JUnitでStormのAckは検証できるの?

Stormには処理完了したメッセージに対して応答を返すAck/Failの仕組みがあります。
今回はそのうちAckの応答を検知できるかについて試してみます。

結論から言うと・・・できるようです。
TestingApiDemo.java
で、AckとFailを検知しているAPIについて記述されています。

そのため、実際に使えることを今回は試してみます。

2.テスト対象は?

テスト対象とするコンポーネントは前回と同じく下記のexecuteメソッドを持つSquareBoltです。
intの値を流す限りackを常時返すはずのコンポーネントになります。

@Override
public void execute(Tuple input)
{
    int inputValue = input.getIntegerByField("Value");
    int resultValue = inputValue * inputValue;
    this.collector.emit(input, new Values(resultValue));
    this.collector.ack(input);
}

3.実際にテストを行うテストコードは?

実際に検証を行うコード(の一部)は下記です。
全ソースが確認したい場合は「SquareBoltTest.java」を確認してください。

// 準備
// Tracker生成
AckTracker tracker = new AckTracker();
FeederSpout spout = new FeederSpout(new Fields("Value"));
spout.setAckFailDelegate(tracker);

// Topology構成を生成
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("FeederSpout", spout);
builder.setBolt("SquareBolt", new SquareBolt()).shuffleGrouping("FeederSpout");
StormTopology topology = builder.createTopology();
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology);

try
{
    cluster.submitTopology("AckTest", new Config(), tracked.getTopology());
}
catch (Exception ex)
{
    // 例外が発生した時点で失敗のため、returnで抜ける
    ex.printStackTrace();
    return;
}

// Tupleを流す
spout.feed(new Values(1), 1);
Testing.trackedWait(tracked, 1);

前回のテストコードとの主な違いとして下記があります。

  • Spoutは専用のFeederSpoutを用いる

メッセージのAck/Failをトラッキングする機能を持つFeederSpoutを用いて確認を行います。

  • TopologyをTrackedTopologyでラッピングする

TrackedTopologyという時刻をトレースするTopologyでトラッキングして確認を行います。

  • 1Tupleを流すごとにtrackedWaitメソッドを用いて時刻を経過させる

時刻を経過させて実際にackが返ることを確認するようです。

4.実際に実行してみると?

では実際に実行してみます。
実行してみると・・・・

むむ。失敗してしまいました。
しかも、clojureにおけるメソッド呼び出しで失敗しているため、ぱっとデバッグを行うこともできないようです・・・

5.上手くいかない・・・

というわけで(?)上手くいきませんでした。
しかも、clojureコードにおける例外発生のため、そう簡単に解析をすることも出来ないようです。

残念ですが、この問題については継続して確認を進めるとします・・・
解決できたらまた報告という形で投稿しますね。