前回でStormの動く環境が整いましたので、
次は実際にTopologyを流してみます。
1.Topologyを流す方法は?
Nimbusに対してThriftで特定のフォーマットに沿って通信を行えば、
どんな言語からでもTopologyは流せる・・・と、Storm Wikiには書いてあります。
とはいえ、最初なので一番ポピュラーな方法を取って流してみる事にします。
StormSubmitter#submitTopologyメソッドを用いることで、
実際にStormクラスタに対してTopologyを流すことが可能になっています。
2.実際に流すコードはどうなるの?
Storm-StarterプロジェクトのExclamationTopologyをまずは流してみる事にします。
ExclamationTopologyクラスではmainメソッドの中でTopologyの定義を行っていますが、
同じタイミングでStormクラスタの設定も行うようにします。
#もしかすると、設定自体はもっと少なくてもいい気もしますが。
結果、mainメソッドが下記のようになります。
ExclamationTopology#main
public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("word", new TestWordSpout(), 10); builder.setBolt("exclaim1", new ExclamationBolt(), 3) .shuffleGrouping("word"); builder.setBolt("exclaim2", new ExclamationBolt(), 2) .shuffleGrouping("exclaim1"); Config conf = new Config(); conf.setDebug(true); //★ NimbusのHost、ポート番号を指定 conf.put(Config.NIMBUS_HOST, "192.168.0.101"); conf.put(Config.NIMBUS_THRIFT_PORT, 6627); //★ Topologyに"Storm"という名前を与えて起動 StormSubmitter.submitTopology("Storm", conf, builder.createTopology()); }
その上で、下記の2点を行う必要があります。
流すTopologyをJarファイルにまとめる
Topologyのsubmit時にJarファイルをNimbusに転送する設定を行う
3.流すTopologyをJarファイルにまとめる
当然のことながら、Stormクラスタははじめは
ExclamationTopologyのような個別のTopologyについては知りません。
そのため、Jarファイルを作成し、Topology起動時に流し込む必要があります。
Eclipseからexport>Jar Fileを選択し、Jarファイルを生成します。
尚、StormにおけるTopology開始時にはclassファイルしか伝播しないようです。
resourcesを含めるとStormクラスタ側で
「caution: filename not matched: resources/**」というエラーが出ました。
4.Topologyのsubmit時にJarファイルをNimbusに転送する設定を行う
次はTopologyのsubmit時にJarファイルをNimbusに転送する設定を行います。
設定方法は、JVMの起動オプション「storm.jar」に
Jarファイルのパスを指定すればOKです。
5.StormクラスタにTopologyをsubmit!
これまでで準備は整ったため、ExclamationTopologyクラスを起動します。
すると・・・
コンソールに下記のメッセージが表示されます。
「Finished submitting topology: Storm」とあるため、Topologyのsubmitは成功したようですね。
[f:id:acro-engineer:20120221072947j:image:w640] 1 [main] INFO backtype.storm.StormSubmitter - Jar not uploaded to master yet. Submitting jar... 95 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar E:\\AcroWorks\\StormStarter.jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-cd7b5cdb-86ff-4479-bdb1-6642edc486cb.jar 113 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /usr/local/storm/nimbus/inbox/stormjar-cd7b5cdb-86ff-4479-bdb1-6642edc486cb.jar 114 [main] INFO backtype.storm.StormSubmitter - Submitting topology Storm in distributed mode with conf {"nimbus.host":"192.168.0.101","topology.debug":true,"nimbus.thrift.port":6627} 644 [main] INFO backtype.storm.StormSubmitter - Finished submitting topology: Storm
TopologySubmit後、Storm UIを再度チェックします。
すると、Topology "Storm"が動作していることが分かります。
Topologyの名前を押下すると、更に詳細情報が確認可能です。
SpoutとBoltの名前を押下すると、各々の詳細情報が表示されます。
これで、StormクラスタにTopologyをsubmitする方法が確立できました。
今後は、もっと詳細を見ていきますね。