Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

RxJavaを使ってCallback Hellから脱出する( Java8 ラムダ編 )

id:KenichiroMurata(@ )です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

前回はVert.xのmod-rxvertxを使い、RxJavaによってCallback Hellから脱出する方法を説明しました。本記事は、せっかくなのでJava8のラムダを使ったらさらにどのようになるのか?を試してみた補足記事です。

目次は以下の通りです。

  1. 環境
  2. Java8 ラムダによってCallback Hellはいかに解決されたか
  3. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1
  • Java 1.8.0-ea(build 1.8.0-ea-b118)
  • IntelliJ IDEA 13 CE

Java8を試すために、IntelliJ IDEA 13 CEを使いました。Java8をインストールし、IDEAを使う場合は以下のように初期化してください。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew idea

2. Java8 ラムダによってCallback Hellはいかに解決されたか

まずは、通常のVert.xで書いた場合のコードのJava8 ラムダ版は次のようになります。例を分かりやすくするために、JSONメッセージの生成部などは外部メソッド化しました。

J8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample", req1,
        (Handler<Message<JsonObject>>) reply -> {
          assertEquals("ok", reply.body().getString("status"));
          assertEquals("@muraken720", map.get("name"));

          JsonObject req2 = createAddRequest("Kenichiro");

          // リクエスト2
          eventBus.send("muraken720.vertx.mod.testexample", req2,
              (Handler<Message<JsonObject>>) reply1 -> {
                assertEquals("ok", reply.body().getString("status"));
                assertEquals("Kenichiro", map.get("name"));

                JsonObject req3 = createAddRequest("Murata");

                // リクエスト3
                eventBus.send("muraken720.vertx.mod.testexample", req3,
                    (Handler<Message<JsonObject>>) reply2 -> {
                      assertEquals("ok", reply.body().getString("status"));
                      assertEquals("Murata", map.get("name"));

                      testComplete();
                    });
              });
        });
  }

ラムダ式により、無名クラスの部分がスッキリしていますね。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxJ8BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap("muraken720.testexample");

    JsonObject req1 = createAddRequest("@muraken720");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send("muraken720.vertx.mod.testexample", req1);

    Observable<RxMessage<JsonObject>> obs2 = obs1.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("@muraken720", map.get("name"));

      JsonObject req2 = createAddRequest("Kenichiro");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req2);
    });

    // リクエスト2
    Observable<RxMessage<JsonObject>> obs3 = obs2.flatMap(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Kenichiro", map.get("name"));

      JsonObject req3 = createAddRequest("Murata");

      return rxEventBus.send("muraken720.vertx.mod.testexample", req3);
    });

    // リクエスト3
    obs3.subscribe(reply -> {
      assertEquals("ok", reply.body().getString("status"));
      assertEquals("Murata", map.get("name"));

      testComplete();
    });
  }

Java8のラムダ式を使うことにより、RxJavaのAction1、Function1といった無味乾燥なクラス名も登場しなくなり、非常にスッキリしました。コード自体も縦には長くなっていますが、ネストが深くならないことがよく分かると思います。

3 まとめ

いかかでしょうか?Java8のラムダを使うことで、Callback Hellから脱出するだけでなく、その記述方法のシンプルになることがご覧頂けたと思います。Java8の正式リリースは3月?だったと思いますが、そのJava8に向けて、RxJavaもVert.xもさらに進化していくと思います。今後どのようになるのか?を想像しながら、あれこれ試してみると面白いと思います。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

RxJavaを使ってCallback Hellから脱出する( Vert.x がいいね!第5回 )

id:KenichiroMurata(@ )です。

本記事はJava Advent Calendar 2013 - Adventarの6日目の記事です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

RxJava はNetflixが開発する Reactive ExtensionsJava版です。Reactive Extensions (Rx)はReactive Programmingを可能にするライブラリです。

私はReactive Programmingとはなんぞや?と語れるほどには詳しくないので、ここでは asynchronous で event-based なプログラムを書くのに便利なライブラリというレベルの紹介とさせて頂きます。:-)

さて、なぜ RxJava なのか?というと Vert.x を使って、asynchronous で eventbusベースのプログラムを書こうとすると、どうしてもコールバックを多段に書くことなり、ネストの深いコード(Callback Hell)になってしまうのを改善したいからです。

そこで本記事では、Vert.xとVert.xにてRxJavaを使えるようにするためのモジュールである mod-rxvertx を使って、Callback Hellからいかに脱出できるのか?を試したいと思います。

ちなみに、mod-rxvertxはVert.xのAPIをObservableを返すようにラップしてくれるライブラリで、現在では以下のAPIに対応しています。

  • EventBus
  • HttpServer/HttpClient
  • NetServer/NetClient
  • Timer

また、RxJavaそのものについては、Wikiページが充実していますので、ぜひご覧下さい。


目次は以下の通りです。

  1. 環境
  2. BusModとIntegration Testを"Rx-ify"してみる
  3. Callback Hellはいかに解決されたか
  4. RxJavaを使うとできること
  5. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1

また、今回の全ソースはこちらにありますのでcloneしてお試しください。git clone したら以下を実行することで、eclipseからもJUnitテストを動かして試すことができます。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew eclipse

2. BusModとIntegration Testを"Rx-ify"してみる

それでは、さっそくコードを書いてみましょう。題材は前回のBusModとそのIntegration Testです。

まずはVert.xで通常に書いた場合のコードです。

BusModVerticle.java

public class BusModVerticle extends Verticle {

  public void start() {
    container.logger().info("BusModVerticle start.");

    vertx.eventBus().registerHandler("muraken720.vertx.mod.testexample",
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> message) {
            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            JsonObject json = message.body();

            if ("add".equals(json.getString("action"))) {
              String key = json.getString("key");
              String value = json.getString("value");
              map.put(key, value);
            } else {
              message.reply(new JsonObject().putString("status", "error")
                  .putString("message", "unknown action."));
            }

            message.reply(new JsonObject().putString("status", "ok"));
          }
        });
  }
}

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticle.java

public class RxBusModVerticle extends Verticle {

  public void start() {
    container.logger().info("RxBusModVerticle start.");

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus
        .<JsonObject> registerHandler("muraken720.vertx.mod.testexample");

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> message) {
        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        JsonObject json = message.body();

        if ("add".equals(json.getString("action"))) {
          String key = json.getString("key");
          String value = json.getString("value");
          map.put(key, value);
        } else {
          message.reply(new JsonObject().putString("status", "error")
              .putString("message", "unknown action."));
        }

        message.reply(new JsonObject().putString("status", "ok"));
      }
    });
  }
}

EventBusをラップしたRxEventBusを使います。Integration Test側のコードも見てみましょう。

BusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    vertx.eventBus().send("muraken720.vertx.mod.testexample", request,
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            JsonObject json = reply.body();
            container.logger().info("response message: " + json);

            assertEquals("ok", json.getString("status"));

            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            assertEquals("@muraken720", map.get("name"));

            testComplete();
          }
        });
  }

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus.send(
        "muraken720.vertx.mod.testexample", request);

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        JsonObject json = reply.body();
        container.logger().info("response message: " + json);

        assertEquals("ok", json.getString("status"));

        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        assertEquals("@muraken720", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?あれ、あまり変わらない?

そうなんですよね。最初にRxJavaを使った例を見たとき、言うほどCallback Hellは解消してなくない?と私は思ってしまいました。まぁ、ただこの例があまりにも単純な例なので、その効果もよく分からない、むしろ複雑になっているではないか?とすら思います。

ただ、実際にコードを書いてみると、ネストが1段減るだけでも、書きやすくなったと感じます。

この例ではあまりにも単純なので、もう少し複雑なコードを書いてみましょう。

3. Callback Hellはいかに解決されたか

それでは、今回のBusModに対してシーケンシャルにリクエストを投げるコードを書いてみます。つまり、リクエストを投げて、レスポンスを受けたら次のリクエストを投げる。さらにレスポンスを受けたら次のリクエストを投げるという例です。

まずは、通常のVert.xで書いた場合のコードは次のようになります。

BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"),
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            eventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"),
                new Handler<Message<JsonObject>>() {
                  @Override
                  public void handle(Message<JsonObject> reply) {
                    assertEquals("ok", reply.body().getString("status"));
                    assertEquals("Kenichiro", map.get("name"));

                    // リクエスト3
                    eventBus.send(
                        "muraken720.vertx.mod.testexample",
                        new JsonObject().putString("action", "add")
                            .putString("key", "name")
                            .putString("value", "Murata"),
                        new Handler<Message<JsonObject>>() {
                          @Override
                          public void handle(Message<JsonObject> reply) {
                            assertEquals("ok", reply.body().getString("status"));
                            assertEquals("Murata", map.get("name"));

                            testComplete();
                          }
                        });
                  }
                });
          }
        });
  }

ご覧の通り、最初にリクエストを投げてレスポンスを受けるHandlerの中で、次のリクエスト投げる。さらにそのレスポンスを受けるHandlerの中で、次のリクエストを投げる・・・という具合で、どんどんネストが深くなっています。これぞ Callback Hell です。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxBusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send(
        "muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"));

    Observable<RxMessage<JsonObject>> obs2 = obs1
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"));
          }
        });

    Observable<RxMessage<JsonObject>> obs3 = obs2
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("Kenichiro", map.get("name"));

            // リクエスト3
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Murata"));
          }
        });

    obs3.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        assertEquals("ok", reply.body().getString("status"));
        assertEquals("Murata", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?深くなっていたネストが縦方向に展開されて、ネストが1段で収まっています。少しコードがゴテゴテしてますが、Java8のラムダに対応すれば、もっとスッキリとして見通しがよいコードになりそうです。

4. RxJavaを使うとできること

RxJava にはもっと便利な機能がたくさんあります。ここでmod-rxvertxのテストコード(EventBusIntegrationTest)からいくつか例を紹介します。

Observable.concat

  @Test
  public void testSimpleSerial() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final AtomicInteger totalReqs = new AtomicInteger(3);
    final AtomicInteger activeReqs = new AtomicInteger(0);
    
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-foo["+message.body()+"]");
        message.reply("pong!");
        activeReqs.incrementAndGet();
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.observeSend("foo", "ping!A");
    Observable<RxMessage<String>> obs2 = rxEventBus.observeSend("foo", "ping!B");
    Observable<RxMessage<String>> obs3 = rxEventBus.observeSend("foo", "ping!C");

    Observable<RxMessage<String>> concatenated = Observable.concat(obs1, obs2, obs3);

    concatenated.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-resp["+message.body()+"]");
        assertEquals("pong!", message.body());
        assertEquals(0,activeReqs.decrementAndGet());
        if (totalReqs.decrementAndGet()==0)
          testComplete();
      }
    });
  }

Observable.concatは2つ以上のObservableをシーケンシャルに実行するように連結してくれます。

ここまで見てくると気づくと思いますが、Observableで処理を書く場合、宣言した処理は直ぐには実行されず、subscribeを呼び出した段階になって実行(遅延実行)されます。コーディングスタイルとしては、Observableに対する処理を宣言的に記述して行き、最後にsubscribeを呼んで処理を実行、という流れになります。

Observable.merge

  @Test
  public void testGather() {

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive");
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    
    merged.takeLast(1).subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        assertEquals("pongC", message.body());
        testComplete();
      }
    });
  }

Observable.mergeはパラレルに処理を実行し、全ての処理が完了するまで待ちます。

Observable.reduce

  @Test
  public void testConcatResults() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive: " + msg.body());
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    Observable<String> result = merged.reduce("", new Func2<String, RxMessage<String>, String>() {
      @Override
      public String call(String accum, RxMessage<String> reply) {
        return accum + reply.body();
      }
    });

    result.takeFirst().subscribe(new Action1<String>() {
      @Override
      public void call(String value) {
        assertEquals("pongApongBpongC", value);
        testComplete();
      }
    });
  }

先のObservable.mergeで一括りにしたパラレル処理の処理結果をrudeceで処理してまとめる、なんてこともできます。

ObservableにはTransforming、Filtering、Combiningといった様々な機能がありますので、ぜひWikiページをご覧下さい。

5 まとめ

いかかでしょうか?Vert.xによるプログラミングは asynchronous で eventbusベースなためにネストが深くなり、Callback Hell に陥りがちです。この Callback Hell から脱出する方法の一つとして、ぜひRxJava( mod-rxvertx )を試してみてください。私もまださわり始めたばかりなのですが、いかんせん日本語の情報はまだ少ないので、こんなこともできる!という情報などある方は、このブログのコメントや、tweetで教え頂けると嬉しいです。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Vert.x がいいね!(第4回:テストする)

id:KenichiroMurata(@ )です。

f:id:acro-engineer:20131221154226j:plain:w200

皆さん、Vert.x 使っていますか?

私は先日(11/9)開催された JJUG CCC 2013 Fallに参加し、「Over the Node.js. An Introduction to Vert.x」というタイトルで発表させて頂きました。

内容はこの「Vert.x がいいね!」の第1回〜第3回で紹介した内容をベースとして、現在の最新バージョンであるVert.x 2.1Mの新機能の紹介、node.jsで開発したfront-endとvert.xアプリケーションをeventbusでつなぐためのnpmモジュール、vertx-eventbus-clientなどを紹介しています。スライドは以下にありますので、興味のある方はぜひご覧下さい。



さて、少し間が空いてしましいましたが、第4回目となる今回は「テストする」と題して、Vert.xのテストのやり方について書きます。

目次は以下の通りです。

  1. Vert.x's Tests
  2. BusMod Common Practices
  3. Integration Tests
  4. Gradle Jacoco Plugin (Code coverage)
  5. まとめ

1. Vert.x's Tests

Vert.xのテストについては公式ドキュメントの開発ガイドGradle Project Templateを見ると情報があります。

Vert.xのテストにはunitテストとintegrationテストという2種類のテストがあります。

