今回も引き続きØMQの話です。
簡単なサンプルプログラムを動かしながら、ØMQの内部動作を探っていきます。
なお、この連載は、あくまで「内部を探る」が目的のため、ØMQの説明では必ずといって良いほど出てくるtopologyの説明は、割愛しちゃいます。
ちょっと拍子抜けかも知れませんが、マニュアルを参考にしてくださいね。
最初のサンプルコード
2回目にして最初のサンプルコードです。
ØMQではいろいろな通信方式・構成を選べるのですが、まずはネットワークプログラムを書く人間なら誰しも書く、シンプルなサーバ・クライアント形式のプログラムを書いてみます。
今回は簡単のために素のC言語のAPIを触っています。
Javaなどの他の言語でもあまり変わらないので、安心してください。
サーバ側
まずはサーバ側です。
ソケットプログラミングを経験した人ならば容易に想像がつく実装だと思います。
ここらへんはドキュメントにも「BSD socketに似せている」と書いてあるとおり親しみやすいです。
あ、説明を簡潔にするため、エラー処理とかとことん省いています。
そのまま使うとリソースリークしますので、ご注意してください。
#include <zmq.h> int main () { // 0MQの初期化 void *context = zmq_ctx_new(); // サーバ用ソケットの作成と初期化 void *responder = zmq_socket(context, ZMQ_REP); zmq_bind(responder, "tcp://*:5555"); while (1) { // 受信する char buffer[10]; zmq_recv(responder, buffer, 10, 0); // 送信する zmq_send(responder, "aaaaa", 5, 0); } return 0; }
送信してきた内容に関わらず"aaaaa"とそっけない結果を返すサーバです。
クライアント側
次はクライアント側です。
こちらも一目でわかるとおり、安心のBSD socket likeな実装です。
#include <zmq.h> int main () { // 0MQの初期化 void *context = zmq_ctx_new(); // クライアント用ソケットの作成と初期化 void *requester = zmq_socket(context, ZMQ_REQ); zmq_connect(requester, "tcp://localhost:5555"); // 送信する zmq_send (requester, "Hello", 5, 0); // 受信する char buffer [10]; zmq_recv (requester, buffer, 10, 0); return 0; }
普通のソケットプログラミングと違うところ
このように一見普通のソケットプログラムと同じように見えるØMQのサンプルですが、ØMQの思想により若干異なる点があります。
少し整理してみましょう。
接続・切断は内部で管理される
通常のソケットプログラミングではサーバが待ちうけしていなければクライアントはタイムアウト後処理を停止します。
サーバが途中で異常終了しても同様ですね。
常に待ち受けを行うためには、アプリケーション側で接続異常・通信異常を考慮した実装が必要になります。
ØMQでは接続・切断部分はの内部で管理されています。
クライアントもサーバも通信をできるように待ち続けます。
例えば上のサンプルでサーバを起動せずにクライアントを起動しても、通信中にサーバを突然停止させても、クライアントはタイムアウトせずにサーバの復帰を待ち続けます。
「とにかく通信したい」という用途で考えると便利な機能です。
一方で、接続・切断をAPIの外から管理することは実質できなくなっています。
機能としては提供されていますが限定的ですので、厳密な接続・切断処理をしたい場合には他のライブラリの使用をお勧めします。
通信方式が変わっても同じAPIを使える
ØMQはTCP以外にUDP, プロセス間通信、プロセス内通信も通信方式として提供しています。
そしてこれらはzmq_bindの文字列を変更することで同じAPIを使って利用することができます。
いろいろな通信方式に定義の変更だけで対応できることは、システム構成で最適な通信方式が変わる可能性のある分散・並列処理環境において非常に楽ですね。
なおプロセス間通信はUNIXドメインソケットを使っているので、現在Windowsでは動作しません。
プロセス内通信は同一プロセス内のスレッド間通信に制限されますのでご注意ください。
既存プロトコルと相互通信できない
手軽に通信ができるØMQを使い始めると「これでHTTP通信をすると便利なのでは?」と思い始める人も多いと思います。
私もそのクチでした。
しかし残念ながらいくらサンプルを書いてリクエストを飛ばしても、正しく通信ができません。
これは0mqが通信データを独自フォーマットに組み込んで通信しているためです。
パケットキャプチャしてみるとわかりますが、"Hello"と"aaaaa"の前1byteに"5"というデータ長が入っていることがわかります。
$ tcpdump -s0 -i lo -X port 5555 @~ hh:mm:ss.334970 IP localhost.60461 > localhost.5555: Flags [P.], seq 13:24, ack 13, win 257, options [nop,nop,TS val 2428461 ecr 2428461], length 11 (省略) 0x0030: 0025 0e2d 0000 0100 0005 4865 6c6c 6f .%.-......Hello hh:mm:ss.335031 IP localhost.5555 > localhost.60461: Flags [P.], seq 15:24, ack 24, win 256, options [nop,nop,TS val 2428461 ecr 2428461], length 9 (省略) 0x0020: 8018 0100 fe31 0000 0101 080a 0025 0e2d .....1.......%.- 0x0030: 0025 0e2d 0100 0005 6161 6161 61 .%.-....aaaaa
おそらく効率的なデータ通信のために必要なフォーマットだったと思うのですが、汎用性がなくなり非常に残念な部分です。
次のバージョンであるversion 3.3以降では汎用的なデータフォーマットでの通信も可能になる予定です。
今からリリースが楽しみな機能なのですが、今は「他のプロトコルと送受信はできない」と考えておいてください。
内部で何が行われているか、確認する
ØMQの内部動作をさらに追うために、クライアントプロセスにstraceをかけてみます。
釈迦に説法かもしれませんが、straceはLinuxのシステムコールを追ったり、プロセス・スレッドの生成を追うために有益なツールです。
$ strace -f -tt -e trace=clone ./client hh:mm:ss.411879 clone(Process 25773 attached (waiting for parent) Process 25773 resumed (parent 25772 ready) child_stack=0x7f5f7d25dfb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f5f7d25e9d0, tls=0x7f5f7d25e700, child_tidptr=0x7f5f7d25e9d0) = 25773 [pid 25772] hh:mm:ss.412076 clone(Process 25774 attached (waiting for parent) Process 25774 resumed (parent 25772 ready) child_stack=0x7f5f7ca5cfb0, flags=CLONE_VM|CLONE_FS|CLONE_FILES|CLONE_SIGHAND|CLONE_THREAD|CLONE_SYSVSEM|CLONE_SETTLS|CLONE_PARENT_SETTID|CLONE_CHILD_CLEARTID, parent_tidptr=0x7f5f7ca5d9d0, tls=0x7f5f7ca5d700, child_tidptr=0x7f5f7ca5d9d0) = 25774
「あれ?」と思う動きをしています。
あれだけ簡単なサンプルコードにも関わらず、2つもスレッドを作っています。
長い引用になるため、省略しますが、サーバが停止していて通信が不可能な場合にも、生成されたスレッドは裏でなにやら処理を行っています。
これはØMQは処理性能を上げるために内部で積極的にスレッドを生成するためです。
このおかげで高速な並列処理の恩恵を受けられますが、単純な処理を行っているつもりでも知らないうちにリソースを使っている可能性があるため、注意が必要です。
PubSubを実現してみる
最初のサンプルがあまりに簡単だったので、次は「MQと言えばこれ!」といってもいい代表的な構成であるPubSubを実現してみます。
既存MQを使っても動かすまでには実装や設定がいろいろ必要になるPubSubですが、ØMQでの実装は拍子抜けするほど簡単です。
サーバ側
今回はサーバ側がPublisher(発信側)と考えます。
zmq_socketにZMQ_PUBと指定していることに注目してください。
後は先ほどのコードを殆ど変わらずに書けてしまいます。
#include <zmq.h> #include <string.h> #include <stdlib.h> int main() { void *context = zmq_ctx_new(); // publisherの初期化 void *publisher = zmq_socket(context, ZMQ_PUB); zmq_bind(publisher, "tcp://*:5556"); while (1) { char buffer[20]; sprintf(buffer, "%d", (int)random()); zmq_send(publisher, buffer, strlen(buffer), 0); } return 0; }
クライアント側
クライアント側ではzmq_socketにZMQ_SUBを指定します。
こちらも殆ど同じ処理になっていますが、少し工夫してフィルターを設定しています。
フィルターは「この文字が含まれているメッセージだけをPublisherから送信してもらう」ための仕組みです。
こうすることでクライアントには、必要なメッセージだけが送信されるようになり、パフォーマンスを稼げます。
#include <zmq.h> #include <string.h> #include <stdlib.h> #include <stdio.h> int main (int argc, char *argv []) { void *context = zmq_ctx_new(); // publisherに接続 void *subscriber = zmq_socket(context, ZMQ_SUB); zmq_connect(subscriber, "tcp://localhost:5556"); // フィルターを設定する if (argc > 1) { zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, argv[1], strlen(argv[1])); } while (1) { // 受信する char buffer[256 + 1]; int size = zmq_recv(subscriber, buffer, 255, 0); buffer[size] = '\0'; puts(buffer); } return 0; }
動かしてみる
サーバを立ち上げ、クライアントを複数立ち上げます。
すべてのクライアントに同じデータが流れれば成功です。
なお、このPubSubですが再送処理やデータの到達を保障する機能はありません。
このため、一部クライアントを一時的に停止させると、そのクライアントへのデータ送信は再度行われず失われてしまいます。
堅牢なデータ送達よりもパフォーマンスを重視しているためだと思います。
すべてのSubscriberにデータ送達することを保障するためには、ØMQの外側で送受信を完了したデータの管理が必要になりますので、ご注意ください。
前回も少し触れましたが、Stormもこの機能を独自に実装しています。
今回のまとめと次回予告
今回はØMQの簡単なサンプルを使ってØMQが提供している機能とその内部処理を覗いてみました。
ØMQがいかに目的を絞り込んで、その分野にとって都合のいい機能を提供しているかが垣間見えたと思います。
- 接続・切断はØMQが内部で管理する
- ØMQは独自のデータ構造をもっているので他のプロトコルとやりとりは(今のところできない)できない
- ØMQは内部で自動的にスレッドを作成する
- ØMQはデータの再送処理を実施しない
次回はさらに深いØMQのこだわりを紹介します。
ATN開催します
来る3/15(金)、ATN(Advanced Tech Night)第6回を開催します!
第6回の今回は、「Java開発者が今から学ぶべき、JavaScriptによるWeb開発」という
タイトルで、Webアプリ初心者の人に向けたJavaScriptを使用した、
グラフィカルでインタラクティブなWebアプリ開発の作り方を解説します。
今回のØMQとは方向性が違いますが、
興味のある方は、ATNDで募集中ですので、ぜひご参加ください。
http://atnd.org/events/37099