読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

RxJavaを使ってCallback Hellから脱出する( Vert.x がいいね!第5回 )

rxjava java vert.x

id:KenichiroMurata(@ )です。

本記事はJava Advent Calendar 2013 - Adventarの6日目の記事です。

f:id:acro-engineer:20131206014754j:plain

皆さん、RxJava 使っていますか?

RxJava はNetflixが開発する Reactive ExtensionsJava版です。Reactive Extensions (Rx)はReactive Programmingを可能にするライブラリです。

私はReactive Programmingとはなんぞや?と語れるほどには詳しくないので、ここでは asynchronous で event-based なプログラムを書くのに便利なライブラリというレベルの紹介とさせて頂きます。:-)

さて、なぜ RxJava なのか?というと Vert.x を使って、asynchronous で eventbusベースのプログラムを書こうとすると、どうしてもコールバックを多段に書くことなり、ネストの深いコード(Callback Hell)になってしまうのを改善したいからです。

そこで本記事では、Vert.xとVert.xにてRxJavaを使えるようにするためのモジュールである mod-rxvertx を使って、Callback Hellからいかに脱出できるのか?を試したいと思います。

ちなみに、mod-rxvertxはVert.xのAPIをObservableを返すようにラップしてくれるライブラリで、現在では以下のAPIに対応しています。

  • EventBus
  • HttpServer/HttpClient
  • NetServer/NetClient
  • Timer

また、RxJavaそのものについては、Wikiページが充実していますので、ぜひご覧下さい。


目次は以下の通りです。

  1. 環境
  2. BusModとIntegration Testを"Rx-ify"してみる
  3. Callback Hellはいかに解決されたか
  4. RxJavaを使うとできること
  5. まとめ

1. 環境

本記事で試した環境について説明します。

  • Vert.x 2.1M1
  • mod-rxvertx 1.0.0-beta2
  • RxJava 0.15.1

また、今回の全ソースはこちらにありますのでcloneしてお試しください。git clone したら以下を実行することで、eclipseからもJUnitテストを動かして試すことができます。

ken@vertx-test $ ./gradlew test
ken@vertx-test $ ./gradlew copyModJson
ken@vertx-test $ ./gradlew eclipse

2. BusModとIntegration Testを"Rx-ify"してみる

それでは、さっそくコードを書いてみましょう。題材は前回のBusModとそのIntegration Testです。

まずはVert.xで通常に書いた場合のコードです。

BusModVerticle.java

public class BusModVerticle extends Verticle {

  public void start() {
    container.logger().info("BusModVerticle start.");

    vertx.eventBus().registerHandler("muraken720.vertx.mod.testexample",
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> message) {
            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            JsonObject json = message.body();

            if ("add".equals(json.getString("action"))) {
              String key = json.getString("key");
              String value = json.getString("value");
              map.put(key, value);
            } else {
              message.reply(new JsonObject().putString("status", "error")
                  .putString("message", "unknown action."));
            }

            message.reply(new JsonObject().putString("status", "ok"));
          }
        });
  }
}

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticle.java

public class RxBusModVerticle extends Verticle {

  public void start() {
    container.logger().info("RxBusModVerticle start.");

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus
        .<JsonObject> registerHandler("muraken720.vertx.mod.testexample");

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> message) {
        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        JsonObject json = message.body();

        if ("add".equals(json.getString("action"))) {
          String key = json.getString("key");
          String value = json.getString("value");
          map.put(key, value);
        } else {
          message.reply(new JsonObject().putString("status", "error")
              .putString("message", "unknown action."));
        }

        message.reply(new JsonObject().putString("status", "ok"));
      }
    });
  }
}

EventBusをラップしたRxEventBusを使います。Integration Test側のコードも見てみましょう。

BusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    vertx.eventBus().send("muraken720.vertx.mod.testexample", request,
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            JsonObject json = reply.body();
            container.logger().info("response message: " + json);

            assertEquals("ok", json.getString("status"));

            ConcurrentMap<String, String> map = vertx.sharedData().getMap(
                "muraken720.testexample");

            assertEquals("@muraken720", map.get("name"));

            testComplete();
          }
        });
  }