unitテスト

unitテストは特別なものではなく、JUnitを使った通常のunitテストです。Vert.xアプリケーションはVerticleを作成し、そのVerticleをVert.x Instance上で動作させるのですが、このunitテストは、ただ単純に対象のクラスをテストするだけです。

integrationテスト

では、Vert.x 上でテスト対象のVerticleを実際に動作させたテストをしたい場合はどのようにすれば良いでしょうか?例えばeventbusを使ったメッセージによってRequestを受け、なにか処理をしてからResponseを返すVerticleを作ってテストしたい場合です。

Vert.xではintegrationテストという位置づけのテストを定義しており、このintgrationテストがテスト対象のVerticleをVert.x上で動作させてテストを行うものです。Vert.xではその実現のために testtoolsを提供しており、このtesttoolsの中に、JUnitを拡張したTestVerticleというテストクラスを用意しています。

このTestVerticleを継承したテストクラスを作成することで、Vert.xのAPIが使えるようになるため、テスト対象のVerticleをVert.x上で動かしたテストを簡単に作成することができます。

2. BusMod Common Practices

それではここから、eventbusを使ったモジュールを作成し、そのintegrationテストを書いていきます。

Vert.xでモジュールを作成する場合、基本のインタフェースはeventbusによるReq/Res、またはPub/Subメッセージになります。eventbusをインタフェースとしたモジュールのことをVert.xでは慣例的にBusModと呼んでいるようです。

このBusModを作成するにあたり、公式モジュールやその他モジュールをいくつか調べると、メッセージインタフェースの共通プラクティスと言えるものがあるようですので、今回のモジュールもそれに従いたいと思います。

2.1. メッセージにはJSONメッセージを使う

Vert.xの特長としてPolyglotがあります。Java, Groovy, JavaScript, Ruby, Python, Scala, Clojureなど様々なJVM上で動作する言語を使うことができます。言語間の差分を意識せずにメッセージをやり取りするには、メッセージにJSONを使うことが推奨されいます。

2.2. Req/Resのメッセージにはstatusプロパティを使う

Req/Res型のメッセージを使う場合、Requestに対する結果が正常だったのか、異常だったのかを知る必要があります。この正常/異常の処理結果を知るためのフィールドとして、JSONメッセージにstatusプロパティを設けて、正常の場合は"ok"、異常だった場合は"error"を返すようにします。

  • Responseのstatusプロパティでok/errorを表現する
  {"status":"ok"}

2.3. statusがerrorの場合はmessageプロパティでエラーメッセージを返す

上記のstatusが"error"だった場合、何がエラーだったのか原因を知るためにエラーメッセージが必要です。statusが"error"であった場合は、messageプロパティを設けて、そのエラーメッセージを設定して返すようにします。

  • Responseのstatusがerror時にはmessageプロパティを使う
  {"status":"error","message":"unknown action."}

2.4. メッセージの種類を分けるにはactionプロパティを使う

eventbusによってメッセージを受けるためには、eventbusに対して受信するためにドメイン(イベントパス)を指定してHandlerを登録します。1つのドメインに対するメッセージの中で、メッセージの種別によって処理を切り替えるような実装をする際には、actionプロパティを設けて、その値によって、例えば"add"、"update"、"delete"、"search"などを使います。

  • Requestのactionプロパティでメッセージの種別を分ける
  {"action":"add","key":"name","value":"@muraken720"}

3. Integration Tests

それでは実際にBusModを作成し、そのintegrationテストを書いてみましょう。まずは、テスト対象となるBusModです。

BusModVerticle

public class BusModVerticle extends Verticle {

  public void start() {

    vertx.eventBus().registerHandler("muraken720.vertx.mod.testexample",
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> message) {
            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            JsonObject json = message.body();

            if ("add".equals(json.getString("action"))) {
              String key = json.getString("key");
              String value = json.getString("value");
              map.put(key, value);
            } else {
              message.reply(new JsonObject().putString("status", "error")
                  .putString("message", "unknown action."));
            }

            message.reply(new JsonObject().putString("status", "ok"));
          }
        });
  }
}

このBusModの仕様を簡単に列挙します。

  • イベントの受信ドメインは「muraken720.vertx.mod.testexample」とする
  • JSONメッセージを使う
  • Requestメッセージにて、action="add" を指定した場合、指定されたkeyとvalueによってSharedDataのmapにデータを登録する
  • 正常に処理した場合は status="ok" のResponseメッセージを返す
  • Requestメッセージにて、action="add"以外の値が指定された場合、status="error" のResponseメッセージを返す

BusModVerticleTest

上記のBusModをテストするためのテストクラス、BusModVerticleTestを作成します。
モジュールのintegrationテストをするには、TestVerticleを継承します。また、startメソッドにて初期化が必要になります。

public class BusModVerticleTest extends TestVerticle {

  @Override
  public void start() {
    // Make sure we call initialize() - this sets up the assert stuff so assert
    // functionality works correctly
    initialize();
    // Deploy the module - the System property `vertx.modulename` will contain
    // the name of the module so you
    // don't have to hardecode it in your tests
    container.deployModule(System.getProperty("vertx.modulename"),
        new AsyncResultHandler<String>() {
          @Override
          public void handle(AsyncResult<String> asyncResult) {
            // Deployment is asynchronous and this this handler will be called
            // when it's complete (or failed)
            if (asyncResult.failed()) {
              container.logger().error(asyncResult.cause());
            }
            assertTrue(asyncResult.succeeded());
            assertNotNull("deploymentID should not be null",
                asyncResult.result());
            // If deployed correctly then start the tests!
            startTests();
          }
        });
  }
}

ここはGradle Project Templateの実装をそのまま利用します。やっていることは、モジュールをVert.xのcontainerにdeployし、deployが完了したらテストを開始するというものです。

BusModVerticleTest.testAddAction

それでは次に正常系となる1つ目のテストケース、「testAddAction」を作成します。

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    vertx.eventBus().send("muraken720.vertx.mod.testexample", request,
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            JsonObject json = reply.body();
            container.logger().info("response message: " + json);

            assertEquals("ok", json.getString("status"));

            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            assertEquals("@muraken720", map.get("name"));

            testComplete();
          }
        });
  }

Request用のメッセージを生成し、eventbusに対してメッセージを送信します。Responseメッセージを受信したら、statusが"ok"であること、さらに、SharedDataのmapを取得し、要求したkeyとvalueが保存されていることを確認します。

TestVerticleでテストケースを書く際には1つ決まりごとがあります。それはテストケースの最後に「testComplete()」を呼び出してテストケースの終了を通知するというものです。これは非同期APIによって処理が行われるため、テストを終わりを通知しなければ完了したことが分からないためです。

BusModVerticleTest.testUnknownAction

