Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

StormUnitを使ってJUnitでテスト(その1

こんにちは。kimukimuです。

新年おめでとうございます。
今年も「Taste of Tech Topics」ブログをよろしくお願いします。

では、今日のお題です。

1.JUnitでStormのコンポーネントを検証したい!

過去の機能紹介記事で触れたように、Storm0.8.1からStormのテスト用コンポーネントが整備され、
JUnitでStormの機能が一部テスト可能になっています。

ですが、このブログでは触れたことがなかったため、
実際にテストコードを書いて、どこまで使えるのか、気をつけるべき点は何なのかを記述します。

2.テスト対象のコンポーネントは?

今回テスト対象とするコンポーネントは下記のexecuteメソッドを持つBoltです。
・・・つまりは、受信した値を二乗して次のBoltに流すBoltになりますね。

@Override
public void execute(Tuple input)
{
    int inputValue = input.getIntegerByField("Value");
    int resultValue = inputValue * inputValue;
    this.collector.emit(input, new Values(resultValue));
    this.collector.ack(input);
}

上記のコードではexecuteメソッドだけ抜粋して記載していますので、
全ソースを見たい方はSquareBolt.javaを確認してください。

全く関係ないですが上記のソース群は依存性解決をGradleで行うよう改修してあるため、
もし、もの好きな方がいて「ソースを全て落として確認してみるぜ!」となった場合、
面倒ですがGradleでEclipseタスクを走らせてから確認してください。お願いします。

3.実際にテストコードを書いてみる!

Stormが公開している-TestingApiDemo.javaを参考に、まずはテストコードを書いてみます。
すると下記のようになりました。

/**
 * SquareBoltを1段階組んで結果を確認する。<br/>
 * 投入する値は「0、10」
 */
@Test
public void testExecute_SquareBolt1段()
{
    MkClusterParam mkClusterParam = new MkClusterParam();
    Config daemonConf = new Config();
    daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
    mkClusterParam.setDaemonConf(daemonConf);

    Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
        @Override
        public void run(ILocalCluster cluster)
        {
            // 準備
            // Topology構成を生成
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("SingleIntValueSpout", new SingleIntValueSpout(), 2);
            builder.setBolt("SquareBolt", new SquareBolt(), 2).fieldsGrouping("SingleIntValueSpout",
                    new Fields("Value"));
            StormTopology topology = builder.createTopology();

            // テスト用のデータを生成
            MockedSources mockedSources = new MockedSources();
            mockedSources.addMockData("SingleIntValueSpout", new Values(0), new Values(10));

            // 動作用の設定を生成
            Config conf = new Config();
            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
            completeTopologyParam.setMockedSources(mockedSources);
            completeTopologyParam.setStormConf(conf);

            // 実施(Topologyを実行)
            Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

            // 検証
            assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)),
                    Testing.readTuples(result, "SquareBolt")));
        }
    });
}

さっそく実行してみると・・・

・・・ええ。思いっきりエラーになっています。

エラー内容は下記でした。
どうやらZookeeperの管理ファイルを削除しようとして失敗しているようですが・・・
さて、困りました。ファイルが存在するのに削除失敗します。
しかも、今まで実行していたファイルが継続して残るので、
実はStormのTopologyを稼働させるごとに1回65MBのゴミファイルが残る
ということです。
なんてこったい!

java.io.IOException: Unable to delete file: C:\Users\kimutansk\AppData\Local\Temp\5c851776-75d6-49ab-9818-707c4679c733\version-2\log.1

・・・ですが、Linuxでは同様の事象は発生しないように見えるため、
Windowsの環境に伴う問題としてとりあえずIOExceptionを無視して進めることにします。

4.きちんと検証できているの?

例外が発生するのはTesting#withSimulatedTimeLocalClusterメソッド実行時のため、
Testing.withSimulatedTimeLocalCluster実行時にIOExceptionが発生した場合は無視するという処理を追加しました。

すると、下記のようにテストコード実行は成功したように見えます。

後は「テストの実行結果が誤っていた場合に検知可能か?」を確認します。

下記のように実際にassertに渡す値を変更して、失敗を検知できるか確認します。

assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(99)),
        Testing.readTuples(result, "SquareBolt")));

すると・・・

なぜか成功してしまいます。
にもかかわらず、コンソールには下記のAssertionErrorが。

java.lang.AssertionError
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at storm.sample.bolt.SquareBoltTest$1.run(SquareBoltTest.java:72)
        (省略)

どうやら、

assertTrueメソッドを実行しているのはJUnit実行を行っているスレッドとは
別スレッドのため、JUnit側では失敗したことを検知できないようです。

そのため、最終的なテストコードは下記のようになりました・・・
ソース全体を確認したい場合は「SquareBoltTest.java」を見てください。

/** Assert確認フラグ。Stormクラスタ実行スレッドの検証が正常終了した場合にtrueにして検証がOKだったかを検知 */
private boolean isAsserted = false;

/**
 * SquareBoltを1段階組んで結果を確認する。<br/>
 * 投入する値は「0、10」
 * @throws Exception 実行失敗時
 */
@Test
public void testExecute_SquareBolt1段() throws Exception
{
    this.isAsserted = false;
    〜〜〜(省略)〜〜〜

    try
    {
        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster)
            {
                〜〜〜(省略)〜〜〜
                // 検証
                assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)),
                        Testing.readTuples(result, "SquareBolt")));
                // 検証OKだった場合検証OKフラグを設定
                SquareBoltTest.this.isAsserted = true;
            }
        });
    }
    catch (Exception ex)
    {
        // Windows上で実行した場合、ZooKeeperファイル削除に失敗してIOExceptionが発生する。
        // そのため、IOExceptionが発生した場合は無視。
        if ((ex instanceof IOException) == false)
        {
            throw ex;
        }
    }
    
    assertTrue(this.isAsserted);
}

いまいち書き方としては美しくありませんが、これでBoltの検証は可能となりました。
なお、モックデータを準備して流す関係上、Spoutの検証は同じ機構では無理のようです。
StormUnitでは無理なのか、それとも他のAPIがあるのか・・・

その辺りの確認と、後は他の検証が可能かを含めて次回に書きます。

それでは。