Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Dempsyアプリケーションを動作させてみます!(ローカル版


こんばんは。kimukimuです。

前回に引き続き、Dempsyです。

今回はサンプルとして挙がっているWordCountのコードの確認と、
実際に動作させることの確認を取ってみます。
とはいえ、単に動かすだけでは何も分からないので、
まずは構成要素とメッセージの流れについて記述しますね。

尚、サンプルソースコードは「Dempsy-examples」の「userguide-wordcount」から取得しています。
ですが、mvnを通してビルドする関係上実行や中身確認がやりにくい点もあったため、
Eclipseプロジェクトにコンバートしたものをアップしています。参考にしてください。

1.Dempsyアプリケーションの構成要素は?

Dempsyアプリケーションの最小構成用として、下記の2要素があるようです。
尚、下記の2要素はDempsyアプリケーションにおいては「Cluster定義」と呼んでいます。
おそらく、アプリケーションの内部の処理群定義のため、
Clusterという記述となっているようにみえます。

Adapter

外部からDempsyアプリケーションに対してメッセージを取得する要素。

MessageProcessor

Adapterを通じて取り込まれたMessageを処理する要素。
多段で定義可能。

上記の2つを組み合わせて実際のアプリケーションを組んでみます。

2.サンプルアプリケーションの構成は?

サンプルコードとしてアップされていたアプリケーションの
構成は下記のようになっています。

上記の図を見てわかるかと思いますが、
Dempsyでは『メッセージクラス』をベースにMessageProcessor間の
ルーティングを行っているようです。
#基本構成においては。ルーティング定義を別途定義可能ではあるようです。

そのため、基本的には同じ型のメッセージを延々処理し続ける
リアルタイプアプリケーション・・・となるようです。

Dempsyの開発者はCEPの定義を
「複数の種類のイベントが飛び交っている状態でリアルタイム処理を行う」とおいているため、
ある意味決まったデータのみを扱うDempsyはCEPでは無い、と置いているようです。
ただその辺は言葉の定義でしかないのですが。

3.Dempsyアプリケーションの各構成要素のソース

では、各構成要素のソースを説明していきます。
上の図の左から解説していきますね。
ただし、Entityは単純なため保持するフィールドのみ説明します。

WordAdapter.java

初期化処理としてデータの取得元となるファイルを読み込みます。
その後、読み込んだ単語を順次送信する動作となります。

public class WordAdaptor implements Adaptor
{
   private static Logger logger = Logger.getLogger(WordAdaptor.class);
   private Dispatcher dempsy;
   private AtomicBoolean running = new AtomicBoolean(false);
   
   @Override
   public void setDispatcher(Dispatcher dispatcher)
   {
      // Dempsyへのデータ投入要素を設定
      this.dempsy = dispatcher; 
   }

   @Override
   public void start()
   {
      // Dempsyアプリケーション開始処理
      try { setupStream(); } 
      catch (IOException ioe)
      {
         logger.fatal("ERROR");
         throw new RuntimeException(ioe);
      }
      
      running.set(true);
      while (running.get())
      {
         String wordString = getNextWordFromSoucre();
         if (wordString == null)
            running.set(false);
         else
            dempsy.dispatch(new Word(wordString));
      }
   }
   
   @Override
   public void stop()
   {
      running.set(false);
   }
   
   private String[] strings;
   int curCount = 0;
   
   private void setupStream() throws IOException
   {
      // データ取得元のファイルを読み込む
      InputStream is = WordAdaptor.class.getClassLoader().getResourceAsStream("AV1611Bible.txt");
      StringWriter writer = new StringWriter();
      IOUtils.copy(is,writer);
      strings = writer.toString().split("\\s+");
   }
   
   private String getNextWordFromSoucre()
   {
      // 読み込んだファイルから順次単語を取得
      String ret = strings[curCount];
      curCount++;
      if (curCount >= strings.length)
         return null;
      return ret;
   }
}
Word.java

文章である「wordText」のみを保持するエンティティです。

WordCount.java

受信したWordオブジェクトを基に数をカウントするMessageProcessorです。
Wordクラスを受信し、CountedWordクラスを送信します。
key毎に生成されるようです。(詳細は未確定)

@MessageProcessor
public class WordCount implements Cloneable
{
  private long count = 0;
  private String myword = null;

  @MessageHandler
  public void countWord(Word word)
  {
     // メッセージ受信処理。出現した単語の数をカウントする
     count++;
  }
  
  @Activation
  public void activate(String key)
  {
     myword = key;
  }
  
  @Output
  public CountedWord outputResults()
  {
     // メッセージ送信処理。単語と数をペアにしたCountedWordを返す。
     return new CountedWord(myword,count);
  }

  public Object clone() throws CloneNotSupportedException
  {
     return (WordCount)super.clone();
  }
}
CountedWord.java

単語と数を保持するエンティティです。

WordRank.java

受信したCountedWordのトップ10を保持するMessageProcessorです。
CountedWordクラスを受信し、結果をコンソールに出力します。

@MessageProcessor
public class WordRank implements Cloneable
{
   private Comparator<CountedWord> comparator = new Comparator<CountedWord>()
   {
      @Override
      public int compare(CountedWord o1, CountedWord o2)
      {
         long o1c = o1.getCount();
         long o2c = o2.getCount();
         return o1c < o2c ? -1 : (o1c > o2c ? 1 : 0);
      }
   };
   
   private TreeSet<CountedWord> topTen = new TreeSet<CountedWord>(comparator);
   
   @MessageHandler
   public void handleCount(CountedWord countedWord)
   {
     // メッセージ受信処理。受信したメッセージをリストに追加
      topTen.add(countedWord);
      if (topTen.size() > 100)
         trim();
   }
   
   @Output
   public void outputResults()
   {
     // メッセージ受信処理。結果をコンソールに出力。
      trim();
      
      for (Iterator<CountedWord> iter = topTen.descendingIterator(); iter.hasNext();)
      {
         CountedWord cur = iter.next();
         System.out.println(cur.getWordText() + ":" + cur.getCount());
      }
      System.out.println("-------------------------");
   }
   
   public Object clone() throws CloneNotSupportedException
   {
      return (WordRank)super.clone();
   }
   
   private void trim()
   {
      TreeSet<CountedWord> newTopTen = new TreeSet<CountedWord>(comparator);
      Iterator<CountedWord> iter = topTen.descendingIterator();
      for (int i=0; i < 10; i++)
         newTopTen.add(iter.next());
      topTen = newTopTen;
   }
}

4.Dempsyアプリケーションの構成定義

3で記述した構成要素群を定義で組み合わせる形になります。
尚、見るとわかりますが、『メッセージの型』については一切定義していません。
メッセージ送信/受信のアノテーションがつけられたメソッドの返り値/引数を
ベースに自動判別して配信される形になります。

DempsyApplicationContext-WordCount.xml
<bean class="com.nokia.dempsy.config.ApplicationDefinition">
  <constructor-arg value="word-count" />
  <property name="clusterDefinitions">
    <list>
      <bean class="com.nokia.dempsy.config.ClusterDefinition">
        <!-- WordAdaptorの定義 -->
        <constructor-arg value="adaptor"/>
        <property name="adaptor">
          <bean class="com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" />
        </property>
      </bean>
      <bean class="com.nokia.dempsy.config.ClusterDefinition">
        <!-- WordCountの定義 -->
        <constructor-arg value="mp"/>
        <property name="messageProcessorPrototype">
          <bean class="com.nokia.dempsy.example.userguide.wordcount.WordCount"/>
        </property>
        <property name="outputSchedule">
           <!-- 1秒に1回結果を出力する旨を定義 -->
           <bean class="com.nokia.dempsy.config.OutputSchedule">
             <constructor-arg index="0" type="long" value="1"/>
             <constructor-arg index="1" type="java.lang.String" value="SECONDS" />
           </bean>
        </property>
       </bean>
      <bean class="com.nokia.dempsy.config.ClusterDefinition">
        <constructor-arg value="word-rank"/>
        <property name="messageProcessorPrototype">
          <bean class="com.nokia.dempsy.example.userguide.wordcount.WordRank"/>
        </property>
        <property name="outputSchedule">
           <!-- 1秒に1回結果を出力する旨を定義 -->
           <bean class="com.nokia.dempsy.config.OutputSchedule">
             <constructor-arg index="0" type="long" value="2"/>
             <constructor-arg index="1" type="java.lang.String" value="SECONDS" />
           </bean>
        </property>
       </bean>
    </list>
  </property>   
</bean>

5.起動方法

まず今回はローカルで実行する場合の起動方法について説明します。
ローカルで実行する場合の起動方法は下記のように「-Dapplication=WordCount」オプションを付けて
「com.nokia.dempsy.spring.RunAppInVm」を起動するのみです。
尚、ローカルで実行する場合は環境構築は不要です。

java -Dapplication=WordCount -cp [classpath] com.nokia.dempsy.spring.RunAppInVm

上記のように起動すると「DempsyApplicationContext-WordCount.xml」が読み込まれて
アプリケーションが起動します。
出力は下記のようになります。

2012-05-04 23:50:57,875 [main] INFO  ClassPathXmlApplicationContext - Refreshing org.springframework.context.support.ClassPathXmlApplicationContext@19e0ff2f: startup date [Fri May 04 23:50:57 JST 2012]; root of context hierarchy
2012-05-04 23:50:57,932 [main] INFO  XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [DempsyApplicationContext-WordCount.xml]
2012-05-04 23:50:58,064 [main] INFO  XmlBeanDefinitionReader - Loading XML bean definitions from class path resource [Dempsy-localVm.xml]
2012-05-04 23:50:58,148 [main] INFO  DefaultListableBeanFactory - Pre-instantiating singletons in org.springframework.beans.factory.support.DefaultListableBeanFactory@c5a67c9: defining beans [com.nokia.dempsy.config.ApplicationDefinition#0,properties,localVMContainerClusterSessionFactory,Dempsy]; root of factory hierarchy
2012-05-04 23:50:58,338 [Adaptor - "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor"] INFO  Dempsy - Starting adaptor thread for "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor"
the:494
of:295
and:269
to:255
that:165
in:124
a:99
for:85
it:83
not:82
-------------------------
the:11117
and:6512
of:5950
And:3073
the:3007
and:2754
shall:2012
to:1989
in:1843
that:1821
-------------------------
2012-05-04 23:51:03,188 [Adaptor - "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor"] INFO  Dempsy - Adaptor thread for "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor@41ab11b0" of type "com.nokia.dempsy.example.userguide.wordcount.WordAdaptor" is shutting down
the:28312
the:17806
and:17256
of:15810
the:11117
and:10382
of:9636
And:7498
and:6512
of:5950
-------------------------
the:51248
the:37593
and:30117
the:28312
of:28163
and:22324
of:21097
the:17806
and:17256
of:15810
-------------------------
the:62600
the:60794
the:51248
and:37820
the:37593
and:36767
of:34513
of:33471
and:30117
the:28312
-------------------------
the:62600
the:60794
the:51248
and:37820
the:37593
and:36767
of:34513
of:33471
and:30117
the:28312
-------------------------
the:62600
the:60794
the:51248
and:37820
the:37593
and:36767
of:34513
of:33471
and:30117
the:28312
-------------------------

途中でファイルの読み込み/カウントが完了して同じ値になっていることが分かりますね。

5.何がうれしいの?

Stormと似ているのですが、リアルタイム処理を行うアプリケーションを
POJOであるMessageProcessorを記述するだけで簡単に構築することが出来るようです。

とりあえず、次回はクラスタにデプロイする方法を調べてみますね。

それでは。