それでは異常系となる2つ目のテストケース、「testUnknownAction」を作成します。

  @Test
  public void testUnknownAction() {
    container.logger().info("in testUnknownAction()");

    JsonObject request = new JsonObject().putString("action", "unknown")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    vertx.eventBus().send("muraken720.vertx.mod.testexample", request,
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            JsonObject json = reply.body();
            container.logger().info("response message: " + json);

            assertEquals("error", json.getString("status"));
            assertEquals("unknown action.", json.getString("message"));

            testComplete();
          }
        });
  }

TestVerticleによるテストケースはVert.xのAPIを使う以外は通常のJUnitにと同じですから、特に問題なく書けると思います。

テストを実行する

繰り返しになりますが、integrationテストに利用するTestVerticleはJUnitを拡張したものですので、テストの実行はEclipseからテストクラスを選択し、右クリックから「Run As > JUnit Test」で実行することができます。テストはEclipseから実行しますので、デバッグも同様にできます。

また、Gradle Project Templateを使っているので、gradleを使ってのテストの実行も可能です。

ken@vertx-test $ ./gradlew test

上記で実行することができます。

4. Gradle Jacoco Plugin (Code coverage)

今回のテストプロジェクトはgithubのこちらにありますので、全体を見たい方は参照してください。
このプロジェクトには、gradleのjacoco pluginを設定し、カバレッジを計測するできるようにしていますので、テストのカバレッジレポートを見ることができます。

ken@vertx-test $ ./gradlew test

上記のテスト実行で、build/reports/tests/index.htmlにテストレポートが、

f:id:acro-engineer:20131119155504p:plain

ken@vertx-test $ ./gradlew jacocoTestReport

上記のタスク実行で、build/reports/jacoco/index.htmlにカバレッジレポートが生成されます。

f:id:acro-engineer:20131119155632p:plain

テストをする上で、カバレッジを知りたいことがありますが、gradleのjacoco pluginを使えば簡単に確認できます。

5 まとめ

いかかでしょうか?Vert.xでアプリケーションを開発する上で一番開発が多いのがこのBusModです。非常に簡単なテストの例ではありましたが、そのテストをintegrationテストを使うことで簡単に実施できることが分かったと思います。ノンブロッキングで非同期なアプリケーションを開発する場合でも、このようにテストが書ければ、開発が捗りますね。

次回は?

Vert.xのテストの続きかコールバックヘルにどう対応するか辺りを取り上げたいと思います。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ


11/24(日) DODA転職フェアに参加します!ぜひブースにお立ち寄りください。

出展企業を探す【総合エリア】 | 転職フェア・適職フェア(DODA)

Vert.x がいいね!(第3回:Event LoopsとVerticle Instances)

id:KenichiroMurataです。

f:id:acro-engineer:20131221154226j:plain:w200

皆さん、Vert.x 使っていますか?

前回の「Vert.x がいいね!(第2回:開発環境を構築する) - Taste of Tech Topics」に続き、今回はVert.xの肝であるEvent LoopsとVerticle Instancesについて書きます。

目次は以下の通りです。

  1. Vert.x's Threads
  2. Event Loops
  3. Verticle Instances
  4. Event Loops & Verticle Instances
  5. Performance Test

1. Vert.x's Threads

まずはVert.xのスレッドがどのようになっているのか確認してみましょう。前回も利用したHelloWorldVerticleを利用して、リモートデバッグによってbreak pointで停止させ、動作しているスレッドを見てみます。

f:id:acro-engineer:20130822025227p:plain

図の通り、main threadが1個、vert.x-eventloop-threadが0〜3の4個、vert.x-worker-threadが0〜19の20個が動作していることが分かります。

今回はeventloop-threadに注目したい訳ですが、デフォルトで4個のeventloop-threadが動作しています。ここがEvent Loopが1つのnode.jsとVert.xが異なる点ですね。

2. Event Loops

さて、ではなぜeventloop-threadは4個なのでしょうか。これはVert.xのソースを確認すると分かります。見るべきソースはVertxExecutorFactory.javaです。

  public static EventLoopGroup eventLoopGroup(String poolName) {
    int poolSize = Integer.getInteger("vertx.pool.eventloop.size", Runtime.getRuntime().availableProcessors());
    return new NioEventLoopGroup(poolSize, new VertxThreadFactory(poolName));
  }
Runtime.getRuntime().availableProcessors()

とある通り、Vert.xはCPUのプロセッサ数を判別して、その環境にあったeventloop-threadの数になるように実装されています。

実際に私の環境で以下のように確認してみるとcpu.thread_countが4なので、2coreのハイパースレッディングで4core相当に見えるために、eventloop-threadも4個になっているようです。

ken@my-vertx-module $ sysctl -a machdep.cpu
(略)
machdep.cpu.core_count: 2
machdep.cpu.thread_count: 4

3. Verticle Instances

一方で、公式サイトのドキュメントを見ると、vertxコマンドでVerticleやModuleを実行する際に、「-instances」オプションを指定することで、VertilcleやModuleのインスタンス数を指定することができ、コア数に応じてアプリケーションをスケールすることができるぜ!と書かれています。

ここは試しに何も考えず、「-instances 10」と指定してvertxコマンドを実行してみましょう。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin -instances 10
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 
Verticle start. 

ご覧の通り、10個のHelloWorldVerticleのインスタンスが生成され、動作していることが分かります。これも実際にVert.xのソースを確認すると分かります。見るべきソースはDefaultPlatformManager.javaです。

      for (int i = 0; i < instances; i++) {
        // Launch the verticle instance
        Runnable runner = new Runnable() {
          public void run() {
            Verticle verticle;
            try {
              verticle = verticleFactory.createVerticle(main);
            } catch (Throwable t) {
              handleDeployFailure(t, deploymentID, aggHandler);
              return;
            }
            try {
              addVerticle(deployment, verticle, verticleFactory, modID, main);
              setPathResolver(modID, modDir);
              DefaultFutureResult<Void> vr = new DefaultFutureResult<>();
              verticle.start(vr);
              vr.setHandler(new Handler<AsyncResult<Void>>() {
                @Override
                public void handle(AsyncResult<Void> ar) {
                  if (ar.succeeded()) {
                    aggHandler.complete();
                  } else {
                    handleDeployFailure(ar.cause(), deploymentID, aggHandler);
                  }
                }
              });
            } catch (Throwable t) {
              handleDeployFailure(t, deploymentID, aggHandler);
            }
          }
        };

        if (worker) {
          vertx.startInBackground(runner, multiThreaded);
        } else {
          vertx.startOnEventLoop(runner);
        }
      }

この他、関連する以下のソースを見るとだいたいの動作が分かります。

簡単に書くと、Verticleを生成する際に、EventLoopGroupから1つのeventloop-threadを取得し、対象のVerticleに対応づけて、その組み合わせを保持しています。この処理によって、1つのVerticleインスタンスは最初に対応づけられたeventloop-threadからしか呼ばれないことが保証されます。

Verticleを開発する場合に、他のスレッドから呼ばれることがないため、同期や排他などを考えることなく、シンプルなコードを書くことができる、というVert.xの特長はこのように実現されています。

4. Event Loops & Verticle Instances

