寒さもピークを過ぎ、徐々に暖かくなってきていると感じる日々ですね。
え?まだピークのまま?
・・・寒い日々が続いているので、その辺麻痺しているのかもしれませんね^^;
と、それはさておき、前回に引き続き、StormUnitを用いてどんな検証ができるかを試してみます。
1.JUnitでStormのAckは検証できるの?
Stormには処理完了したメッセージに対して応答を返すAck/Failの仕組みがあります。
今回はそのうちAckの応答を検知できるかについて試してみます。
結論から言うと・・・できるようです。
TestingApiDemo.java
で、AckとFailを検知しているAPIについて記述されています。
そのため、実際に使えることを今回は試してみます。
2.テスト対象は?
テスト対象とするコンポーネントは前回と同じく下記のexecuteメソッドを持つSquareBoltです。
intの値を流す限りackを常時返すはずのコンポーネントになります。
@Override public void execute(Tuple input) { int inputValue = input.getIntegerByField("Value"); int resultValue = inputValue * inputValue; this.collector.emit(input, new Values(resultValue)); this.collector.ack(input); }
3.実際にテストを行うテストコードは?
実際に検証を行うコード(の一部)は下記です。
全ソースが確認したい場合は「SquareBoltTest.java」を確認してください。
// 準備 // Tracker生成 AckTracker tracker = new AckTracker(); FeederSpout spout = new FeederSpout(new Fields("Value")); spout.setAckFailDelegate(tracker); // Topology構成を生成 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("FeederSpout", spout); builder.setBolt("SquareBolt", new SquareBolt()).shuffleGrouping("FeederSpout"); StormTopology topology = builder.createTopology(); TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology); try { cluster.submitTopology("AckTest", new Config(), tracked.getTopology()); } catch (Exception ex) { // 例外が発生した時点で失敗のため、returnで抜ける ex.printStackTrace(); return; } // Tupleを流す spout.feed(new Values(1), 1); Testing.trackedWait(tracked, 1);
前回のテストコードとの主な違いとして下記があります。
- Spoutは専用のFeederSpoutを用いる
メッセージのAck/Failをトラッキングする機能を持つFeederSpoutを用いて確認を行います。
- TopologyをTrackedTopologyでラッピングする
TrackedTopologyという時刻をトレースするTopologyでトラッキングして確認を行います。
- 1Tupleを流すごとにtrackedWaitメソッドを用いて時刻を経過させる
時刻を経過させて実際にackが返ることを確認するようです。
5.上手くいかない・・・
というわけで(?)上手くいきませんでした。
しかも、clojureコードにおける例外発生のため、そう簡単に解析をすることも出来ないようです。
残念ですが、この問題については継続して確認を進めるとします・・・
解決できたらまた報告という形で投稿しますね。