Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Storm簡単インストーラにStorm0.8.2対応版を追加しました!

こんにちは。kimukimuです。

Stormについてお知らせを。

Storm簡単インストーラにStorm0.8.2対応版を追加しました!
https://github.com/acromusashi/storm-installer
https://github.com/acromusashi/storm-installer/wiki/Download

これでより使いやすくなったStorm0.8.2を簡単にインストールすることができます。

今までインストールの手間がかかるからと遠慮していた方も、
既にインストールしていて、0.8.2に更新したい方も、
是非使ってください^^

StormUnitを使ってJUnitでテスト(その2

こんにちは。kimukimuです。

寒さもピークを過ぎ、徐々に暖かくなってきていると感じる日々ですね。

え?まだピークのまま?
・・・寒い日々が続いているので、その辺麻痺しているのかもしれませんね^^;

と、それはさておき、前回に引き続き、StormUnitを用いてどんな検証ができるかを試してみます。

1.JUnitでStormのAckは検証できるの?

Stormには処理完了したメッセージに対して応答を返すAck/Failの仕組みがあります。
今回はそのうちAckの応答を検知できるかについて試してみます。

結論から言うと・・・できるようです。
TestingApiDemo.java
で、AckとFailを検知しているAPIについて記述されています。

そのため、実際に使えることを今回は試してみます。

2.テスト対象は?

テスト対象とするコンポーネントは前回と同じく下記のexecuteメソッドを持つSquareBoltです。
intの値を流す限りackを常時返すはずのコンポーネントになります。

@Override
public void execute(Tuple input)
{
    int inputValue = input.getIntegerByField("Value");
    int resultValue = inputValue * inputValue;
    this.collector.emit(input, new Values(resultValue));
    this.collector.ack(input);
}

3.実際にテストを行うテストコードは?

実際に検証を行うコード(の一部)は下記です。
全ソースが確認したい場合は「SquareBoltTest.java」を確認してください。

// 準備
// Tracker生成
AckTracker tracker = new AckTracker();
FeederSpout spout = new FeederSpout(new Fields("Value"));
spout.setAckFailDelegate(tracker);

// Topology構成を生成
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("FeederSpout", spout);
builder.setBolt("SquareBolt", new SquareBolt()).shuffleGrouping("FeederSpout");
StormTopology topology = builder.createTopology();
TrackedTopology tracked = Testing.mkTrackedTopology(cluster, topology);

try
{
    cluster.submitTopology("AckTest", new Config(), tracked.getTopology());
}
catch (Exception ex)
{
    // 例外が発生した時点で失敗のため、returnで抜ける
    ex.printStackTrace();
    return;
}

// Tupleを流す
spout.feed(new Values(1), 1);
Testing.trackedWait(tracked, 1);

前回のテストコードとの主な違いとして下記があります。

  • Spoutは専用のFeederSpoutを用いる

メッセージのAck/Failをトラッキングする機能を持つFeederSpoutを用いて確認を行います。

  • TopologyをTrackedTopologyでラッピングする

TrackedTopologyという時刻をトレースするTopologyでトラッキングして確認を行います。

  • 1Tupleを流すごとにtrackedWaitメソッドを用いて時刻を経過させる

時刻を経過させて実際にackが返ることを確認するようです。

4.実際に実行してみると?

では実際に実行してみます。
実行してみると・・・・

むむ。失敗してしまいました。
しかも、clojureにおけるメソッド呼び出しで失敗しているため、ぱっとデバッグを行うこともできないようです・・・

5.上手くいかない・・・

というわけで(?)上手くいきませんでした。
しかも、clojureコードにおける例外発生のため、そう簡単に解析をすることも出来ないようです。

残念ですが、この問題については継続して確認を進めるとします・・・
解決できたらまた報告という形で投稿しますね。

Storm0.8.2公開、運用や問題解析時の便利機能が追加されました!

こんにちは。kimukimuです。

先日、Stormの最新版、0.8.2の正式版リリースが公開されました。
http://storm-project.net/2013/01/11/storm082-released.html

前回のStorm0.8.1が開発を行いやすくする機能が追加されていましたが、
今回はStormUIが主に機能追加対象となり、運用や問題解析を行いやすくする機能がそろっています。

では、追加された主な機能について挙げてみますね。

1.Topologyごとにホストマシンを占有させるIsolation Schedulerの追加

Stormクラスタ上で複数のTopologyを動作させる際、
特定のTopologyに対してホストを占有させる設定が可能となりました。

これによって、商用のTopologyは常時3個のホストを占有し、
検証用のTopologyとは混ざらないようにする。。。といったことが出来ます。

実際にStormクラスタを複数用意するのは大変ですので、
クラスタ上で検証と運用をうまくバランスさせるには便利そうですね。

2.StormUIの改善

StormUIに下記のように操作ボタン、情報が追加され非常に使いやすく改善されました。
−activate、diactivate、kill、rebalanceといったTopologyへの操作ボタン追加
−Boltの現状のキャパシティ目安(イベント処理可能数)表示
−Ack待ちのTuple数を表示
−ClusterやTopologyの設定値を表示

実際まず見る情報はStormUIのため、UIから操作が可能になったというのは非常にうれしいですね!

3.StormUIがNimbusと同ホストでなくても配置でき、複数設置可能になった

StormUIがNimbusと別ホストに配置可能になりました。

Storm0.8.1まではStormUIは問答無用でlocalhostにアクセスして情報を取得していたため、
Nimbusと同じホストにしか配置できませんでした。

Storm0.8.2では別ホストに配置可能となったため、柔軟に対応できることに加え、
StormUIを複数のホストに配置することも可能になりました。
画面だけいろんなところで見たい、ということにも対応できるようになったわけですね。

4.StormUIに表示されるエラーの数を絞ることが可能になった

「7.Workerを殺さずに〜」にも関連するのですが、
StormUIはStorm動作中に発生したエラーを表示する機能を持っています。

Storm0.8.1まではエラー表示件数は発生しただけ表示されていました。
そのため、エラーが大量に発生すると画面から溢れてしまうことが多々ありました。

Storm0.8.2では画面に表示するエラーの数を時間や回数で区切ることが可能となったため、
無駄な情報を省くことが可能になりました。

5.StormにTopologyをSubmit時、バリデーションが可能になった

StormにTopologyをSubmitする際にバリデーションが可能になりました。

Storm0.8.1まではTopologyをSubmitするときには自前でバリデーション処理を作りこみ、
自前のコードの中ではじく必要がありましたが、それをStorm側が提供した枠組みの中で可能になった・・・
というわけですね。

6.Workerを殺さずにStormUIにエラーを表示するReportedFailedExceptionの追加

これは思いっきり開発より&内部の話になってしまいますが・・・
StormUIにはTopology内部で発生した例外を補足して表示する機能がありますが、
表示される例外は「Spout/Boltを停止させるレベルの例外」のみでした。

StormではSpout/Boltが停止するとそのWorkerプロセスも併せて停止するため、
StormUIに例外が表示された時点でプロセスがフェールオーバーして
クラスタ内を飛び回っている状態になります。
結果、問題を追うのが大変・・・というのが多々ありました。

ReportedFailedExceptionは「StormUIには表示されるが、Spout/Boltの動作は継続する」例外です。
そのため、実際にクラスタ上で動かす場合にエラーのレベルを定義できるようなノリですね。

ユーザに知らせたいけどそのまま動作を継続したい場合はReportedFailedExceptionを投げ、
どうしようもない場合は今までどおりの例外を投げてプロセスごと再起動・・・が選択可能になりました。

言うなれば、WARNとERRORの切り分けができるようになった感じでしょうか。

7.総括して、何がうれしくなったの?

全体的に見て、実際に動かしてみて困る内容への対処がそろっています。
また、Topologyのバリデーションなどで事前に問題を検知できるようになったのもうれしいですね。

もしStormを使おうか考えている方がいれば、今回を機にぜひ使ってみてください。

それでは。

StormUnitを使ってJUnitでテスト(その1

こんにちは。kimukimuです。

新年おめでとうございます。
今年も「Taste of Tech Topics」ブログをよろしくお願いします。

では、今日のお題です。

1.JUnitでStormのコンポーネントを検証したい!

過去の機能紹介記事で触れたように、Storm0.8.1からStormのテスト用コンポーネントが整備され、
JUnitでStormの機能が一部テスト可能になっています。

ですが、このブログでは触れたことがなかったため、
実際にテストコードを書いて、どこまで使えるのか、気をつけるべき点は何なのかを記述します。

2.テスト対象のコンポーネントは?

今回テスト対象とするコンポーネントは下記のexecuteメソッドを持つBoltです。
・・・つまりは、受信した値を二乗して次のBoltに流すBoltになりますね。

@Override
public void execute(Tuple input)
{
    int inputValue = input.getIntegerByField("Value");
    int resultValue = inputValue * inputValue;
    this.collector.emit(input, new Values(resultValue));
    this.collector.ack(input);
}

上記のコードではexecuteメソッドだけ抜粋して記載していますので、
全ソースを見たい方はSquareBolt.javaを確認してください。

全く関係ないですが上記のソース群は依存性解決をGradleで行うよう改修してあるため、
もし、もの好きな方がいて「ソースを全て落として確認してみるぜ!」となった場合、
面倒ですがGradleでEclipseタスクを走らせてから確認してください。お願いします。

3.実際にテストコードを書いてみる!

Stormが公開している-TestingApiDemo.javaを参考に、まずはテストコードを書いてみます。
すると下記のようになりました。

/**
 * SquareBoltを1段階組んで結果を確認する。<br/>
 * 投入する値は「0、10」
 */
@Test
public void testExecute_SquareBolt1段()
{
    MkClusterParam mkClusterParam = new MkClusterParam();
    Config daemonConf = new Config();
    daemonConf.put(Config.STORM_LOCAL_MODE_ZMQ, false);
    mkClusterParam.setDaemonConf(daemonConf);

    Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
        @Override
        public void run(ILocalCluster cluster)
        {
            // 準備
            // Topology構成を生成
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("SingleIntValueSpout", new SingleIntValueSpout(), 2);
            builder.setBolt("SquareBolt", new SquareBolt(), 2).fieldsGrouping("SingleIntValueSpout",
                    new Fields("Value"));
            StormTopology topology = builder.createTopology();

            // テスト用のデータを生成
            MockedSources mockedSources = new MockedSources();
            mockedSources.addMockData("SingleIntValueSpout", new Values(0), new Values(10));

            // 動作用の設定を生成
            Config conf = new Config();
            CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam();
            completeTopologyParam.setMockedSources(mockedSources);
            completeTopologyParam.setStormConf(conf);

            // 実施(Topologyを実行)
            Map result = Testing.completeTopology(cluster, topology, completeTopologyParam);

            // 検証
            assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)),
                    Testing.readTuples(result, "SquareBolt")));
        }
    });
}