ここまでの内容から、Event LoopsとVerticle Instancesの関係について整理してみましょう。

  • Event Loopsは環境のコア数によってスレッド数が決まる
  • Vertcile Instancesは「-instances」オプションの指定によってインスタンス数が決まる
  • 1つのVerticleインスタンスが生成される際に、Event Loopsから1つのeventloop-threadが割り当てられる

例えば、4個のeventloop-threadに対して、10個のVerticleインスタンスを生成した場合、

  1. vertx-eventloop-thread-0
    • Vertcile-0
    • Vertcile-4
    • Vertcile-8
  2. vertx-eventloop-thread-1
    • Vertcile-1
    • Vertcile-5
    • Vertcile-9
  3. vertx-eventloop-thread-2
    • Vertcile-2
    • Vertcile-6
  4. vertx-eventloop-thread-3
    • Vertcile-3
    • Vertcile-7

このような割り当てとなります(あくまで例です)。

ここで考えてみましょう。

Verticleを複数インスタンス生成して処理をスケールさせたいとしても、実際の所はEvent Loopsのスレッド数以上にはマルチに処理できないため、Event Loopsのスレッド数以上のVerticle Instancesを指定しても効果がないと考えられます。

5. Performance Test

そこで実際に性能計測を行ってみましょう。私の環境ではeventloop-threadは4個なので、Verticleのインスタンス数を1、2、4、8と変化させた場合の処理性能を計測し、その違いを確認したいと思います。(注意:この計測はあくまでも私の環境で、各条件での処理性能を相対的に確認するものであり、Vert.xの性能値を評価するものではありません)

5.1 事前準備

測定に使うのは、再三登場しているHelloWorldVerticleです。クライアントからの要求をシミュレートするために、Http Performance exampleのClientを利用します。

また、OSのチューニングとして、maxfiles、somaxconnを適当な大きさに設定しています。

ken@my-vertx-module $ launchctl limit
(略)
	maxfiles    65535          65535          
ken@my-vertx-module $ sysctl -a | grep somax
kern.ipc.somaxconn: 2048

5.2 検証手順

まずはサーバを起動します。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin (-instances 2/4/8)

クライアントは別のターミナルを2つ開いて、それぞれで以下を実行します。

ken@java $ vertx run perf/RateCounter.java -cluster
Starting clustering... 
No cluster-host specified so using address 10.0.1.6 
ken@java $ vertx run httpperf/PerfClient.java -instances 4 -cluster
Starting clustering... 
No cluster-host specified so using address 10.0.1.6 

負荷をかけるため、クライアント側も「-instances 4」を指定してマルチコアを使うようにしています。
PerfClientは1インスタンス辺り100接続をするため、計400接続を同時に行って、リクエストを送信します。

5.3 測定結果

5.3.1 Verticleインスタンス数1(「-instances」指定なし)
71968 Rate: count/sec: 124666.66666666667 Average rate: 120609.15962650067
74968 Rate: count/sec: 124666.66666666667 Average rate: 120771.52918578594
77968 Rate: count/sec: 123333.33333333333 Average rate: 120870.10055407346
80967 Rate: count/sec: 125375.12504168056 Average rate: 121036.96567737473
83967 Rate: count/sec: 123333.33333333333 Average rate: 121119.01104005145
86967 Rate: count/sec: 123333.33333333333 Average rate: 121195.39595478745
89967 Rate: count/sec: 126000.0 Average rate: 121355.60816743918
92967 Rate: count/sec: 126000.0 Average rate: 121505.48043929566
95968 Rate: count/sec: 123958.68043985339 Average rate: 121582.19406468823
98968 Rate: count/sec: 125333.33333333333 Average rate: 121695.9017056018
5.3.2 Verticleインスタンス数2(「-instances 2」)
72824 Rate: count/sec: 179393.13104368123 Average rate: 168241.23915192793
75825 Rate: count/sec: 176607.79740086637 Average rate: 168572.37059017475
78825 Rate: count/sec: 178666.66666666666 Average rate: 168956.54931810973
81825 Rate: count/sec: 176000.0 Average rate: 169214.78765658417
84825 Rate: count/sec: 178000.0 Average rate: 169525.49366342468
87825 Rate: count/sec: 172000.0 Average rate: 169610.01992598918
90825 Rate: count/sec: 171333.33333333334 Average rate: 169666.94192127718
93825 Rate: count/sec: 175333.33333333334 Average rate: 169848.12150279776
96825 Rate: count/sec: 178000.0 Average rate: 170100.69713400464
99825 Rate: count/sec: 176666.66666666666 Average rate: 170298.02153769095
5.3.3 Verticleインスタンス数4(「-instances 4」)
72069 Rate: count/sec: 189270.24325224926 Average rate: 173805.6584661921
75068 Rate: count/sec: 190730.24341447148 Average rate: 174481.8031651303
78068 Rate: count/sec: 190000.0 Average rate: 175078.13700876158
81068 Rate: count/sec: 189333.33333333334 Average rate: 175605.66438052006
84069 Rate: count/sec: 191936.02132622458 Average rate: 176188.60697760174
87068 Rate: count/sec: 190063.3544514838 Average rate: 176666.51352965497
90068 Rate: count/sec: 188666.66666666666 Average rate: 177066.21663631924
93069 Rate: count/sec: 193268.91036321226 Average rate: 177588.6707711483
96068 Rate: count/sec: 193397.79926642214 Average rate: 178082.19178082192
99068 Rate: count/sec: 192666.66666666666 Average rate: 178523.84220939153
5.3.4 Verticleインスタンス数8(「-instances 8」)
70314 Rate: count/sec: 185333.33333333334 Average rate: 161646.32932275222
73315 Rate: count/sec: 186604.4651782739 Average rate: 162667.93971220078
76314 Rate: count/sec: 186728.90963654552 Average rate: 163613.4916267002
79313 Rate: count/sec: 187395.79859953318 Average rate: 164512.7532686949
82314 Rate: count/sec: 184605.1316227924 Average rate: 165245.28026824113
85313 Rate: count/sec: 186728.90963654552 Average rate: 166000.492304807
88314 Rate: count/sec: 186604.4651782739 Average rate: 166700.63636569513
91315 Rate: count/sec: 177940.68643785405 Average rate: 167070.0323057548
94314 Rate: count/sec: 179393.13104368123 Average rate: 167461.882647327
97314 Rate: count/sec: 178666.66666666666 Average rate: 167807.3041905584

5.4 まとめ

(1)に比べて、(2)では2つのeventloop-threadで処理しているので、処理性能が向上しています。

(2)と(3)では(3)の方が処理性能は確かに向上しているものの、増分は想定よりも低くなっています。
ただし、これは(2)の時点でCPU使用率が既に80〜90%近くになっており、これ以上性能を出せない状況にあることが考えられます。

(4)は(3)に比べて、処理性能が下がっています。
Verticleのインスタンス数がeventloop-threadの数よりも大きいことが逆に処理性能を悪化させているのか、マシンリソースの問題かは分かりませんが、より性能が向上しないことは分かりました。


