前回に引き続き、Dempsyです。
今回はサンプルとして挙がっているWordCountのコードの確認と、
実際に動作させることの確認を取ってみます。
とはいえ、単に動かすだけでは何も分からないので、
まずは構成要素とメッセージの流れについて記述しますね。
尚、サンプルソースコードは「Dempsy-examples」の「userguide-wordcount」から取得しています。
ですが、mvnを通してビルドする関係上実行や中身確認がやりにくい点もあったため、
Eclipseプロジェクトにコンバートしたものをアップしています。参考にしてください。
1.Dempsyアプリケーションの構成要素は?
Dempsyアプリケーションの最小構成用として、下記の2要素があるようです。
尚、下記の2要素はDempsyアプリケーションにおいては「Cluster定義」と呼んでいます。
おそらく、アプリケーションの内部の処理群定義のため、
Clusterという記述となっているようにみえます。
Adapter
外部からDempsyアプリケーションに対してメッセージを取得する要素。
MessageProcessor
Adapterを通じて取り込まれたMessageを処理する要素。
多段で定義可能。
上記の2つを組み合わせて実際のアプリケーションを組んでみます。
2.サンプルアプリケーションの構成は?
サンプルコードとしてアップされていたアプリケーションの
構成は下記のようになっています。
上記の図を見てわかるかと思いますが、
Dempsyでは『メッセージクラス』をベースにMessageProcessor間の
ルーティングを行っているようです。
#基本構成においては。ルーティング定義を別途定義可能ではあるようです。
そのため、基本的には同じ型のメッセージを延々処理し続ける
リアルタイプアプリケーション・・・となるようです。
Dempsyの開発者はCEPの定義を
「複数の種類のイベントが飛び交っている状態でリアルタイム処理を行う」とおいているため、
ある意味決まったデータのみを扱うDempsyはCEPでは無い、と置いているようです。
ただその辺は言葉の定義でしかないのですが。
3.Dempsyアプリケーションの各構成要素のソース
では、各構成要素のソースを説明していきます。
上の図の左から解説していきますね。
ただし、Entityは単純なため保持するフィールドのみ説明します。
WordAdapter.java
初期化処理としてデータの取得元となるファイルを読み込みます。
その後、読み込んだ単語を順次送信する動作となります。
public class WordAdaptor implements Adaptor { private static Logger logger = Logger.getLogger(WordAdaptor.class); private Dispatcher dempsy; private AtomicBoolean running = new AtomicBoolean(false); @Override public void setDispatcher(Dispatcher dispatcher) { // Dempsyへのデータ投入要素を設定 this.dempsy = dispatcher; } @Override public void start() { // Dempsyアプリケーション開始処理 try { setupStream(); } catch (IOException ioe) { logger.fatal("ERROR"); throw new RuntimeException(ioe); } running.set(true); while (running.get()) { String wordString = getNextWordFromSoucre(); if (wordString == null) running.set(false); else dempsy.dispatch(new Word(wordString)); } } @Override public void stop() { running.set(false); } private String[] strings; int curCount = 0; private void setupStream() throws IOException { // データ取得元のファイルを読み込む InputStream is = WordAdaptor.class.getClassLoader().getResourceAsStream("AV1611Bible.txt"); StringWriter writer = new StringWriter(); IOUtils.copy(is,writer); strings = writer.toString().split("\\s+"); } private String getNextWordFromSoucre() { // 読み込んだファイルから順次単語を取得 String ret = strings[curCount]; curCount++; if (curCount >= strings.length) return null; return ret; } }
Word.java
文章である「wordText」のみを保持するエンティティです。
WordCount.java
受信したWordオブジェクトを基に数をカウントするMessageProcessorです。
Wordクラスを受信し、CountedWordクラスを送信します。
key毎に生成されるようです。(詳細は未確定)
@MessageProcessor public class WordCount implements Cloneable { private long count = 0; private String myword = null; @MessageHandler public void countWord(Word word) { // メッセージ受信処理。出現した単語の数をカウントする count++; } @Activation public void activate(String key) { myword = key; } @Output public CountedWord outputResults() { // メッセージ送信処理。単語と数をペアにしたCountedWordを返す。 return new CountedWord(myword,count); } public Object clone() throws CloneNotSupportedException { return (WordCount)super.clone(); } }
CountedWord.java
単語と数を保持するエンティティです。
WordRank.java
受信したCountedWordのトップ10を保持するMessageProcessorです。
CountedWordクラスを受信し、結果をコンソールに出力します。
@MessageProcessor public class WordRank implements Cloneable { private Comparator<CountedWord> comparator = new Comparator<CountedWord>() { @Override public int compare(CountedWord o1, CountedWord o2) { long o1c = o1.getCount(); long o2c = o2.getCount(); return o1c < o2c ? -1 : (o1c > o2c ? 1 : 0); } }; private TreeSet<CountedWord> topTen = new TreeSet<CountedWord>(comparator); @MessageHandler public void handleCount(CountedWord countedWord) { // メッセージ受信処理。受信したメッセージをリストに追加 topTen.add(countedWord); if (topTen.size() > 100) trim(); } @Output public void outputResults() { // メッセージ受信処理。結果をコンソールに出力。 trim(); for (Iterator<CountedWord> iter = topTen.descendingIterator(); iter.hasNext();) { CountedWord cur = iter.next(); System.out.println(cur.getWordText() + ":" + cur.getCount()); } System.out.println("-------------------------"); } public Object clone() throws CloneNotSupportedException { return (WordRank)super.clone(); } private void trim() { TreeSet<CountedWord> newTopTen = new TreeSet<CountedWord>(comparator); Iterator<CountedWord> iter = topTen.descendingIterator(); for (int i=0; i < 10; i++) newTopTen.add(iter.next()); topTen = newTopTen; } }
4.Dempsyアプリケーションの構成定義
3で記述した構成要素群を定義で組み合わせる形になります。
尚、見るとわかりますが、『メッセージの型』については一切定義していません。
メッセージ送信/受信のアノテーションがつけられたメソッドの返り値/引数を
ベースに自動判別して配信される形になります。
DempsyApplicationContext-WordCount.xml
<bean class="com.nokia.dempsy.config.ApplicationDefinition"> <constructor-arg value="word-count" /> <property name="clusterDefinitions"> <list> <bean class="com.nokia.dempsy.config.ClusterDefinition"> <!-- WordAdaptorの定義 --> <constructor-arg value="adaptor"/> <property name="adaptor"> <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" /> </property> </bean> <bean class="com.nokia.dempsy.config.ClusterDefinition"> <!-- WordCountの定義 --> <constructor-arg value="mp"/> <property name="messageProcessorPrototype"> <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/> </property> <property name="outputSchedule"> <!-- 1秒に1回結果を出力する旨を定義 --> <bean class="com.nokia.dempsy.config.OutputSchedule"> <constructor-arg index="0" type="long" value="1"/> <constructor-arg index="1" type="java.lang.String" value="SECONDS" /> </bean> </property> </bean> <bean class="com.nokia.dempsy.config.ClusterDefinition"> <constructor-arg value="word-rank"/> <property name="messageProcessorPrototype"> <bean class="com.nokia.dempsy.example.userguide.wordcount.WordRank"/> </property> <property name="outputSchedule"> <!-- 1秒に1回結果を出力する旨を定義 --> <bean class="com.nokia.dempsy.config.OutputSchedule"> <constructor-arg index="0" type="long" value="2"/> <constructor-arg index="1" type="java.lang.String" value="SECONDS" /> </bean> </property> </bean> </list> </property> </bean>
5.起動方法
まず今回はローカルで実行する場合の起動方法について説明します。
ローカルで実行する場合の起動方法は下記のように「-Dapplication=WordCount」オプションを付けて
「com.nokia.dempsy.spring.RunAppInVm」を起動するのみです。
尚、ローカルで実行する場合は環境構築は不要です。
java -Dapplication=WordCount -cp [classpath] com.nokia.dempsy.spring.RunAppInVm
上記のように起動すると「DempsyApplicationContext-WordCount.xml」が読み込まれて
アプリケーションが起動します。
出力は下記のようになります。
2012-05-04 23:50:57,875 [main] INFO ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@19e0ff2f: startup date [Fri May 04 23:50:57 JST 2012]; root of context hierarchy 2012-05-04 23:50:57,932 [main] INFO XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [DempsyApplicationContext-WordCount.xml] 2012-05-04 23:50:58,064 [main] INFO XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [Dempsy-localVm.xml] 2012-05-04 23:50:58,148 [main] INFO DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@c5a67c9: defining beans [com.nokia.dempsy.config.ApplicationDefinition#0,properties,localVMContainerClusterSessionFactory,Dempsy]; root of factory hierarchy 2012-05-04 23:50:58,338 [Adaptor - "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor"] INFO Dempsy - Starting adaptor thread for "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" the:494 of:295 and:269 to:255 that:165 in:124 a:99 for:85 it:83 not:82 ------------------------- the:11117 and:6512 of:5950 And:3073 the:3007 and:2754 shall:2012 to:1989 in:1843 that:1821 ------------------------- 2012-05-04 23:51:03,188 [Adaptor - "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor"] INFO Dempsy - Adaptor thread for "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" is shutting down the:28312 the:17806 and:17256 of:15810 the:11117 and:10382 of:9636 And:7498 and:6512 of:5950 ------------------------- the:51248 the:37593 and:30117 the:28312 of:28163 and:22324 of:21097 the:17806 and:17256 of:15810 ------------------------- the:62600 the:60794 the:51248 and:37820 the:37593 and:36767 of:34513 of:33471 and:30117 the:28312 ------------------------- the:62600 the:60794 the:51248 and:37820 the:37593 and:36767 of:34513 of:33471 and:30117 the:28312 ------------------------- the:62600 the:60794 the:51248 and:37820 the:37593 and:36767 of:34513 of:33471 and:30117 the:28312 -------------------------
途中でファイルの読み込み/カウントが完了して同じ値になっていることが分かりますね。
5.何がうれしいの?
Stormと似ているのですが、リアルタイム処理を行うアプリケーションを
POJOであるMessageProcessorを記述するだけで簡単に構築することが出来るようです。
とりあえず、次回はクラスタにデプロイする方法を調べてみますね。
それでは。