これをRxJava(mod-rxvertx)を使って書くと次のようになります。

RxBusModVerticleTest.testAddAction

  @Test
  public void testAddAction() {
    container.logger().info("in testAddAction()");

    JsonObject request = new JsonObject().putString("action", "add")
        .putString("key", "name").putString("value", "@muraken720");

    container.logger().info("request message: " + request);

    RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    Observable<RxMessage<JsonObject>> obs = rxEventBus.send(
        "muraken720.vertx.mod.testexample", request);

    obs.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        JsonObject json = reply.body();
        container.logger().info("response message: " + json);

        assertEquals("ok", json.getString("status"));

        ConcurrentMap<String, String> map = vertx.sharedData().getMap(
            "muraken720.testexample");

        assertEquals("@muraken720", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?あれ、あまり変わらない?

そうなんですよね。最初にRxJavaを使った例を見たとき、言うほどCallback Hellは解消してなくない?と私は思ってしまいました。まぁ、ただこの例があまりにも単純な例なので、その効果もよく分からない、むしろ複雑になっているではないか?とすら思います。

ただ、実際にコードを書いてみると、ネストが1段減るだけでも、書きやすくなったと感じます。

この例ではあまりにも単純なので、もう少し複雑なコードを書いてみましょう。

3. Callback Hellはいかに解決されたか

それでは、今回のBusModに対してシーケンシャルにリクエストを投げるコードを書いてみます。つまり、リクエストを投げて、レスポンスを受けたら次のリクエストを投げる。さらにレスポンスを受けたら次のリクエストを投げるという例です。

まずは、通常のVert.xで書いた場合のコードは次のようになります。

BusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final EventBus eventBus = vertx.eventBus();
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    eventBus.send("muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"),
        new Handler<Message<JsonObject>>() {
          @Override
          public void handle(Message<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            eventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"),
                new Handler<Message<JsonObject>>() {
                  @Override
                  public void handle(Message<JsonObject> reply) {
                    assertEquals("ok", reply.body().getString("status"));
                    assertEquals("Kenichiro", map.get("name"));

                    // リクエスト3
                    eventBus.send(
                        "muraken720.vertx.mod.testexample",
                        new JsonObject().putString("action", "add")
                            .putString("key", "name")
                            .putString("value", "Murata"),
                        new Handler<Message<JsonObject>>() {
                          @Override
                          public void handle(Message<JsonObject> reply) {
                            assertEquals("ok", reply.body().getString("status"));
                            assertEquals("Murata", map.get("name"));

                            testComplete();
                          }
                        });
                  }
                });
          }
        });
  }

ご覧の通り、最初にリクエストを投げてレスポンスを受けるHandlerの中で、次のリクエスト投げる。さらにそのレスポンスを受けるHandlerの中で、次のリクエストを投げる・・・という具合で、どんどんネストが深くなっています。これぞ Callback Hell です。

それではこれをRxJavaを使ってコードを書くと次のようになります。

RxBusModVerticleTest.testSerialAction

  @Test
  public void testSerialAction() {
    container.logger().info("in testSerialAction()");

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final ConcurrentMap<String, String> map = vertx.sharedData().getMap(
        "muraken720.testexample");

    // リクエスト1
    Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send(
        "muraken720.vertx.mod.testexample",
        new JsonObject().putString("action", "add").putString("key", "name")
            .putString("value", "@muraken720"));

    Observable<RxMessage<JsonObject>> obs2 = obs1
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("@muraken720", map.get("name"));

            // リクエスト2
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Kenichiro"));
          }
        });

    Observable<RxMessage<JsonObject>> obs3 = obs2
        .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() {
          @Override
          public Observable<RxMessage<JsonObject>> call(
              RxMessage<JsonObject> reply) {
            assertEquals("ok", reply.body().getString("status"));
            assertEquals("Kenichiro", map.get("name"));

            // リクエスト3
            return rxEventBus.send(
                "muraken720.vertx.mod.testexample",
                new JsonObject().putString("action", "add")
                    .putString("key", "name").putString("value", "Murata"));
          }
        });

    obs3.subscribe(new Action1<RxMessage<JsonObject>>() {
      @Override
      public void call(RxMessage<JsonObject> reply) {
        assertEquals("ok", reply.body().getString("status"));
        assertEquals("Murata", map.get("name"));

        testComplete();
      }
    });
  }