と言うわけで、Event LoopsとVerticle Instancesの関係について確認してきました。

Vert.xによるアプリケーションの開発で処理性能を引き出したい場合には、Event Loopの数と、Vertcileのインスタンス数について、ここまで見てきた関係を頭に入れながら開発することが重要となります。

次回は?

Vert.xのテストについてその手法を確立したいので、次回はその辺りを取り上げたいと思います。

Vert.x がいいね!(第2回:開発環境を構築する)

id:KenichiroMurataです。

f:id:acro-engineer:20131221154226j:plain:w200

皆さん、Vert.x 使っていますか?

前回の「Vert.x がいいね!(第1回:入門する) - Taste of Tech Topics」に続き、第2回目となる今回は開発環境の構築について書きます。

目次は以下の通りです。

  1. environment
  2. install
  3. setting
  4. gradle project template
  5. auto redeoploy
  6. remote debugging

1. environment

Vert.xをインストールをして開発を行う前に、必要な環境を揃えましょう。私の環境はOS X Mountain Lionですので、Windowsの方は環境に合わせて読み替えてください。

JDK 1.7

Vert.xはJDK 1.7.0以上が必要になります。JDK 1.7は「Java SE - Downloads | Oracle Technology Network | Oracle」からダウンロードしてインストールしてください。

私の環境では以下になります。

ken@~ $ java -version
java version "1.7.0_25"
Java(TM) SE Runtime Environment (build 1.7.0_25-b15)
Java HotSpot(TM) 64-Bit Server VM (build 23.25-b01, mixed mode)

Eclipse

開発にはEclipse Kepler (4.3) IDE for Java EE Developers を使います。Java EE Developersを使うのは、HTML/CSS/JavaScript(JSON)を開発で使うので、関連ツールが最初から入っていると楽だからです。

Eclipse Downloads」からダウンロードしてインストールしてください。

2. install

それでは次はVert.xのインストールです。インストール手順は公式サイトに説明があります。「ダウンロードページ」からサイトに行って、vert.x-2.0.0-final.tar.gz をダウンロードしてください。適当なディレクトリで解凍しましょう。

パスを通す

Vert.xのアーカイブを解凍したら、vertxコマンドが使えるようにパスを通します。私の場合は、ホーム直下に配置したので、以下のように.bashrcにPATHを定義しました。

ken@~ $ cat .bashrc 
PS1="\u@\W \$ "
PATH="$PATH":/Users/ken/vert.x-2.0.0-final/bin

ken@~ $ source .bashrc 

パスを通して、有効化したらvertxコマンドを実行してください。usageが表示されれば完了です。

ken@~ $ vertx

Hello World!

前回も紹介しましたが、Hello World的なプログラムを作成し、動作確認をしてみましょう。
以下の内容をHelloWorldVerticle.javaというファイル名で保存してください。

import org.vertx.java.core.Handler;
import org.vertx.java.core.http.HttpServerRequest;
import org.vertx.java.platform.Verticle;

public class HelloWorldVerticle extends Verticle {

  public void start() {
    container.logger().info("Verticle start.");
    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
      public void handle(HttpServerRequest req) {
        req.response().headers().set("Content-Type", "text/plain");
        req.response().end("Hello World!");
      }
    }).listen(8080);
  }
}

ファイルを作成したら、次のようにコマンドを実行し、ブラウザからlocalhost:8080にアクセスしてください。

ken@work $ vertx run HelloWorldVerticle.java 
Verticle start. 

.classではなく、.javaなファイルを指定しても動作するところがVert.xの面白い所ですね。

3. setting

開発をする上ではVert.xのログファイルを確認します。デフォルト設定はOSのtmpディレクトリにvertx.logが出力されるのですが、私の環境では権限の問題などあり、そのままだと出力されません。そこで、vert.xのディレクトリにlogディレクトリを作成し、そこにvertx.logが出力されるように設定変更しましょう。

設定ファイルは「vert.x-2.0.0-final/conf/logging.properties」になります。このファイルの以下の部分を変更します。

# Put the log in the system temporary directory
#java.util.logging.FileHandler.pattern=%t/vertx.log
java.util.logging.FileHandler.pattern=/Users/ken/vert.x-2.0.0-final/log/vertx.log

この設定をした後に、再度HelloWorldVerticleを実行してみましょう。先ほど指定したvertx.logにログが出力されているはずです。

ken@~ $ cat vert.x-2.0.0-final/log/vertx.log
[vert.x-eventloop-thread-2] 03:06:37,789 INFO [null-HelloWorldVerticle.java-704507910]  Verticle start.

最初からスレッド名とか出てくれるのはありがたいですね。

4. gradle project template

Vert.xではVerticleか、1つ以上のVerticleを含んだModuleという形式で開発を行います。簡単なプロトタイプでは直接Verticleを作成する形式でよいと思いますが、ちゃんとしたアプリケーションや再利用可能なモジュールを開発するのであればVerticleが1つであってもModuleとして開発した方がよいと思います。

Vert.xではこのModuleを開発するために、Mavenarchetypeを使ったproject templateによる開発と、Gradleによる project templateを使った開発の2つの方法を用意しています。ここではGradleによるproject templateを使った開発方法について説明します。Developing Vert.x Modules with Gradleを参考にします。

ちなみに、GradleはGroovyによるビルドツールですが、このproject templateに内包されているため、別途インストールは不要です。この辺りも便利ですね。

git cloneして、Eclipseにインポートする

まずは、vertx-gradle-templateからgit cloneします。このgradle project templateはJava、Groovy、JavaScriptRubyPythonの全てのサンプルソースとそのテストコードが含まれており、非常に勉強になります。ですが、その分いろいろと含んでおり、testの実行にも時間が掛かります。そこで、とりあえずJavaでだけの場合を必要最小限に知りたいという人(私のことです)向けに、ミニマムなJava用のgradle project templateとして、vert.x-module-template-for-javaを作成しましたので、こちらの方が好みの方はぜひご利用ください。

ken@work $ git clone https://github.com/vert-x/vertx-gradle-template.git my-vertx-module
ken@work $ cd my-vertx-module/
ken@my-vertx-module $ git remote rm origin

git cloneしたら、早速テストを動かして確認してみましょう。

ken@my-vertx-module $ ./gradlew test

次にEclipseからプロジェクトとしてインポートできるように、eclipseタスクを実行します。

ken@my-vertx-module $ ./gradlew eclipse

これでEclipseプロジェクトとしてのセットアップが完了するので、Eclipseからインポートできるようになります。

mod.json

Moduleを開発する際にはmod.jsonという設定ファイルを作成する必要があります。gradle project templateでは「src/main/resources/mod.json」にあります。このmod.jsonでは以下のようにmainとしてModuleのエントリポイントとなるVerticleを定義する必要があります。

    "main":"com.mycompany.myproject.PingVerticle",

また、このmod.jsonには、main以外にもModuleをリポジトリに公開する際に必要となるdescription、licenses、authorなどを定義します。詳細はファイルを確認してください(開発だけの場合にはmainだけでOKです)。