さっそく実行してみると・・・

・・・ええ。思いっきりエラーになっています。

エラー内容は下記でした。
どうやらZookeeperの管理ファイルを削除しようとして失敗しているようですが・・・
さて、困りました。ファイルが存在するのに削除失敗します。
しかも、今まで実行していたファイルが継続して残るので、
実はStormのTopologyを稼働させるごとに1回65MBのゴミファイルが残る
ということです。
なんてこったい!

java.io.IOException: Unable to delete file: C:\Users\kimutansk\AppData\Local\Temp\5c851776-75d6-49ab-9818-707c4679c733\version-2\log.1

・・・ですが、Linuxでは同様の事象は発生しないように見えるため、
Windowsの環境に伴う問題としてとりあえずIOExceptionを無視して進めることにします。

4.きちんと検証できているの?

例外が発生するのはTesting#withSimulatedTimeLocalClusterメソッド実行時のため、
Testing.withSimulatedTimeLocalCluster実行時にIOExceptionが発生した場合は無視するという処理を追加しました。

すると、下記のようにテストコード実行は成功したように見えます。

後は「テストの実行結果が誤っていた場合に検知可能か?」を確認します。

下記のように実際にassertに渡す値を変更して、失敗を検知できるか確認します。

assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(99)),
        Testing.readTuples(result, "SquareBolt")));