いかがでしょうか?深くなっていたネストが縦方向に展開されて、ネストが1段で収まっています。少しコードがゴテゴテしてますが、Java8のラムダに対応すれば、もっとスッキリとして見通しがよいコードになりそうです。

4. RxJavaを使うとできること

RxJava にはもっと便利な機能がたくさんあります。ここでmod-rxvertxのテストコード(EventBusIntegrationTest)からいくつか例を紹介します。

Observable.concat

  @Test
  public void testSimpleSerial() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
    final AtomicInteger totalReqs = new AtomicInteger(3);
    final AtomicInteger activeReqs = new AtomicInteger(0);
    
    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-foo["+message.body()+"]");
        message.reply("pong!");
        activeReqs.incrementAndGet();
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.observeSend("foo", "ping!A");
    Observable<RxMessage<String>> obs2 = rxEventBus.observeSend("foo", "ping!B");
    Observable<RxMessage<String>> obs3 = rxEventBus.observeSend("foo", "ping!C");

    Observable<RxMessage<String>> concatenated = Observable.concat(obs1, obs2, obs3);

    concatenated.subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        System.out.println("serial-resp["+message.body()+"]");
        assertEquals("pong!", message.body());
        assertEquals(0,activeReqs.decrementAndGet());
        if (totalReqs.decrementAndGet()==0)
          testComplete();
      }
    });
  }

Observable.concatは2つ以上のObservableをシーケンシャルに実行するように連結してくれます。

ここまで見てくると気づくと思いますが、Observableで処理を書く場合、宣言した処理は直ぐには実行されず、subscribeを呼び出した段階になって実行(遅延実行)されます。コーディングスタイルとしては、Observableに対する処理を宣言的に記述して行き、最後にsubscribeを呼んで処理を実行、という流れになります。

Observable.merge

  @Test
  public void testGather() {

    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive");
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    
    merged.takeLast(1).subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> message) {
        assertEquals("pongC", message.body());
        testComplete();
      }
    });
  }

Observable.mergeはパラレルに処理を実行し、全ての処理が完了するまで待ちます。

Observable.reduce

  @Test
  public void testConcatResults() {
    final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());

    rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
      @Override
      public void call(RxMessage<String> msg) {
        System.out.println("receive: " + msg.body());
        msg.reply("pong"+msg.body());
      }
    });

    Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A");
    Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B");
    Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C");
    Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3);
    Observable<String> result = merged.reduce("", new Func2<String, RxMessage<String>, String>() {
      @Override
      public String call(String accum, RxMessage<String> reply) {
        return accum + reply.body();
      }
    });

    result.takeFirst().subscribe(new Action1<String>() {
      @Override
      public void call(String value) {
        assertEquals("pongApongBpongC", value);
        testComplete();
      }
    });
  }

先のObservable.mergeで一括りにしたパラレル処理の処理結果をrudeceで処理してまとめる、なんてこともできます。

ObservableにはTransforming、Filtering、Combiningといった様々な機能がありますので、ぜひWikiページをご覧下さい。

5 まとめ

いかかでしょうか?Vert.xによるプログラミングは asynchronous で eventbusベースなためにネストが深くなり、Callback Hell に陥りがちです。この Callback Hell から脱出する方法の一つとして、ぜひRxJava( mod-rxvertx )を試してみてください。私もまださわり始めたばかりなのですが、いかんせん日本語の情報はまだ少ないので、こんなこともできる!という情報などある方は、このブログのコメントや、tweetで教え頂けると嬉しいです。

See also

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