こんにちは。@kimutanskです。
夏の日差しが日に日に増し、セミも盛んに鳴きはじめましたが、皆さん夏バテなどされていませんか?
さて、今回の内容は、非同期ストリーム処理界隈で注目の「Reactive Streams」です。
先月のJJUGナイトセミナーでも取り上げられており、私は残念ながら参加はできなかったのですが
後から資料を読んで勉強させていただきました。jjug.doorkeeper.jp
1. Reactive Streamsとは何か?
Reactive Streamsとは、「JVM 上でのノンブロッキングなバックプレッシャーを持つ非同期ストリーム処理の標準」で、
様々な非同期ストリーム処理のインタフェースを共通化して標準的に扱えるようにしようというものです。
分散環境での非同期ストリーム処理においては、
「上流のコンポーネント群の方が処理能力が高く、下流のコンポーネント群が処理しきれずに溢れる」というのは
既存の非同期ストリーム処理でかなり頻発する事象でした。
こうなるとネットワーク/システム全体に影響して、システム停止に繋がってしまいます。
私もStormを触っている中で、この問題にかなり悩まされました。
このような中でReactive Streamsが注目され、標準化につながったのだと思います。
なお、Reactive Streamsについては下記の解説記事がとても詳しいです。
さて今回は、実際にReactive Streamsを実現しているプロダクトを動かしてみたいと思います。
2. 実際にReactive Streamsを実現しているプロダクトは?
実際にReactive Streamsを実現している代表的なプロダクトには下記があります。
というわけで? これらを実際に動かしてみましょう。
今回はAkka Streamsを用いて、RabbitMQをバックエンドとするScalaConsultants/reactive-rabbit · GitHubを用いて試してみます。
3. まずは動かしてみると・・?
というわけで、RabbitMQをローカルマシンにインストールし、reactive-rabbitを動かしてみたのですが、
RabbitMQに対しては何のメッセージの送受信も行われず、以下のコードで「invoices」というキューを作成して
メッセージを投入してみたのですが、やはり何もおきませんでした・・・
import akka.actor.ActorSystem import akka.stream.ActorMaterializer import akka.stream.scaladsl.{Sink, Source} import io.scalac.amqp.Connection // streaming invoices to Accounting Department val connection = Connection() val queue = connection.consume(queue = "invoices") val exchange = connection.publish(exchange = "accounting_department", routingKey = "invoices") implicit val system = ActorSystem() implicit val mat = ActorMaterializer() Source(queue).map(_.message).to(Sink(exchange)).run()
中身を追っていきたい気持ちもありますが、
今回の記事の目的はまず動かすことなので、まずは動かすことを優先します(^^;
4. reactive-rabbitを利用するプロダクトを探してみる
というわけで、更に「reactive-rabbit」を使っているプロダクトを探してみました。
そうすると、Akka-Streamでreactive-rabbitを使っているサンプルがあったため、そちらを見てみました。
Processing RabbitMQ messages using Akka Streams - Typesafe Activator | @typesafe | Typesafe
ただ、こちらはこちらでReactive Streams 0.4.0準拠で、
APIが更新されたReactive Streams 1.0.0では動作しないという状況!
あちこち探してみたのですが、良いサンプルが見つからなかったため、
rabbitmq-akka-streamをReactive Streams 1.0.0準拠するバージョンに更新してみました。
ソースは下記になります。
kimutansk/reactive-streams-example · GitHub
で、実行してみると下記の結果になりました。
[DEBUG] [07/14/2015 05:07:00.259] [main] [EventStream(akka://rabbit-akka-stream)] logger log1-Logging$DefaultLogger started [DEBUG] [07/14/2015 05:07:00.261] [main] [EventStream(akka://rabbit-akka-stream)] Default Loggers started 05:07:00.686 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO c.g.kimutansk.reactive.ConsumerApp$ - Exchanges, queues and bindings declared successfully. 05:07:00.833 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO c.g.kimutansk.reactive.ConsumerApp$ - Starting the flow 05:07:00.945 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO c.g.kimutansk.reactive.ConsumerApp$ - Starting the trial run 05:07:01.018 [rabbit-akka-stream-akka.actor.default-dispatcher-5] DEBUG c.g.k.reactive.DomainService$ - message: 'message 1' will be held for 2558 ms 05:07:03.580 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message: 'message 2' will be held for 1873 ms 05:07:03.585 [rabbit-akka-stream-akka.actor.default-dispatcher-3] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe' 05:07:03.599 [rabbit-akka-stream-akka.actor.default-dispatcher-7] INFO c.g.kimutansk.reactive.ConsumerApp$ - 'message 1 [message processed]' delivered to censorship.ok.queue 05:07:05.454 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe' 05:07:05.454 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message: 'message 3' will be held for 1915 ms 05:07:05.458 [rabbit-akka-stream-akka.actor.default-dispatcher-7] INFO c.g.kimutansk.reactive.ConsumerApp$ - 'message 2 [message processed]' delivered to censorship.ok.queue 05:07:07.371 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe' 05:07:07.372 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message: 'message 4' will be held for 1364 ms 05:07:07.374 [rabbit-akka-stream-akka.actor.default-dispatcher-9] INFO c.g.kimutansk.reactive.ConsumerApp$ - 'message 3 [message processed]' delivered to censorship.ok.queue 05:07:08.737 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message: 'message 5' will be held for 1125 ms 05:07:08.737 [rabbit-akka-stream-akka.actor.default-dispatcher-9] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe' 05:07:08.740 [rabbit-akka-stream-akka.actor.default-dispatcher-9] INFO c.g.kimutansk.reactive.ConsumerApp$ - 'message 4 [message processed]' delivered to censorship.ok.queue 05:07:09.862 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe' 05:07:09.865 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO c.g.kimutansk.reactive.ConsumerApp$ - 'message 5 [message processed]' delivered to censorship.ok.queue 05:07:09.865 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO c.g.kimutansk.reactive.ConsumerApp$ - Trial run finished. You can now go to http://localhost:15672/ and try publishing messages manually.
どうやら、RabbitMQに対してメッセージの投入と取得、処理が行われたようです。
実際にRabbitMQの管理画面からもメッセージが送受信された旨が表示され、キューが作られていることがわかります。
また、RabbitMQの「censorship.inbound.queue」というキューにメッセージを投入してみると、
下記のように処理した旨のメッセージも表示されました。
05:07:53.525 [rabbit-akka-stream-akka.actor.default-dispatcher-7] DEBUG c.g.k.reactive.DomainService$ - message: 'Test Input Manual Message' will be held for 1380 ms 05:07:54.906 [rabbit-akka-stream-akka.actor.default-dispatcher-10] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
というわけで、とりあえずサンプルを動かしてみることは出来ました。
ですが、これだけだと中身でどんな事が行われているかは分かりませんし、
バックプレッシャーを体感することもできません。
なので、次回以降は実際に中身を追っていこうと思います。
Acroquest Technologyでは、キャリア採用を行っています。
- 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
- 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
- OSSの開発に携わりたい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
キャリア採用ページ