Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

データグリッド/キャッシュサーバのInfinispanって何ができるの?

こんばんは。kimukimuです。

今回もテーマが変わりますが、
JBoss Cacheを前身としたOSSのデータグリッド/キャッシュサーバである、
Infinispan」が気になったので使ってみた結果を書いてみます。

1.Infinispanって?

Java的な視点で一言で言ってしまうと、
「リモートアクセス出来て、データの同期、保存が出来るMap」です。

オブジェクトをputすることでメモリ上のキャッシュとして保持されます。

JSR-107(JCACHE - Java Temporary Caching API)
JSR-347(Javaのデータグリッド) に準拠しており、
非常に手軽にMapとして使え、他のキャッシュサーバ(memcachedとか)との互換性もあるようです。

2.インストール方法

・・・とこれだけ書いてもなんだか難しいですよね。なので実際に使ってみます。
Infinispanのホームページに行き、「Download」を押下します。


するとダウンロード一覧が表示されるので、「5.0.1.FINAL」の「Binaries, server and demos」をダウンロードします。
とりあえず、最新版はまだBETA1なので、余計な問題にはまらないために安定版を使います^^;

ダウンロードしたファイルを展開すると、下記のファイルが表示されます。
「infinispan-core.jar」と、「lib」ディレクトリ配下に存在するファイルをJavaクラスパスに
追加することでInfinispanを使うことができます。

で、これから実際に使ってみるわけですが、何か問題が起きた時のためにソースを見れる状態にしておきます。
昔誰かが言っていた、「こんなこともあろうかと!」という奴ですね。え?違う? まぁ、お気になさらず^^;
JBossのMavenリポジトリ
「infinispan-core-5.0.1.FINAL.jar」と「infinispan-core-5.0.1.FINAL-sources.jar」がアップされていますので、
ダウンロードして使用します。

Eclipseプロジェクトを作成して、「infinispan-core-5.0.1.FINAL.jar」と「lib」ディレクトリ配下の
jarファイルをビルドパスに追加します。

その後、ビルドパス設定から「infinispan-core-5.0.1.FINAL-sources.jar」を
「infinispan-core-5.0.1.FINAL.jar」の「Source attachment」として設定します。

すると、Insinispan固有クラスのソースコードEclipseから見れるようになれます。

3.サンプルのコード作成

では、実際のコードを書いてみましょう。
とりあえず、「ローカルホストで複数のInfinispanプロセスが同期する」事を確認するコードを書いてみます。
・・・本当は、他にも最初に作ってみるべきコードがある気もしますが、
とりあえずデモソースから読み解いて作れたものを^^

InfinispanのダウンロードページにあったSourceを参考に、下記のコードを作成しました。

AbstractNode.java
public abstract class AbstractNode {

  private static EmbeddedCacheManager createCacheManager() throws IOException {
    EmbeddedCacheManager result = new DefaultCacheManager(
        "infinispan-replication.xml");
    return result;
  }

  private EmbeddedCacheManager cacheManager = null;

  public AbstractNode() {
    try {
      this.cacheManager = createCacheManager();
    } catch (IOException e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
  }

  protected EmbeddedCacheManager getCacheManager() {
    return cacheManager;
  }
}
PutNode.java(データを投入し続けるノード)
public class PutNode extends AbstractNode {
  public static void main(String[] args) throws Exception {
    new PutNode().run();
  }

  public void run() {
    Cache<String, String> cache = getCacheManager().getCache("StringDemo");

    int index = 0;

    while (true) {
      cache.put("key", Integer.toString(index));
      index++;
      System.out.println(Integer.toString(index));

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}
GetNode.java(データを取得して表示し続けるノード)
public class GetNode extends AbstractNode {
  public static void main(String[] args) throws Exception {
    new GetNode().run();
  }

  public void run() {
    Cache<String, String> cache = getCacheManager().getCache("StringDemo");

    while (true) {
      String result = cache.get("key");
      System.out.println(result);

      try {
        Thread.sleep(5000);
      } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
      }
    }
  }
}
infinispan-replication.xml(Infinispan設定ファイル)
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xsi:schemaLocation="urn:infinispan:config:5.0 http://www.infinispan.org/schemas/infinispan-config-5.0.xsd"
   xmlns="urn:infinispan:config:5.0">
    
   <global>
      <transport>
         <properties>
            <property name="configurationFile" value="jgroups.xml" />
         </properties>
      </transport>
   </global>
    
   <default>
      <!-- Configure a synchronous replication cache -->
      <clustering mode="replication">
         <sync/>
      </clustering>
   </default>
   
</infinispan>
jgroups.xml(ネットワーク設定ファイル)
<config xmlns="urn:org:jgroups" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="urn:org:jgroups file:schema/JGroups-2.8.xsd">
  <UDP mcast_addr="${jgroups.udp.mcast_addr:228.8.8.8}" mcast_port="${jgroups.udp.mcast_port:46655}"
    bind_addr="${jgroups.bind_addr:192.168.0.13}" tos="8" ip_mcast="true"
    ucast_recv_buf_size="20000000" ucast_send_buf_size="640000"
    mcast_recv_buf_size="25000000" mcast_send_buf_size="640000" loopback="false"
    discard_incompatible_packets="true" max_bundle_size="64000"
    max_bundle_timeout="30" ip_ttl="${jgroups.udp.ip_ttl:2}"
    enable_bundling="true" enable_diagnostics="false"

    thread_naming_pattern="pl" thread_pool.enabled="true"
    thread_pool.min_threads="2" thread_pool.max_threads="30"
    thread_pool.keep_alive_time="5000" thread_pool.queue_enabled="false"
    thread_pool.queue_max_size="100" thread_pool.rejection_policy="Discard"

    oob_thread_pool.enabled="true" oob_thread_pool.min_threads="2"
    oob_thread_pool.max_threads="30" oob_thread_pool.keep_alive_time="5000"
    oob_thread_pool.queue_enabled="false" oob_thread_pool.queue_max_size="100"
    oob_thread_pool.rejection_policy="Discard" />

  <PING timeout="3000" num_initial_members="3" />
  <MERGE2 max_interval="30000" min_interval="10000" />
  <FD_SOCK />
  <FD_ALL />
  <BARRIER />
  <pbcast.NAKACK use_stats_for_retransmission="false"
    exponential_backoff="0" use_mcast_xmit="true" gc_lag="0"
    retransmit_timeout="300,600,1200" discard_delivered_msgs="true" />
  <UNICAST timeout="300,600,1200" />
  <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
    max_bytes="1000000" />
  <pbcast.GMS print_local_addr="false" join_timeout="3000"
    view_bundling="true" />
  <UFC max_credits="500000" min_threshold="0.20" />
  <MFC max_credits="500000" min_threshold="0.20" />
  <FRAG2 frag_size="60000" />
  <pbcast.STREAMING_STATE_TRANSFER />
</config>

4.実際に動かしてみます

では、実際に動かしてみます。log4jのエラーはでますが、ひとまず直接関係ないため無視します。

PutNodeを起動すると下記のメッセージが表示されます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14

その状態でGetNodeを起動すると下記のメッセージが表示されます。

13
14

PutNodeで投入されたデータが同期されて、GetNodeで取得できていることがわかりますね。
複数プロセスで同期した場合、データが同期される事が確認できました。

5.何ができるんだろう?

ネットワーク越しでデータを同期してネットワーク内の複数プロセスで同じデータを保持・・・や、
データを分散配置して負荷と容量削減とかいろいろ考えられますね。

ただ、そもそもInfinispanで何ができるかの解説が今回ろくにできていないので、
それは次回にでも。