Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Cassandra1.0性能検証。読み込み性能400%向上ってほんまかいな?!(前半)

こんばんは、ishida(@kojiisd)です。

実は私、Java Advent Calendarの12/12にエントリーしていまして、今回はその一環でも書いています。
Java Advent Caldendarが何なのかについては、
こちらを参照してください。

さて、最近諸事情からCassandraを触ることになったのですが、こんな記事を見つけました。

NoSQLのCassandraが1.0に。読み込み性能は400%向上」-Publickey-

0.7からの性能が大幅アップしているのであれば、試してみたい・・・!
ていうかほんまかいな?!

というわけで、Cassandraの性能検証「0.7 vs 1.0読み込み性能は本当に400%も向上しているのか?」を実施します。
※とはいえ、インストール作業等も備忘録として載せるので、今回は前半戦として
 Clientから一括データ登録処理の作成までを載せます。実際の性能評価はぜひ近いうち公開する後半をお楽しみに。

後半も書きました。こちらをご覧ください。

前提[対象の確認]

まず今回の実験を行うに当たり、それぞれ試したスペックを載せておきます。
性能評価のために、まずはXenServerをインストール、以下のスペックのマシンを用意しました。

CPU Core i3の2コアを割り当て
メモリ 2GB
HDD 160GB
OS Ubuntu desktop 11.10 64bit
Cassandra Version 0.7.10 / 1.0.5

ちなみに、、、Client側は大丈夫なんだろうなぁ?!と
後から言われないために、今回のClientのスペックはこちら。

CPU Core i5 2.83GHz
メモリ 12GB
HDD SSD 128GB
OS Windows 7 Pro

これだけそろえればまずClient側で問題が出ることはないでしょう。

Cassandra1.0のインストール

意外とCassandra1.0のインストールが載っていなかったので、
こちらに載せておきます。

今回はUbuntu上でapt-getをすることでインストールする
方法を選択しました。

まずUbuntu本体をアップデートします。

sudo apt-get update
sudo apt-get upgrade

次に、/etc/apt/source.listの最終行に以下の記述を加えます。

deb http://www.apache.org/dist/cassandra/debian 10x main
deb-src http://www.apache.org/dist/cassandra/debian 10x main

再度apt-get updateを実施すると、PUBLIC_KEYがない、とERRORを返されるので、
以下の操作を行ってください。

gpg --keyserver pgp.mit.edu --recv-keys XXXXXXX
gpg --export --armor XXXXXXX | sudo apt-key add -
(XXXXXXXはapt-get update時に出力されるエラー内に表示されるPUBLIC_KEY)

これでapt-get update後、Cassandraをinstallすれば完了です。

sudo apt-get update
sudo apt-get install cassandra

ちなみにこのやり方だとServiceとして登録されますので、Cassandraの起動/終了はserviceコマンドを使って行います。

sudo service cassandra start|restart|stop

サンプルプログラムの作成

Cassandraの準備ができたらデータをClientから投入するプログラムの作成です。
Cassandra0.7のサンプルはいくらでも手に入るので省略します。
1.0バージョンのコードは、ThriftExampleというThriftのWikiにありました。
(Cassandra本体のページにないのが何とも。。。これ探すの苦労しました。)

さて、とはいえこのままでは1件の投入しかできません。これを改造する必要があります。
batch_mutationというClientクラスのメソッドを使うのですが、若干組み方がやっかいなので、
全体を含めたソースを以下に載せます。(javadocが書かれていないなど、メンテ不足で汚いですが^^;)、
まぁ、サンプルということで(汗)参考にしてください。

public class CClient
{
    /** サーバアドレス(適宜変えてください) */
    private static final String SERVER_ADDR = "192.168.1.155";

    /** デフォルトポート番号 */
    private static final int DEFAULT_PORT = 9160;

    /** データ数(行) */
    private static final int DATA_ROW_NUM = 1000;

    /** 一つのカラムファミリあたりのカラム数 */
    private static final int DATA_COL_NUM = 100;

    /** キースペース */
    private static final String KEYSPACE_NAME = "Samplespace1";

    /** カラムファミリー */
    private static final String COLUMN_PARENT_NAME = "Standard1";