すると・・・

なぜか成功してしまいます。
にもかかわらず、コンソールには下記のAssertionErrorが。

java.lang.AssertionError
	at org.junit.Assert.fail(Assert.java:86)
	at org.junit.Assert.assertTrue(Assert.java:41)
	at org.junit.Assert.assertTrue(Assert.java:52)
	at storm.sample.bolt.SquareBoltTest$1.run(SquareBoltTest.java:72)
        (省略)

どうやら、

assertTrueメソッドを実行しているのはJUnit実行を行っているスレッドとは
別スレッドのため、JUnit側では失敗したことを検知できないようです。

そのため、最終的なテストコードは下記のようになりました・・・
ソース全体を確認したい場合は「SquareBoltTest.java」を見てください。

/** Assert確認フラグ。Stormクラスタ実行スレッドの検証が正常終了した場合にtrueにして検証がOKだったかを検知 */
private boolean isAsserted = false;

/**
 * SquareBoltを1段階組んで結果を確認する。<br/>
 * 投入する値は「0、10」
 * @throws Exception 実行失敗時
 */
@Test
public void testExecute_SquareBolt1段() throws Exception
{
    this.isAsserted = false;
    〜〜〜(省略)〜〜〜

    try
    {
        Testing.withSimulatedTimeLocalCluster(mkClusterParam, new TestJob() {
            @Override
            public void run(ILocalCluster cluster)
            {
                〜〜〜(省略)〜〜〜
                // 検証
                assertTrue(Testing.multiseteq(new Values(new Values(0), new Values(100)),
                        Testing.readTuples(result, "SquareBolt")));
                // 検証OKだった場合検証OKフラグを設定
                SquareBoltTest.this.isAsserted = true;
            }
        });
    }
    catch (Exception ex)
    {
        // Windows上で実行した場合、ZooKeeperファイル削除に失敗してIOExceptionが発生する。
        // そのため、IOExceptionが発生した場合は無視。
        if ((ex instanceof IOException) == false)
        {
            throw ex;
        }
    }
    
    assertTrue(this.isAsserted);
}

