読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

"Reactive Streams" の実装はどうなっているの?

Scala Reactive Streams

こんにちは。@です。
夏の日差しが日に日に増し、セミも盛んに鳴きはじめましたが、皆さん夏バテなどされていませんか?

さて、今回の内容は、非同期ストリーム処理界隈で注目の「Reactive Streams」です。

先月のJJUGナイトセミナーでも取り上げられており、私は残念ながら参加はできなかったのですが
後から資料を読んで勉強させていただきました。jjug.doorkeeper.jp

1. Reactive Streamsとは何か?

Reactive Streamsとは、JVM 上でのノンブロッキングなバックプレッシャーを持つ非同期ストリーム処理の標準」で、
様々な非同期ストリーム処理のインタフェースを共通化して標準的に扱えるようにしようというものです。

分散環境での非同期ストリーム処理においては、
「上流のコンポーネント群の方が処理能力が高く、下流のコンポーネント群が処理しきれずに溢れる」というのは
既存の非同期ストリーム処理でかなり頻発する事象でした。
こうなるとネットワーク/システム全体に影響して、システム停止に繋がってしまいます。
私もStormを触っている中で、この問題にかなり悩まされました。


このような中でReactive Streamsが注目され、標準化につながったのだと思います。

なお、Reactive Streamsについては下記の解説記事がとても詳しいです。

okapies.hateblo.jp

okapies.hateblo.jp

takezoe.hatenablog.com

さて今回は、実際にReactive Streamsを実現しているプロダクトを動かしてみたいと思います。

2. 実際にReactive Streamsを実現しているプロダクトは?

実際にReactive Streamsを実現している代表的なプロダクトには下記があります。

というわけで? これらを実際に動かしてみましょう。
今回はAkka Streamsを用いて、RabbitMQをバックエンドとするScalaConsultants/reactive-rabbit · GitHubを用いて試してみます。

3. まずは動かしてみると・・?

というわけで、RabbitMQをローカルマシンにインストールし、reactive-rabbitを動かしてみたのですが、
RabbitMQに対しては何のメッセージの送受信も行われず、以下のコードで「invoices」というキューを作成して
メッセージを投入してみたのですが、やはり何もおきませんでした・・・

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import io.scalac.amqp.Connection


// streaming invoices to Accounting Department
val connection = Connection()
val queue = connection.consume(queue = "invoices")
val exchange = connection.publish(exchange = "accounting_department",
  routingKey = "invoices")

implicit val system = ActorSystem()
implicit val mat = ActorMaterializer()

Source(queue).map(_.message).to(Sink(exchange)).run()

中身を追っていきたい気持ちもありますが、
今回の記事の目的はまず動かすことなので、まずは動かすことを優先します(^^;

4. reactive-rabbitを利用するプロダクトを探してみる

というわけで、更に「reactive-rabbit」を使っているプロダクトを探してみました。
そうすると、Akka-Streamでreactive-rabbitを使っているサンプルがあったため、そちらを見てみました。
Processing RabbitMQ messages using Akka Streams - Typesafe Activator | @typesafe | Typesafe

ただ、こちらはこちらでReactive Streams 0.4.0準拠で、
APIが更新されたReactive Streams 1.0.0では動作しないという状況!

あちこち探してみたのですが、良いサンプルが見つからなかったため、
rabbitmq-akka-streamをReactive Streams 1.0.0準拠するバージョンに更新してみました。

ソースは下記になります。
kimutansk/reactive-streams-example · GitHub

で、実行してみると下記の結果になりました。

[DEBUG] [07/14/2015 05:07:00.259] [main] [EventStream(akka://rabbit-akka-stream)] logger log1-Logging$DefaultLogger started
[DEBUG] [07/14/2015 05:07:00.261] [main] [EventStream(akka://rabbit-akka-stream)] Default Loggers started
05:07:00.686 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO  c.g.kimutansk.reactive.ConsumerApp$ - Exchanges, queues and bindings declared successfully.
05:07:00.833 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO  c.g.kimutansk.reactive.ConsumerApp$ - Starting the flow
05:07:00.945 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO  c.g.kimutansk.reactive.ConsumerApp$ - Starting the trial run
05:07:01.018 [rabbit-akka-stream-akka.actor.default-dispatcher-5] DEBUG c.g.k.reactive.DomainService$ - message: 'message 1'  will be held for 2558 ms
05:07:03.580 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message: 'message 2'  will be held for 1873 ms
05:07:03.585 [rabbit-akka-stream-akka.actor.default-dispatcher-3] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
05:07:03.599 [rabbit-akka-stream-akka.actor.default-dispatcher-7] INFO  c.g.kimutansk.reactive.ConsumerApp$ - 'message 1 [message processed]' delivered to censorship.ok.queue
05:07:05.454 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
05:07:05.454 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message: 'message 3'  will be held for 1915 ms
05:07:05.458 [rabbit-akka-stream-akka.actor.default-dispatcher-7] INFO  c.g.kimutansk.reactive.ConsumerApp$ - 'message 2 [message processed]' delivered to censorship.ok.queue
05:07:07.371 [rabbit-akka-stream-akka.actor.default-dispatcher-4] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
05:07:07.372 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message: 'message 4'  will be held for 1364 ms
05:07:07.374 [rabbit-akka-stream-akka.actor.default-dispatcher-9] INFO  c.g.kimutansk.reactive.ConsumerApp$ - 'message 3 [message processed]' delivered to censorship.ok.queue
05:07:08.737 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message: 'message 5'  will be held for 1125 ms
05:07:08.737 [rabbit-akka-stream-akka.actor.default-dispatcher-9] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
05:07:08.740 [rabbit-akka-stream-akka.actor.default-dispatcher-9] INFO  c.g.kimutansk.reactive.ConsumerApp$ - 'message 4 [message processed]' delivered to censorship.ok.queue
05:07:09.862 [rabbit-akka-stream-akka.actor.default-dispatcher-6] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'
05:07:09.865 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO  c.g.kimutansk.reactive.ConsumerApp$ - 'message 5 [message processed]' delivered to censorship.ok.queue
05:07:09.865 [rabbit-akka-stream-akka.actor.default-dispatcher-5] INFO  c.g.kimutansk.reactive.ConsumerApp$ - Trial run finished. You can now go to http://localhost:15672/ and try publishing messages manually.

どうやら、RabbitMQに対してメッセージの投入と取得、処理が行われたようです。
実際にRabbitMQの管理画面からもメッセージが送受信された旨が表示され、キューが作られていることがわかります。

f:id:acro-engineer:20150623064313j:plain
f:id:acro-engineer:20150623064515j:plain

また、RabbitMQの「censorship.inbound.queue」というキューにメッセージを投入してみると、
下記のように処理した旨のメッセージも表示されました。

05:07:53.525 [rabbit-akka-stream-akka.actor.default-dispatcher-7] DEBUG c.g.k.reactive.DomainService$ - message: 'Test Input Manual Message'  will be held for 1380 ms
05:07:54.906 [rabbit-akka-stream-akka.actor.default-dispatcher-10] DEBUG c.g.k.reactive.DomainService$ - message classified as 'safe'

というわけで、とりあえずサンプルを動かしてみることは出来ました。
ですが、これだけだと中身でどんな事が行われているかは分かりませんし、
バックプレッシャーを体感することもできません。

なので、次回以降は実際に中身を追っていこうと思います。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