HelloWolrdVerticleをModuleとして動かす

先ほど作成したHelloWorldVerticle.javaをModuleとして動かしてみましょう。HelloWorldVerticle.javaを「src/main/java」以下に配置して、mod.jsonを以下のように変更します。

  // "main":"com.mycompany.myproject.PingVerticle",
  "main":"HelloWorldVerticle",

ここまでできたら、runModEclipseタスクを利用して動作させてみましょう。

ken@my-vertx-module $ ./gradlew runModEclipse

ブラウザでlocalhost:8080にアクセスすると動作が確認できます。

ただし、この方法は2つの問題があります。

  1. ログ出力がされない
  2. gradleによる起動は時間がかかる

1つ目のログ出力されないという問題はgradleがvertxのログ設定を上書きしてしまうため、vertxのログ出力が有効になりません。また、2つ目の起動に時間がかかるというのは、やってみると分かりますが、gradleの依存関係のあるタスクが全て動くためでしょう、私の環境では約10秒はかかります。よって、繰り返し実行するにはちょっとつらい状況です。

そこで、TIPS的な内容になりますが、次の方法で行うことをオススメします。

vertx runmodでModuleを動かす

./gradlew runModEclipseではなく、vertx runmod コマンドを使ってModuleを動かすと、ログ出力も正しく行われ、起動にも時間がかかりません。実際には以下のようにモジュール名とclass pathを指定して実行します。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin

cpオプションで指定しているbinディレクトリはEclipseプロジェクトでのoutputディレクトリです。

ただし、vertx runmodでModuleを実行するには、カレントディレクトリの直下のmodsディレクトリ配下にModuleをインストール(配置)しておく必要があります。そうなるとこの実行方法をとる場合に、毎回作成したModuleをmods以下に配置する必要があります。ただ、実際の所は、mods以下にモジュール名のディレクトリとその配下にmod.jsonのみが存在すれば、後は-cpで指定したbinディレクトリにあるclassファイルを利用することで動作させることができます。

mod.jsonは一度定義すればほとんど変更はしないので、最初にこの作業を行えば、後はvertx runmodで開発のフローを回すことができます。

とは言っても手作業でこれをやるのは手間なので、私はgradleのタスクに以下のようなcopyModJsonというタスクを作って、実行するようにしています。

task copyModJson( type:Copy, dependsOn: 'copyMod', description: 'Copy the mod.json into the local mods directory for runmod auto-redeploy' ) {
  file("mods/$moduleName").mkdirs()
  from "build/mods/$moduleName/mod.json"
  into "mods/$moduleName"
}

このタスクを「gradle/vertx.gradle」に追加しています。vert.x-module-template-for-javaにはこのタスクを追加したvertx.gradleが含まれているので、こちらを使うか、ソースを確認してください。

上記のタスクが使えるようになっていれば、以下のように実行することで、準備が整います。

ken@my-vertx-module $ ./gradlew copyModJson

後は、先ほど紹介した通り、vertx runmodコマンドでModuleを実行してください。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin

ちなみに、モジュール名はdomain~modulename~versionという形式と決まっており、これらの定義は「gradle.properties」に以下のように定義されていますので、必要に応じて変更してください。

# E.g. your domain name
modowner=com.mycompany

# Your module name
modname=my-module

# Your module version
version=0.0.1

5. auto redeoploy

Vert.xのModuleにはauto-redeployという機能があります。これはソースやclassファイルが変更されたらvertxを再起動せずにauto-redeployしてくれるという機能です。これも非常に便利です。デフォルトでは設定はOFFになっているので、mod.jsonに以下のように定義して有効にしましょう。

  //"main":"com.mycompany.myproject.PingVerticle",
  "main":"HelloWorldVerticle",
  "auto-redeploy": true,

これで準備は整いましたので、実際に試してみましょう。まずは先ほどと同様にvertx runmodコマンドでModuleを実行します。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin
Verticle start. 

ブラウザでアクセスすると「Hello World!」が返ってきます。次は試しにEclipse上でHelloWorldVerticle.javaを以下のように変更し、保存します。

    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
      public void handle(HttpServerRequest req) {
        req.response().headers().set("Content-Type", "text/plain");
        //req.response().end("Hello World!");
        req.response().end("Hello World!!!");
      }
    }).listen(8080);

保存すればEclipseが自動的にコンパイルをするので、binディレクトリ以下のclassファイルが更新されます。
そうするとvertxのログに以下のように出力され、変更を検知したvertxがauto-deployしたことが分かります。

ken@my-vertx-module $ vertx runmod com.mycompany~my-module~0.0.1 -cp bin
Verticle start. 
Module com.mycompany~my-module~0.0.1 has changed, reloading it. 
Verticle start. 

ブラウザを実際に再読込させてみると「Hello World!!!」が返ってきます。う〜む、便利ですね。

6. remote debugging

Moduleを実際にデバッグするには、通常のjavaプログラムと同じようにremote debuggingができます。設定方法は簡単です。「vert.x-2.0.0-final/bin/vertx」の冒頭部分にJVM_OPTSを定義する箇所がありますので、以下のように定義します。

#JVM_OPTS="-XX:+CMSClassUnloadingEnabled -XX:-UseGCOverheadLimit"
#JVM_OPTS=""
JVM_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,address=8000,server=y,suspend=n"

これで後は、vertx runmodコマンドを実行し、Eclipseからremote debuggingで8000ポートにアタッチすれば、あとはブレークポイントを設定するなり、なんなりお好きにどうぞ!、という感じです。

長くなりましたが、Vert.xによるModule開発をするための開発環境の構築について説明してきました。ぜひ一度お試し下さい。
私もまだまだ試行錯誤している状態ですので、こんなやり方しているよ!という情報がありましたら、お寄せ頂けるとありがたいです。

Vert.x がいいね!(第1回:入門する)

id:KenichiroMurataです。

最近2.0がリリースされたVert.xについて、皆さんご存じでしょうか?

f:id:acro-engineer:20131221154226j:plain:w200

Vert.xは一言で表現すると、

Vert.x is a polyglot, non-blocking, event-driven application platform that runs on the JVM.

です(公式サイトより引用)。

JVM上でのnon-blockingでevent-drivenなプラットフォームな所が私のお気に入りポイントです!まぁ、Java屋さんですからね。うちは。
そんなお気に入りのVert.xについて、これから数回に分けて本ブログにて記事を書いて行こうと思います。

Vert.xを知る

Vert.xを知るには、ドキュメントが充実した公式サイトを読むのが一番です。ですが、ボリュームがかなりあるので、オススメのサイトを紹介します。

