id:KenichiroMurata(@muraken720 )です。
本記事はJava Advent Calendar 2013 - Adventarの6日目の記事です。
皆さん、RxJava 使っていますか?
RxJava はNetflixが開発する Reactive Extensions のJava版です。Reactive Extensions (Rx)はReactive Programmingを可能にするライブラリです。
私はReactive Programmingとはなんぞや?と語れるほどには詳しくないので、ここでは asynchronous で event-based なプログラムを書くのに便利なライブラリというレベルの紹介とさせて頂きます。:-)
さて、なぜ RxJava なのか?というと Vert.x を使って、asynchronous で eventbusベースのプログラムを書こうとすると、どうしてもコールバックを多段に書くことなり、ネストの深いコード(Callback Hell)になってしまうのを改善したいからです。
そこで本記事では、Vert.xとVert.xにてRxJavaを使えるようにするためのモジュールである mod-rxvertx を使って、Callback Hellからいかに脱出できるのか?を試したいと思います。
ちなみに、mod-rxvertxはVert.xのAPIをObservableを返すようにラップしてくれるライブラリで、現在では以下のAPIに対応しています。
- EventBus
- HttpServer/HttpClient
- NetServer/NetClient
- Timer
また、RxJavaそのものについては、Wikiページが充実していますので、ぜひご覧下さい。
目次は以下の通りです。
- 環境
- BusModとIntegration Testを"Rx-ify"してみる
- Callback Hellはいかに解決されたか
- RxJavaを使うとできること
- まとめ
1. 環境
本記事で試した環境について説明します。
- Vert.x 2.1M1
- mod-rxvertx 1.0.0-beta2
- RxJava 0.15.1
また、今回の全ソースはこちらにありますのでcloneしてお試しください。git clone したら以下を実行することで、eclipseからもJUnitテストを動かして試すことができます。
ken@vertx-test $ ./gradlew test ken@vertx-test $ ./gradlew copyModJson ken@vertx-test $ ./gradlew eclipse
2. BusModとIntegration Testを"Rx-ify"してみる
それでは、さっそくコードを書いてみましょう。題材は前回のBusModとそのIntegration Testです。
まずはVert.xで通常に書いた場合のコードです。
BusModVerticle.java
public class BusModVerticle extends Verticle { public void start() { container.logger().info("BusModVerticle start."); vertx.eventBus().registerHandler("muraken720.vertx.mod.testexample", new Handler<Message<JsonObject>>() { @Override public void handle(Message<JsonObject> message) { ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); JsonObject json = message.body(); if ("add".equals(json.getString("action"))) { String key = json.getString("key"); String value = json.getString("value"); map.put(key, value); } else { message.reply(new JsonObject().putString("status", "error") .putString("message", "unknown action.")); } message.reply(new JsonObject().putString("status", "ok")); } }); } }
これをRxJava(mod-rxvertx)を使って書くと次のようになります。
RxBusModVerticle.java
public class RxBusModVerticle extends Verticle { public void start() { container.logger().info("RxBusModVerticle start."); RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); Observable<RxMessage<JsonObject>> obs = rxEventBus .<JsonObject> registerHandler("muraken720.vertx.mod.testexample"); obs.subscribe(new Action1<RxMessage<JsonObject>>() { @Override public void call(RxMessage<JsonObject> message) { ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); JsonObject json = message.body(); if ("add".equals(json.getString("action"))) { String key = json.getString("key"); String value = json.getString("value"); map.put(key, value); } else { message.reply(new JsonObject().putString("status", "error") .putString("message", "unknown action.")); } message.reply(new JsonObject().putString("status", "ok")); } }); } }
EventBusをラップしたRxEventBusを使います。Integration Test側のコードも見てみましょう。
BusModVerticleTest.testAddAction
@Test public void testAddAction() { container.logger().info("in testAddAction()"); JsonObject request = new JsonObject().putString("action", "add") .putString("key", "name").putString("value", "@muraken720"); container.logger().info("request message: " + request); vertx.eventBus().send("muraken720.vertx.mod.testexample", request, new Handler<Message<JsonObject>>() { @Override public void handle(Message<JsonObject> reply) { JsonObject json = reply.body(); container.logger().info("response message: " + json); assertEquals("ok", json.getString("status")); ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); assertEquals("@muraken720", map.get("name")); testComplete(); } }); }
これをRxJava(mod-rxvertx)を使って書くと次のようになります。
RxBusModVerticleTest.testAddAction
@Test public void testAddAction() { container.logger().info("in testAddAction()"); JsonObject request = new JsonObject().putString("action", "add") .putString("key", "name").putString("value", "@muraken720"); container.logger().info("request message: " + request); RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); Observable<RxMessage<JsonObject>> obs = rxEventBus.send( "muraken720.vertx.mod.testexample", request); obs.subscribe(new Action1<RxMessage<JsonObject>>() { @Override public void call(RxMessage<JsonObject> reply) { JsonObject json = reply.body(); container.logger().info("response message: " + json); assertEquals("ok", json.getString("status")); ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); assertEquals("@muraken720", map.get("name")); testComplete(); } }); }
いかがでしょうか?あれ、あまり変わらない?
そうなんですよね。最初にRxJavaを使った例を見たとき、言うほどCallback Hellは解消してなくない?と私は思ってしまいました。まぁ、ただこの例があまりにも単純な例なので、その効果もよく分からない、むしろ複雑になっているではないか?とすら思います。
ただ、実際にコードを書いてみると、ネストが1段減るだけでも、書きやすくなったと感じます。
この例ではあまりにも単純なので、もう少し複雑なコードを書いてみましょう。
3. Callback Hellはいかに解決されたか
それでは、今回のBusModに対してシーケンシャルにリクエストを投げるコードを書いてみます。つまり、リクエストを投げて、レスポンスを受けたら次のリクエストを投げる。さらにレスポンスを受けたら次のリクエストを投げるという例です。
まずは、通常のVert.xで書いた場合のコードは次のようになります。
BusModVerticleTest.testSerialAction
@Test public void testSerialAction() { container.logger().info("in testSerialAction()"); final EventBus eventBus = vertx.eventBus(); final ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); // リクエスト1 eventBus.send("muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add").putString("key", "name") .putString("value", "@muraken720"), new Handler<Message<JsonObject>>() { @Override public void handle(Message<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("@muraken720", map.get("name")); // リクエスト2 eventBus.send( "muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add") .putString("key", "name").putString("value", "Kenichiro"), new Handler<Message<JsonObject>>() { @Override public void handle(Message<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("Kenichiro", map.get("name")); // リクエスト3 eventBus.send( "muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add") .putString("key", "name") .putString("value", "Murata"), new Handler<Message<JsonObject>>() { @Override public void handle(Message<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("Murata", map.get("name")); testComplete(); } }); } }); } }); }
ご覧の通り、最初にリクエストを投げてレスポンスを受けるHandlerの中で、次のリクエスト投げる。さらにそのレスポンスを受けるHandlerの中で、次のリクエストを投げる・・・という具合で、どんどんネストが深くなっています。これぞ Callback Hell です。
それではこれをRxJavaを使ってコードを書くと次のようになります。
RxBusModVerticleTest.testSerialAction
@Test public void testSerialAction() { container.logger().info("in testSerialAction()"); final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); final ConcurrentMap<String, String> map = vertx.sharedData().getMap( "muraken720.testexample"); // リクエスト1 Observable<RxMessage<JsonObject>> obs1 = rxEventBus.send( "muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add").putString("key", "name") .putString("value", "@muraken720")); Observable<RxMessage<JsonObject>> obs2 = obs1 .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() { @Override public Observable<RxMessage<JsonObject>> call( RxMessage<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("@muraken720", map.get("name")); // リクエスト2 return rxEventBus.send( "muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add") .putString("key", "name").putString("value", "Kenichiro")); } }); Observable<RxMessage<JsonObject>> obs3 = obs2 .flatMap(new Func1<RxMessage<JsonObject>, Observable<RxMessage<JsonObject>>>() { @Override public Observable<RxMessage<JsonObject>> call( RxMessage<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("Kenichiro", map.get("name")); // リクエスト3 return rxEventBus.send( "muraken720.vertx.mod.testexample", new JsonObject().putString("action", "add") .putString("key", "name").putString("value", "Murata")); } }); obs3.subscribe(new Action1<RxMessage<JsonObject>>() { @Override public void call(RxMessage<JsonObject> reply) { assertEquals("ok", reply.body().getString("status")); assertEquals("Murata", map.get("name")); testComplete(); } }); }
いかがでしょうか?深くなっていたネストが縦方向に展開されて、ネストが1段で収まっています。少しコードがゴテゴテしてますが、Java8のラムダに対応すれば、もっとスッキリとして見通しがよいコードになりそうです。
4. RxJavaを使うとできること
RxJava にはもっと便利な機能がたくさんあります。ここでmod-rxvertxのテストコード(EventBusIntegrationTest)からいくつか例を紹介します。
Observable.concat
@Test public void testSimpleSerial() { final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); final AtomicInteger totalReqs = new AtomicInteger(3); final AtomicInteger activeReqs = new AtomicInteger(0); rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() { @Override public void call(RxMessage<String> message) { System.out.println("serial-foo["+message.body()+"]"); message.reply("pong!"); activeReqs.incrementAndGet(); } }); Observable<RxMessage<String>> obs1 = rxEventBus.observeSend("foo", "ping!A"); Observable<RxMessage<String>> obs2 = rxEventBus.observeSend("foo", "ping!B"); Observable<RxMessage<String>> obs3 = rxEventBus.observeSend("foo", "ping!C"); Observable<RxMessage<String>> concatenated = Observable.concat(obs1, obs2, obs3); concatenated.subscribe(new Action1<RxMessage<String>>() { @Override public void call(RxMessage<String> message) { System.out.println("serial-resp["+message.body()+"]"); assertEquals("pong!", message.body()); assertEquals(0,activeReqs.decrementAndGet()); if (totalReqs.decrementAndGet()==0) testComplete(); } }); }
Observable.concatは2つ以上のObservableをシーケンシャルに実行するように連結してくれます。
ここまで見てくると気づくと思いますが、Observableで処理を書く場合、宣言した処理は直ぐには実行されず、subscribeを呼び出した段階になって実行(遅延実行)されます。コーディングスタイルとしては、Observableに対する処理を宣言的に記述して行き、最後にsubscribeを呼んで処理を実行、という流れになります。
Observable.merge
@Test public void testGather() { final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() { @Override public void call(RxMessage<String> msg) { System.out.println("receive"); msg.reply("pong"+msg.body()); } }); Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A"); Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B"); Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C"); Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3); merged.takeLast(1).subscribe(new Action1<RxMessage<String>>() { @Override public void call(RxMessage<String> message) { assertEquals("pongC", message.body()); testComplete(); } }); }
Observable.mergeはパラレルに処理を実行し、全ての処理が完了するまで待ちます。
Observable.reduce
@Test public void testConcatResults() { final RxEventBus rxEventBus = new RxEventBus(vertx.eventBus()); rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() { @Override public void call(RxMessage<String> msg) { System.out.println("receive: " + msg.body()); msg.reply("pong"+msg.body()); } }); Observable<RxMessage<String>> obs1 = rxEventBus.send("foo", "A"); Observable<RxMessage<String>> obs2 = rxEventBus.send("foo", "B"); Observable<RxMessage<String>> obs3 = rxEventBus.send("foo", "C"); Observable<RxMessage<String>> merged = Observable.merge(obs1, obs2, obs3); Observable<String> result = merged.reduce("", new Func2<String, RxMessage<String>, String>() { @Override public String call(String accum, RxMessage<String> reply) { return accum + reply.body(); } }); result.takeFirst().subscribe(new Action1<String>() { @Override public void call(String value) { assertEquals("pongApongBpongC", value); testComplete(); } }); }
先のObservable.mergeで一括りにしたパラレル処理の処理結果をrudeceで処理してまとめる、なんてこともできます。
ObservableにはTransforming、Filtering、Combiningといった様々な機能がありますので、ぜひWikiページをご覧下さい。
5 まとめ
いかかでしょうか?Vert.xによるプログラミングは asynchronous で eventbusベースなためにネストが深くなり、Callback Hell に陥りがちです。この Callback Hell から脱出する方法の一つとして、ぜひRxJava( mod-rxvertx )を試してみてください。私もまださわり始めたばかりなのですが、いかんせん日本語の情報はまだ少ないので、こんなこともできる!という情報などある方は、このブログのコメントや、tweetで教え頂けると嬉しいです。
See also
- Vert.x がいいね!(第1回:入門する) - Taste of Tech Topics
- Vert.x がいいね!(第2回:開発環境を構築する) - Taste of Tech Topics
- Vert.x がいいね!(第3回:Event LoopsとVerticle Instances) - Taste of Tech Topics
- Vert.x がいいね!(第4回:テストする) - Taste of Tech Topics
- RxJavaを使ってCallback Hellから脱出する( Java8 ラムダ編 ) - Taste of Tech Topics
Acroquest Technologyでは、キャリア採用を行っています。
- 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
- 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
- OSSの開発に携わりたい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
キャリア採用ページ