新年おめでとうございます。
今年も「Taste of Tech Topics」ブログをよろしくお願いします。
では、今日のお題です。
1.JUnitでStormのコンポーネントを検証したい!
過去の機能紹介記事で触れたように、Storm0.8.1からStormのテスト用コンポーネントが整備され、
JUnitでStormの機能が一部テスト可能になっています。
ですが、このブログでは触れたことがなかったため、
実際にテストコードを書いて、どこまで使えるのか、気をつけるべき点は何なのかを記述します。
2.テスト対象のコンポーネントは?
今回テスト対象とするコンポーネントは下記のexecuteメソッドを持つBoltです。
・・・つまりは、受信した値を二乗して次のBoltに流すBoltになりますね。
@Override public void execute(Tuple input) { int inputValue = input.getIntegerByField("Value"); int resultValue = inputValue * inputValue; this.collector.emit(input, new Values(resultValue)); this.collector.ack(input); }
上記のコードではexecuteメソッドだけ抜粋して記載していますので、
全ソースを見たい方はSquareBolt.javaを確認してください。
全く関係ないですが上記のソース群は依存性解決をGradleで行うよう改修してあるため、
もし、もの好きな方がいて「ソースを全て落として確認してみるぜ!」となった場合、
面倒ですがGradleでEclipseタスクを走らせてから確認してください。お願いします。
3.実際にテストコードを書いてみる!
Stormが公開している-TestingApiDemo.javaを参考に、まずはテストコードを書いてみます。
すると下記のようになりました。
/** * SquareBoltを1段階組んで結果を確認する。<br/> * 投入する値は「0、10」 */ @Test public void testExecute_SquareBolt1段() { MkClusterParam mkClusterParam = new MkClusterParam(); Config daemonConf = new Config(); daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false); mkClusterParam.setDaemonConf(daemonConf); Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() { @Override public void run(ILocalCluster cluster) { // 準備 // Topology構成を生成 TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("SingleIntValueSpout", new SingleIntValueSpout(), 2); builder.setBolt("SquareBolt", new SquareBolt(), 2).fieldsGrouping("SingleIntValueSpout", new Fields("Value")); StormTopology topology = builder.createTopology(); // テスト用のデータを生成 MockedSources mockedSources = new MockedSources(); mockedSources.addMockData("SingleIntValueSpout", new Values(0), new Values(10)); // 動作用の設定を生成 Config conf = new Config(); CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); completeTopologyParam.setMockedSources(mockedSources); completeTopologyParam.setStormConf(conf); // 実施(Topologyを実行) Map result = Testing.completeTopology(cluster, topology, completeTopologyParam); // 検証 assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)), Testing.readTuples(result, "SquareBolt"))); } }); }
・・・ええ。思いっきりエラーになっています。
エラー内容は下記でした。
どうやらZookeeperの管理ファイルを削除しようとして失敗しているようですが・・・
さて、困りました。ファイルが存在するのに削除失敗します。
しかも、今まで実行していたファイルが継続して残るので、
実はStormのTopologyを稼働させるごとに1回65MBのゴミファイルが残る
ということです。
なんてこったい!
java.io.IOException: Unable to delete file: C:\Users\kimutansk\AppData\Local\Temp\5c851776-75d6-49ab-9818-707c4679c733\version-2\log.1
・・・ですが、Linuxでは同様の事象は発生しないように見えるため、
Windowsの環境に伴う問題としてとりあえずIOExceptionを無視して進めることにします。
4.きちんと検証できているの?
例外が発生するのはTesting#withSimulatedTimeLocalClusterメソッド実行時のため、
Testing.withSimulatedTimeLocalCluster実行時にIOExceptionが発生した場合は無視するという処理を追加しました。
すると、下記のようにテストコード実行は成功したように見えます。
後は「テストの実行結果が誤っていた場合に検知可能か?」を確認します。
下記のように実際にassertに渡す値を変更して、失敗を検知できるか確認します。
assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(99)), Testing.readTuples(result, "SquareBolt")));
すると・・・
なぜか成功してしまいます。
にもかかわらず、コンソールには下記のAssertionErrorが。
java.lang.AssertionError at org.junit.Assert.fail(Assert.java:86) at org.junit.Assert.assertTrue(Assert.java:41) at org.junit.Assert.assertTrue(Assert.java:52) at storm.sample.bolt.SquareBoltTest$1.run(SquareBoltTest.java:72) (省略)
どうやら、
assertTrueメソッドを実行しているのはJUnit実行を行っているスレッドとは
別スレッドのため、JUnit側では失敗したことを検知できないようです。
そのため、最終的なテストコードは下記のようになりました・・・
ソース全体を確認したい場合は「SquareBoltTest.java」を見てください。
/** Assert確認フラグ。Stormクラスタ実行スレッドの検証が正常終了した場合にtrueにして検証がOKだったかを検知 */ private boolean isAsserted = false; /** * SquareBoltを1段階組んで結果を確認する。<br/> * 投入する値は「0、10」 * @throws Exception 実行失敗時 */ @Test public void testExecute_SquareBolt1段() throws Exception { this.isAsserted = false; 〜〜〜(省略)〜〜〜 try { Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() { @Override public void run(ILocalCluster cluster) { 〜〜〜(省略)〜〜〜 // 検証 assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)), Testing.readTuples(result, "SquareBolt"))); // 検証OKだった場合検証OKフラグを設定 SquareBoltTest.this.isAsserted = true; } }); } catch (Exception ex) { // Windows上で実行した場合、ZooKeeperファイル削除に失敗してIOExceptionが発生する。 // そのため、IOExceptionが発生した場合は無視。 if ((ex instanceof IOException) == false) { throw ex; } } assertTrue(this.isAsserted); }
いまいち書き方としては美しくありませんが、これでBoltの検証は可能となりました。
なお、モックデータを準備して流す関係上、Spoutの検証は同じ機構では無理のようです。
StormUnitでは無理なのか、それとも他のAPIがあるのか・・・
その辺りの確認と、後は他の検証が可能かを含めて次回に書きます。
それでは。