いまいち書き方としては美しくありませんが、これでBoltの検証は可能となりました。
なお、モックデータを準備して流す関係上、Spoutの検証は同じ機構では無理のようです。
StormUnitでは無理なのか、それとも他のAPIがあるのか・・・

その辺りの確認と、後は他の検証が可能かを含めて次回に書きます。

それでは。

Stormって条件でBoltを分岐させることってできるの?

こんにちは。kimukimuです。

Stormの次バージョン(0.8.2)の開発を楽しみに待っていますが、
いつの間にか0.8.2の開発版のビルド番号が16まで行っていました(ーー;

0.8.2はGUIもパワーアップしているので楽しみです。
・・・そんなわけで、どこかこのあたりで一度リリースしてくれることを期待する今日この頃です^^;

とまぁ、それはさておき、今日のお題です。

1.StormのWikiに書いてありそうで書いていない肝心なこと

これまで何回かStormについて投稿してきましたが、
肝心なことを書いていなかったことに気付きました。

それは・・・
「Stormって条件でTupleを送信するBoltを分岐できるの?」
です。

StormのWikiを見てみても、下のようなSpoutとBoltの接続図は書いてありますが、
条件に関する記述はありません。

下の図からわかることは下記の2点であり、分岐できるか否かについてはわからないんですよね。

1.Spoutは複数種類のBoltにTupleを送信できる(Boltもできそう)
2.Boltは複数のSpout/BoltからTupleを受信することができる

もし、分岐可能なのであれば1つのTopologyで複数のイベント種別をハンドリング可能となります。
1か所で一気に複数のイベントを受信して、対応するBoltに分配して処理・・・ができるわけですね。

各々別のTopologyを定義すればいいじゃないか、という突っ込みはあります。
ですが、その場合1つのメッセージ取得元に複数のイベントが混ざるケースであれば
事前に分ける必要があるなどして、それはそれで面倒ですので。

というわけで、
下記のようなBoltの条件分岐が可能なのか、
実際のコードを元に試してみます!

2.実際に分岐が出来るのかやってみる

結論がわかっていないとこの先進みにくいため書いてしまいますが、
実際に分岐させることは可能でした。

実現方法は一言で言うと、
「Boltから条件に応じて異なるStreamにTupleを流す」です。

StormではStreamというTupleフローを定義し、
TupleフローをBoltが読みこむことでBoltがTupleを受信します。

Streamは単なるフローのため、Stream上で処理を分岐させることはできません。
ならば、Streamを複数作ったら分岐もできるだろう・・・ということで、
Bolt側で複数のStreamを定義し、イベントの送付分けを行う形で分岐が可能でした。

実際のコードの一部と、実行結果を示しますね。

■JudgeBoltの分岐処理
@Override
public void execute(Tuple tuple)
{
    String word = tuple.getString(0);

    if (word.length() <= 5)
    {
        System.out.println("JudgeBolt_" + this.index_ + "ToShortWord:"
                + word);
        // 短い単語用のStreamに流す
        this.collector_.emit("ShortWord", new Values(word));
    }
    else
    {
        System.out.println("JudgeBolt_" + this.index_ + "ToLongWord:"
                + word);
        // 長い単語用のStreamに流す
        this.collector_.emit("LongWord", new Values(word));
    }

    this.collector_.ack(tuple);
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer)
{
    // 分岐用の2個のStremを定義
    declarer.declareStream("ShortWord", new Fields("word"));
    declarer.declareStream("LongWord", new Fields("word"));
}
■Topology構築時の定義
builder.setSpout("WordSpout", new TestWordSpout(), 2);
builder.setBolt("JudgeBolt", new JudgeBolt(), 5).fieldsGrouping(
        "WordSpout", new Fields("word"));

// ShortWordのStreamを読み込むよう定義
builder.setBolt("ShortWord", new ShortWordBolt(), 5).fieldsGrouping(
        "JudgeBolt", "ShortWord", new Fields("word"));

// LongWordのStreamを読み込むよう定義
builder.setBolt("LongWord", new LongWordBolt(), 5).fieldsGrouping(
        "JudgeBolt", "LongWord", new Fields("word"));
■実行結果(JudgeBolt)
JudgeBolt_3ToShortWord:mike
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_4ToLongWord:nathan
JudgeBolt_3ToLongWord:jackson
JudgeBolt_0ToShortWord:golda
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_3ToLongWord:bertels
JudgeBolt_3ToLongWord:jackson
JudgeBolt_0ToShortWord:golda
JudgeBolt_3ToLongWord:jackson
■実行結果(ShortWordBolt)
ShortWordBolt_3:mike
ShortWordBolt_0:golda
ShortWordBolt_0:golda
■実行結果(LongWordBolt)
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_4:nathan
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:bertels
LongWordBolt_3:jackson
LongWordBolt_3:jackson

きちんとShort側では短い単語だけ割り振られ、
Long側には長い単語だけ割り振られていますね。
かつ、同じ単語は同じBoltで処理されているため、グルーピングも適用されているようです。

というわけで、
下記の2点ができることがわかりました。

1.Tupleの内容に応じて次のBoltを分岐させることが可能なこと
2.Boltを分岐させた場合でもFieldGroupingは適用可能なこと

尚、全コードについては(DecisionTestTopology)を参照してください。

3.で、わかって何が嬉しいんだっけ?

StormがTupleを分岐して流すことが分かったことで、
Stormが取得したイベントやデータに応じて処理を分けられることがわかりました。

1でも書いたとおり、
1Topologyで複数のイベントに対して異なる処理を適用できるわけですね。

そんなわけで、Stormを使ってみようかと考えている皆さん、
条件分岐は出来るよ、という前提で検討を進めてみてください。

それでは。

Storm0.8.1公開。開発における足回りが充実しました。

こんにちは。kimukimuです。

つい先日、Stormの最新版、0.8.1の正式版リリースが公開されました。
Stormのダウンロードページから取得可能

前回の0.7系 > 0.8系のように大きな機能の追加・・・があるわけではないのですが、
実際に開発を行う上で非常に重要な足回りが複数リリースされています。
細かいバグフィックスやインタフェース追加もありますので、個人的に重要だと思う要素をかいつまんで紹介します。

では、追加された機能について挙げてみますね。
前回と同じく、nathanmarzさんが投稿している0.8.1リリース通知を基にしています。

1.Stormのユニットテスト用モジュールStorm's Unit追加

今回のリリース一番の目玉です。
新しい機能・・・というわけではないのですが、
JUnitによるテストコードでStormをテストするためのモジュールがリリースされました。

JUnit上でローカルモードでクラスタを動作させ、
指定したTupleを流して実際の動作を確認可能になっています。
その際、流したTupleや結果を取得する口が設けられているため、
取得口を通して検証が可能・・・というものです。

また、時間経過をシミュレーションするAPIも追加されています。

2.Spoutの待ち処理を設定で自動設定できるようになった

StormのSpoutは基本的に「取ってこれるだけデータを取ってくる」という動作となるため、
データの取得メソッドをBusyLoopのように延々実行し続ける形になります。

ですが、下記の2ケースにおいては「待ち」を挟む必要が出てきます。
A.Spoutが読み込めるTuple数の最大値に達した場合
 #現状はnextTupleが呼ばれないものの、CPUを消費します
B.Spoutがデータソースから何も取得できなかった場合
 #データが無い場所に何度も取得しに行っても無駄なため

この「待ち」を設定で行えるようになりました。
これまではAは無理、Bを各Spoutで実装していたため便利になっています。

3.ローカルモードでもTupleのタイムアウトが発生するようになった

検証の上では非常にありがたい機能追加です。
StormはTupleがタイムアウトするとSpoutに通知が返ってきますが、
これまではクラスタ上で動作する際にしかタイムアウトが発生しませんでした。

そのため、デバッガを動作させた状態でタイムアウトの動作を確認することが困難だったのですが、
ローカルモードでも発生するようになったため、デバッガでの確認が容易になりました。

4.総括して、何がうれしくなったの?

0.8.1のリリースでは「開発のしやすさ」「扱いやすさ」の向上・・・だと思います。
StormUnitの追加によって動作検証が小さい単位で確認できるようになりましたし、
タイムアウトまで含めてTupleの機能がデバッガで確認できるようになり、開発しやすくなりました。

後は明らかな無限ループ突入を回避することが設定で可能になり、
「知っていないとリソースを食いつぶす」というケースが少なくなりました。
かなり扱いやすくなったのではないでしょうか。

今後のリリースも楽しみですね^^

Stormの簡単インストーラを公開しました!

こんにちは。kimukimuです。

Trident APIではありませんが、Stormについてお知らせを。

Stormの簡単インストーラを公開しました!
https://github.com/acromusashi/storm-installer

これまで書いてきた記事では基本的にStormは
依存パッケージをビルドしてインストールを行っていたのですが、
簡単インストーラを使用することで、RPMでのインストールが可能となります!

また、同梱している追加のパッケージ(storm-service)をインストールすることで、
serviceコマンドからStormを起動させることが可能になります。

今までインストールの手間がかかるからと遠慮していた方、
実際にStormを使ってアプリケーションを作りたい方、
是非使ってください^^