昨日はクリスマスイブでしたが、皆さんはどのように過ごされたでしょうか?
私はケーキは買う派ではなく、自分で作る派です。……まぁ、食べるのも自分なのですが……
クリスマスはさておき、今日はKinesisとStormに関する内容です。
AWS re:Invent 2013 で 発表されたAmazon Kinesis が12/17にPublic Betaになり一般公開されましたね!
以下のように、KinesisはStormのコネクタを含んでいる、ということなので、さっそく試してみましょう。
Kinesis does include a connector for porting data to Storm, which AWS General Manager
Amazon’s streaming data service, Kinesis, is now available — Tech News and Analysis
for Data Science Matt Wood said is a possibility in cases where existing Storm users want to keep using it
for processing data while automating the collection with Kinesis.
手始めに、Stormと連携し、Kinesisに投入された文章中の単語出現数をリアルタイムに集計するWordCountのサンプルを作成してみました。
今回使用したソースは全てkimutansk/storm-example-wordcount · GitHubにアップしています。
1.Amazon Kinesis とは?(おさらい)
Amazon Kinesisはストリームデータのリアルタイム処理を行うプラットフォームです。
詳細は下記のページなどで紹介されているため、参照してください。
- Amazon Web Services ブログ: 【AWS発表】 Amazon Kinesis – ストリームデータのリアルタイム処理
- ストリームデータをリアルタイム処理するプラットフォーム「Amazon Kinesis」を発表。1時間あたり5ドルでリアルタイムなツイートデータを分析可能。AWS re:Invent 2013 - Publickey
Amazon Kinesisはストリームデータを処理するキューとして下記のような特徴を持っています。
• 取得した時点でキュー上のメッセージは削除されないため、後で繰り返し取得可能
• あるキューに対して複数のProducer(メッセージ生産者)、Consumer(メッセージ消費者)を紐づけることが可能
加えて、AWSのサービスであるためスケール可能です。
大容量、スケール可能な高速なキューをサービスとして利用でき、
かついくらでも他のコンポーネントから独立してスケール可能というのは非常に大きいと思います。
2.どんな構成になるのか?
3.Amazon Kinesisの設定を行ってみる
3-1.Kinesis用のユーザ権限確認
では、まずAmazon Kinesisの設定を行います。
Amazon Kinesisのページから申し込みを行ったうえでAWS Consoleにログインすると
下記のようにKinesisのサービスが表示されるようになります。
加えて、その状態ですとIAMの設定画面からAmazon Kinesis用の権限テンプレートも
追加されていますので、KinesisにフルアクセスできるGroupを作成し、ユーザに割り振っておきます。
KinesisにフルアクセスできるユーザのaccessKeyとsecretKeyを用いてKinesisのアプリケーション開発を行います。
3-2.Kinesis Streamの作成
次にKinesisのStream(1個のデータストリームを示す)の作成を行います。
Kinesisの画面を表示し、Streamを作成します。
Streamの名前とShard(Stream中のデータパーティション。この数でStreamの性能が決まる)の数も入力します。
Shardの数に応じて性能が決まるので、性能の値もきちんと確認しておきましょう。
その上でCreateボタンを押下すると、Streamの初期化が始まります。
Createボタンを押下してしばらくは「CREATING」という初期化中のステータスとなります。
初期化が完了するとステータスが「ACTIVE」となり、使用可能になります。
4.作成したソースコード
では、実際に作成したソースコードを示します。
開発を行う場合にはMavenに以下の定義を追加しておく必要がありますので、その前提で進めます。
<dependency> <groupId>com.amazonaws</groupId> <artifactId>amazon-kinesis-client</artifactId> <version>1.0.0</version> </dependency>
4-1.KinesisPutter
Kinesisに文章を投入するプログラムです。
KinesisClientを初期化した後Partition用のキーを指定して投入するだけなので、投入側は単純な構成ですね。
- KinesisWordPutter.java
/** * プログラムエントリポイント<br/> * <ul> * <li>起動引数:arg[0] KinesisStream名称(例:TestStream)</li> * <li>起動引数:arg[1] 投入メッセージ数</li> * <li>起動引数:arg[2] 送信メッセージ間隔(ms)</li> * </ul> * * @param args * 起動引数 * @throws InterruptedException 割り込み発生時 * @throws UnsupportedEncodingException 文字コード不正時 */ public static void main(String... args) throws InterruptedException, UnsupportedEncodingException { // KinesisClient初期化 AmazonKinesisClient client = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider()); String streamName = args[0]; int putCount = Integer.valueOf(args[1]); long interval = Long.valueOf(args[2]); Random random = new Random(); int sentenceNum = SENTENCES.length; // 送信メッセージ数の数だけランダムで文章一覧から取得し、Kinesisに送信 for (int count = 0; count < putCount; count++) { int sentenceIndex = random.nextInt(sentenceNum); String putSentence = SENTENCES[sentenceIndex]; PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName(streamName); putRecordRequest.setData(ByteBuffer.wrap(String.format(putSentence).getBytes("UTF-8"))); putRecordRequest.setPartitionKey(String.format("partitionKey-%d", count)); PutRecordResult putRecordResult = client.putRecord(putRecordRequest); System.out.println("Successfully putrecord, partition key : " + putRecordRequest.getPartitionKey() + ", ShardID : " + putRecordResult.getShardId()); TimeUnit.MILLISECONDS.sleep(interval); } }
4-2.KinesisSpout
Kinesisから文章を取得するStormSpoutです。
こちらは多少複雑で、初めにStream/Shardの情報を取得した上でShardのIteratorを取得し、
Iteratorを用いて文章を取得する形になります。
また、Kinesisは先ほどのコンソールでわかったかと思いましたがReadのリクエスト処理数が少ない(秒間5リクエスト)ため、
1リクエストでまとめてデータを取得し、処理する必要が出てきます。
その関係上、Stormの処理モデル(nextTupleメソッド1回呼び出しあたり1Tuple1Emit)に合わせるため、
Kinesisから取得したレコードを一度リストに蓄積しておき、リストから1Tupleずつ取り出し、Boltに流す構成になっています。
- KinesisWordPutter.java
/** * {@inheritDoc} */ @SuppressWarnings({ "rawtypes" }) @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { this.collector = collector; this.taskIndex = context.getThisTaskIndex(); this.kinesisClient = new AmazonKinesisClient(new ClasspathPropertiesFileCredentialsProvider()); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName(this.streamName); describeStreamRequest.setLimit(this.maxShardCount); DescribeStreamResult describeStreamResult = this.kinesisClient.describeStream(describeStreamRequest); this.shards = describeStreamResult.getStreamDescription().getShards(); GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(this.streamName); getShardIteratorRequest.setShardId(shards.get(this.taskIndex).getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = this.kinesisClient.getShardIterator(getShardIteratorRequest); this.shardIterator = getShardIteratorResult.getShardIterator(); this.decoder = Charset.forName("UTF-8").newDecoder(); this.gettedRecords = Lists.newArrayList(); } /** * {@inheritDoc} */ @Override public void nextTuple() { if (this.gettedRecords.isEmpty()) { GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(this.shardIterator); getRecordsRequest.setLimit(this.maxGetRecordNum); GetRecordsResult getRecordsResult = this.kinesisClient.getRecords(getRecordsRequest); this.gettedRecords.addAll(getRecordsResult.getRecords()); } if (this.gettedRecords.isEmpty()) { return; } Record emitRecord = this.gettedRecords.remove(0); this.sequence = emitRecord.getSequenceNumber(); String sentence = null; try { sentence = this.decoder.decode(emitRecord.getData()).toString(); } catch (CharacterCodingException ex) { throw new RuntimeException(ex); } this.collector.emit(new Values(sentence), emitRecord.getSequenceNumber()); }
5.実際に動かしてみる
5-1.KinesisPutter
KinesisPutterを動かすと、下記のようにPutしたメッセージが表示されます。
PartitionKeyに合わせてShardIdが分散される構成になっていますね。
(省略) Successfully putrecord, partition key : partitionKey-1299, ShardID : shardId-000000000001 Successfully putrecord, partition key : partitionKey-1300, ShardID : shardId-000000000000 Successfully putrecord, partition key : partitionKey-1301, ShardID : shardId-000000000001 Successfully putrecord, partition key : partitionKey-1302, ShardID : shardId-000000000001 (省略)
5-2.WordCountTopology
TopologyのDEBUGモードをONにし、Tupleの内容を確認すると単語がカウントされていることがわかります。
(省略) [Thread-25-WordCount] INFO backtype.storm.daemon.task - Emitting: WordCount default [the, 1280] [Thread-33-WordCount] INFO backtype.storm.daemon.task - Emitting: WordCount default [inch, 800] [Thread-25-WordCount] INFO backtype.storm.daemon.task - Emitting: WordCount default [square, 533] [Thread-33-WordCount] INFO backtype.storm.daemon.task - Emitting: WordCount default [inch, 801] (省略)
ということで、KinesisとStormの連携が実現できました。
…ん?Stormのコネクタってどこ?
amazon-kinesis-clientやawslabsのサンプルを見ましたが、Stormと関連するようなコンポーネントは見つからず…
まぁ、ひとまず連携はできたので、今回はOKとしましょう。
6.KinesisとStormを連携してみた感想
実際にKinesisとStormを連携させた結果は、以下のような感じでした。
- 基本的な動作は非常に簡単に実現可能
- 単に1Shardを保持するStreamを作成し、Spoutで取得して処理するだけであれば、ドキュメント&サンプルが充実していることもあり、非常に簡単に実現可能です。
- ドキュメントのコードをコピーして微修正するだけで実現できます。
- 異常系処理等がクラウドによって隠蔽されているため、シンプル
- この手の分散システムを実装する際には必要となる、構成変更時や異常系のハンドリングがクラウドというレイヤを通すことで隠蔽されているため、シンプルに実装出来ます。
なお、KinesisとStormの役割の違いは以下のようになると思います。
- Kinesis
- 分散してデータを収集したり、分配したりする。
- Storm
- ちょっと複雑な演算処理やストリーム処理とバッチ処理の統合を行ったりする。加えて失敗検知、再送管理を行う。
もう年内も残すところ約1週間ですね。
皆さん、残りの仕事を片付けて、すっきりと来年を迎えられるようにしましょう!(はい、自分自身がガンバリマス)
Acroquest Technologyでは、キャリア採用を行っています。
- 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
- 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
- OSSの開発に携わりたい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
キャリア採用ページ