多すぎてボリュームあるじゃんという突っ込みはなしで(^^;

Vert.xの良いところ

まぁ、たくさんあるんですけど、個人的には次の5つです。

  1. asynchronous programming model for writing scalable non-blocking applications
  2. advantage of the JVM and scales seamlessly over available cores
  3. a distributed event bus
  4. a powerful module system
  5. polyglot

1. asynchronous programming model for writing scalable non-blocking applications

Node.jsが人気な理由も同じですが、ノンブロッキングな非同期APIにより、少ないリソースで多数のクライアントからの要求を処理できます。

参照: C10K問題

Vert.xによるアプリケーションはVerticleと呼ばれる実行可能なパッケージを作ってVert.x Instance(Vert.xのプロセス)上で動作させるのですが、このVerticleは必ず1つのEvent Loop(スレッド)が割り当てられます。そのため、Verticleのコードを書く上では、同期や排他などを考える必要がなく、ソースコードが簡単になります。

2. advantage of the JVM and scales seamlessly over available cores

Node.jsはシングルスレッドで処理するので、マルチスレッドに処理したい場合はclusterを使って別のプロセスを立てる必要があります。ですが、Vert.xではJVMによるマルチスレッドが利用できるので、1つのVert.x Instanceに対して、起動時にinstance数を指定することで、マルチスレッドで動作するようになり、マルチコアリソースを簡単に活用することができます。さらに、異なるサーバ上にVert.x Instanceを動作させて、clusterを組むことも簡単です。

3. a distributed event bus

Vert.xはアーキテクチャの土台としてdistributed event busを持っています。このevent busを使うことで、作成したVerticleや1つ以上のVerticleをまとめたモジュール(Module)間を疎結合につなぐことができます。event busはPoint to PointとPub/Subメッセージングが使えます。メッセージにはいろいろな型が使えますが、Vert.xでは多言語でコードを書くことができるので、JSONを使うのが一番良いようです。

複数のVert.x Instanceをクラスタ化するのにもこのevent busを使います。また、event bus bridgeを使うと、クライアント側のjsともevent busを共有できるようになります。

4. a powerful module system

Vert.xは強力なモジュールシステムを持っています。モジュールはmavenBintrayに公開することができ、さらに、そこからインストールすることができます。Vert.x Module Registryには既に以下のようなモジュールが公開されています。

  • io.vertx~mod-web-server
  • io.vertx~mod-mongo-persistor
  • io.vertx~mod-auth-mgr
  • io.vertx~mod-work-queue
  • com.jetdrone~mod-redis-io
  • com.jetdrone~yoke(Node.jsのConnect相当)

5. polyglot

JVMを使って多言語対応をしているため、Vert.xのVerticleはJava、Groovy、JavaScriptRubyPythonで開発することができます。さらに近々Scala、Clojureもサポートされるとアナウンスされています。Vert.x 2.0からは各言語用の機能も上述のモジュールとして開発されており、必要なものだけを使うことができます。

個人的にはJavaで書ければ良いのですが、例えば他の人がJavaScriptやScalaを使って開発した優れたモジュールがあれば、それをevent busを介して利用できるのは素晴らしいことです。

その他

あげればキリがないですが、その他にも以下のような「かゆい所に手が届く」機能がたくさんあります。

  • configにはjsonを利用
  • loggerの提供
  • Vert.x Instance内で共有できるShareData
  • Verticle Instanceを複数起動する場合に、同一ポートをバインドしていても、Vert.x側で吸収してラウンドロビンに処理を振り分けてくれる
  • EclipseIntelliJ IDEAでの開発サポート
  • auto-redeploy
  • テストフレームワークJUnit拡張)

開発者とコミュニティ

開発をリードしているのは@timfoxさんで、この方はRabbitMQの開発者でもあります。@normanmaurerさんはNettyの開発者でもあり、Vert.xはNettyと密接に連携しながら開発されています。二人とも現在はRedHat所属のようです。

コミュニティはGoogle Groupsを中心に作られているのですが、このフォーラムの最初にある注意書きがステキです。

Please don't use StackOverflow to ask Vert.x questions - ask them here!

この注意書きのおかげで(?)、情報がバラバラにならずに、このフォーラムに集まっています。1日の投稿数も多くて活発であり、レスポンスも非常に早いです。私も少しお世話になりました。

また、つい最近Vert.xプロジェクトはEclipse Foundationの傘下になりました。

参照: eclipse/vert.x · GitHub

Vert.xでHello World

実際にVert.xによる開発ってどんな感じ?かは公式サイトをご覧下さい。Node.jsでも紹介されるようなweb-serverのコードを、JavaJavaScript、Groovy、RubyPythonで書いた例が紹介されています。

まぁ、これだけだとこの記事的に寂しいので、JavaによるHello Worldのコードをのせておきます。

public class MainVerticle extends Verticle {

  public void start() {
    container.logger().info("MainVerticle start.");

    vertx.createHttpServer().requestHandler(new Handler<HttpServerRequest>() {
      public void handle(HttpServerRequest req) {
        req.response().headers().set("Content-Type", "text/plain");
        req.response().end("Hello World!");
      }
    }).listen(8080);
  }
}

Vert.xを利用する場面

Vert.xがどのようなアプリケーションに向いているか?というと、WebSocket/SockJSを使ったリアルタイムWebアプリケーションのサーバ、RESTful APIによるサービスの実装とかでしょう。少ないリソース(サーバ)で多数のクライアントを処理することが可能であり、さらにスケールすることも容易という特長を活かしたい場面に使うと良いと思います。

後は、サーバ側でのクローラ−、Gatewayといったものを開発するのにも向いていると思います。

一般的なWebアプリケ-ションには向いていないとの声もあるようですが、Node.jsにExpressがあるように、Vert.xにはweb-serverやYokeといったモジュールもあるため、問題なく使えると思います。

Vert.xで注意すべきこと

良いことばかり書いてきたので、私の考える注意すべきこともあげておきましょう。でも、書いてみるとVert.x特有というよりは、non-blocking, event-driven application platformを使う場合に注意すべきことですね。

1. コードのネストが深くなる

非同期処理のコールバックを受けるハンドラを登録し、そのハンドラの中で、次の処理のコールバックのハンドラを登録して・・・となりがちです。これはコードの書き方で工夫すべき点ですが、何も考えないと見通しの悪いコードになってしまいます。

2. ブロックするコードを書いてはいけない

1つのEvent Loopを複数のVerticleが共有するため、ブロックするコードを書いてしまうと、他のVerticleの処理も影響を受けてしまいます。Thread.sleep()、Object.wait()やCPUヘビーな処理、ブロックするライブラリの呼び出し(例えばJDBC)はやめてね、とドキュメントにも書かれています。

じゃあ、実際に必要になったらどうすれば良いの?というと、Vert.xではWorker VerticleというEvent Loopとは別のスレッドプール(Worker Pool)上で動作するVerticleが用意されているので、これを使うことでEvent Loopからブロック処理を切り離すことができます。橋渡しはもちろんevent busで。

3. ステートフルな処理は状態遷移に注意すること

モジュール内部に状態を持つような機能を開発する場合には、状態遷移表/状態遷移図などを使った十分な設計と、ステートマシン的な実装上の仕組みを用意して開発しないと、後で大変なことになります。

次回は?

実際にインストールから開発環境の構築、デバッグなどの方法について書く予定です。

それでは!