    public static void main(String[] args) throws TException,
	    InvalidRequestException, UnavailableException,
	    UnsupportedEncodingException, NotFoundException, TimedOutException
    {
	TTransport tr = new TFramedTransport(new TSocket(SERVER_ADDR,
		DEFAULT_PORT));
	TProtocol proto = new TBinaryProtocol(tr);
	Cassandra.Client client = new Cassandra.Client(proto);
	tr.open();

	long timestamp = System.currentTimeMillis();
	client.set_keyspace(KEYSPACE_NAME);

	// KeyとカラムファミリのMapのMapを作成
	Map<ByteBuffer, Map<String, List<Mutation>>> rowsMap = new HashMap<ByteBuffer, Map<String, List<Mutation>>>();

	// DATA_ROW_NUM分の行数のデータを作成
	for (int index = 1; index <= DATA_ROW_NUM; index++)
	{
	    // 登録した値からMutationオブジェクトのリストを作成
	    // 100のColumnを作成し、Mutationリストに詰める。
	    List<Mutation> mutationList = createSampleMutationList(
		    DATA_COL_NUM, String.valueOf(index), timestamp);

	    // カラムファミリとMutationのリストのMapを作成
	    Map<String, List<Mutation>> columnFamilyMap = new HashMap<String, List<Mutation>>();
	    columnFamilyMap.put(COLUMN_PARENT_NAME, mutationList);

	    rowsMap.put(toByteBuffer(String.valueOf(index)), columnFamilyMap);
	}

	// 一括登録
	long beforeBatchTime = System.currentTimeMillis();
	client.batch_mutate(rowsMap, ConsistencyLevel.ONE);
	long afterBatchTime = System.currentTimeMillis();

	System.out.println("登録に " + (afterBatchTime - beforeBatchTime)
		+ " ミリ秒かかりました。");

	// 一要素取り出してみる。
	ColumnPath path = new ColumnPath("Standard1");

	// read single column
	path.setColumn(toByteBuffer("Key30"));

	beforeBatchTime = System.currentTimeMillis();
	System.out.println("\n"
		+ client.get(toByteBuffer("1000"), path, ConsistencyLevel.ONE)
		+ "\n");
	afterBatchTime = System.currentTimeMillis();

	System.out.println("検索に " + (afterBatchTime - beforeBatchTime)
		+ " ミリ秒かかりました。");

	tr.close();
    }

    /**
     * バイトコードへの変換
     * 
     * @param value
     * @return
     * @throws UnsupportedEncodingException
     */
    public static ByteBuffer toByteBuffer(String value)
	    throws UnsupportedEncodingException
    {
	return ByteBuffer.wrap(value.getBytes("UTF-8"));
    }

    /**
     * toStringの実装
     * 
     * @param buffer
     * @return
     * @throws UnsupportedEncodingException
     */
    public static String toString(ByteBuffer buffer)
	    throws UnsupportedEncodingException
    {
	byte[] bytes = new byte[buffer.remaining()];
	buffer.get(bytes);
	return new String(bytes, "UTF-8");
    }

    private static Mutation toMutation(final String name, final String value,
	    final long timestamp) throws UnsupportedEncodingException
    {
	Mutation mutation = new Mutation();
	ColumnOrSuperColumn columnOrSuperColumn = new ColumnOrSuperColumn();
	Column column = new Column(toByteBuffer(name));
	column.setValue(toByteBuffer(value));
	column.setTimestamp(timestamp);
	columnOrSuperColumn.setColumn(column);
	mutation.setColumn_or_supercolumn(columnOrSuperColumn);
	return mutation;
    }

    private static List<Mutation> createSampleMutationList(int endNum,
	    String valuePrefix, long timestamp)
	    throws UnsupportedEncodingException
    {
	Map<String, String> keyValueMap = new LinkedHashMap<String, String>();
	List<Mutation> mutationList = new ArrayList<Mutation>();

	for (int index = 1; index <= endNum; index++)
	{
	    String key = "Key" + index;
	    String value = "Value" + index + valuePrefix;

	    keyValueMap.put(key, value);
	}

	for (Entry<String, String> entry : keyValueMap.entrySet())
	{
	    mutationList.add(toMutation(entry.getKey(), entry.getValue(),
		    timestamp));
	}

	return mutationList;
    }
}

自分でも整理のために行やら何やら、RDBの言葉を持ち出しています(^^;あまり気にしないでください。
このDATA_ROW_NUM(行)とDATA_COL_NUM(列)を変化させ、性能評価を行おうと思います。

実行、、、の前の事前準備(外部アクセス許可)

さて、上記プログラムを実行してみましょう。と、その前に。今回はClientからアクセスするため、
上記状態ではCassandraに外部からアクセスできず「connection refused」が発生します。

Cassandra側にClient側からのアクセス用の穴を空けてあげる必要があります。
/etc/cassandra/cassandra.yamlの以下の記述を変更します。

#listen_address: localhost
listen_address: 192.168.1.0
(アドレスは適宜変更してください。)
~~~~~~~
#rpc_address: localhost
rpc_address: 0.0.0.0

今回の変更がベストなのかは未検証ですが、これでClient側からつながるはずです。

実行

いよいよ実行です。

登録に 2399 ミリ秒かかりました。

ColumnOrSuperColumn(column:Column(name:80 01 00 02 00 00 00 03 67 65 74 00 00 00 03 0C 00 00 0C 00 01 0B 00 01 00 00 00 05 4B 65 79 33 30, value:80 01 00 02 00 00 00 03 67 65 74 00 00 00 03 0C 00 00 0C 00 01 0B 00 01 00 00 00 05 4B 65 79 33 30 0B 00 02 00 00 00 0B 56 61 6C 75 65 33 30 31 30 30 30, timestamp:1323648382260))

検索に 72 ミリ秒かかりました。

おぉ!実行されて結果が出ましたね。

これを0.7用のCassandraプログラムとしても組み替えて、0.7用の仮想イメージを用意して、
性能評価に移りたいと思います。

では、前半としてはここまでで。