Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Impalaにファイルで投入したデータはいつから検索可能になるの?

こんにちは。kimukimu(@)です。

このエントリはSpark, SQL on Hadoop etc. Advent Calendar 2014 - Qiitaの12/14分です。

多くの人がSQLなら書けるということで、
開発者でなくても自在に検索を定義可能なSQL on Hadoopが今非常に広まりつつあります。
私はそんなSQL on Hadoopの中でImpalaを今使用しているため、Impalaについて書かせていただきますね。
f:id:acro-engineer:20141209070857j:plain

Impalaをデータの集計で使用しているのですが、
その際に「元々存在しているテーブルにデータをファイルで投入する」ことがあります。


ですが、ファイル投入後クエリを実行してみても投入した結果が表示されないことがありました。
かつ、Impalaのサイトを見てもどのタイミングでデータが検索可能になるか、は明確には見つからないんですよね・・・

そのため、「Impalaにファイルで投入したデータはいつから検索可能になるの?」が
気になったので確認してみました。

1.前提環境

今回の確認は下記の環境で確認しています。

  • CDH 5.2.0(Cloudera Managerで構築、完全分散モード)
  • CentOS 6.6

Cloudera Managerを使うとHadoopクラスタ構築が本当に楽になりますね。

2.使用するファイル

実際に投入して確認してみるファイルの内容は下記です。

いつから検索可能になるかを確認できればいい、ということと、
あとは確認をわかりやすくするため、各ファイルには必要最小限のデータのみ定義しています。
■201412141000.csv

2014-12-14 10:00:00,Record1,100,12.34,Record1 Comment

■201412141100.csv

2014-12-14 11:00:00,Record2,200,34.56,Record2 Comment

■201412141200.csv

2014-12-14 12:00:00,Record3,300,45.67,Record3 Comment

■201412141300.csv

2014-12-14 13:00:00,Record4,400,56.78,Record4 Comment

2.ファイル投入後の表示タイミング確認パターン

Impalaに対してファイルで投入したテーブルの内容を表示するための関連オペレーションとして、
下記の4つの処理があります。

  1. Impalaにテーブル(EXTERNAL TABLE)を定義する。
  2. 定義したテーブルに対してクエリを発行し、結果を表示する。
  3. ファイルを投入する。
  4. ファイルを削除する。

すると、ざっと考えて下記くらいのパターンが挙げられると思います。
ファイル追加/ファイル削除の順番入れ替えはパターンも多くなるので省きます。

  • A.テーブル定義→クエリ発行(1→2、基本)
  • B.テーブル定義→ファイル追加→クエリ発行(1→3→2)
  • C.テーブル定義→クエリ発行→ファイル追加→クエリ発行(1→2→3→2)
  • D.テーブル定義→ファイル削除→クエリ発行(1→4→2)
  • E.テーブル定義→クエリ発行→ファイル削除→クエリ発行(1→2→4→2)
  • F.テーブル定義→ファイル追加&削除→クエリ発行(1→3&4→2)
  • G.テーブル定義→クエリ発行→ファイル追加&削除→クエリ発行(1→2→3&4→2)

3.確認結果

では各パターン毎に実際に実行してみて結果がどうなるかを確認してみます。
まず下準備としてimpalaユーザの操作可能なディレクトリ上に先ほど定義したファイルを配置します。
その上でHDFS上にImpalaがアクセス可能な下記のディレクトリを作成しておきます。

/ImpalaTest/TestPattern1
/ImpalaTest/TestPattern2
/ImpalaTest/TestPattern3
/ImpalaTest/TestPattern4
/ImpalaTest/TestPattern5
/ImpalaTest/TestPattern6
/ImpalaTest/TestPattern7

結果上では通常のLinux impalaユーザのコマンドを「>」、impala-shellからの実行コマンドを「[impala:21000] >」と記述しています。
また、impala-shellからimpala-daemonへの接続といった共通コマンドや、冗長な記述は省いています。

3-A.テーブル定義→クエリ発行(1→2、基本)

> hadoop fs -copyFromLocal 20141214* /ImpalaTest/TestPattern1
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_1(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern1';
[impala:21000] > select * from test_pattern_1 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 12:00:00 | Record3     | 300              | 45.66999816894531  | Record3 Comment |
| 2014-12-14 13:00:00 | Record4     | 400              | 56.77999877929688  | Record4 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

データを投入してからテーブルを定義し、クエリを発行するという基本パターンですので、当然のことながら全データが表示されます。

3-B.テーブル定義→ファイル追加→クエリ発行(1→3→2)

> hadoop fs -copyFromLocal 2014121410* /ImpalaTest/TestPattern2
> hadoop fs -copyFromLocal 2014121411* /ImpalaTest/TestPattern2
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_2(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern2';
> hadoop fs -copyFromLocal 2014121412* /ImpalaTest/TestPattern2
> hadoop fs -copyFromLocal 2014121413* /ImpalaTest/TestPattern2
[impala:21000] > select * from test_pattern_2 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 12:00:00 | Record3     | 300              | 45.66999816894531  | Record3 Comment |
| 2014-12-14 13:00:00 | Record4     | 400              | 56.77999877929688  | Record4 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

テーブル定義後にファイルを追加した場合でも、クエリを発行する前に追加しておけば追加結果が反映されることがわかります。

3-C.テーブル定義→クエリ発行→ファイル追加→クエリ発行(1→2→3→2)

> hadoop fs -copyFromLocal 2014121410* /ImpalaTest/TestPattern3
> hadoop fs -copyFromLocal 2014121411* /ImpalaTest/TestPattern3
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_3(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern3';
[impala:21000] > select * from test_pattern_3 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+
> hadoop fs -copyFromLocal 2014121412* /ImpalaTest/TestPattern3
> hadoop fs -copyFromLocal 2014121413* /ImpalaTest/TestPattern3
[impala:21000] > select * from test_pattern_3 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

テーブル定義後、クエリを実行した後にファイルを追加した場合は追加結果はそのままでは反映されないようです。
そのため、クエリ初回実行時に実際に検索対象となるデータに対して何かしらのインデックス処理のようなものを施している・・?
尚、これは1日たっても結果はそのままでしたので、タイミング問題、というわけでもないようです。

3-D.テーブル定義→ファイル削除→クエリ発行(1→4→2)

> hadoop fs -copyFromLocal 20141214* /ImpalaTest/TestPattern4
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_4(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern4';
> hadoop fs -rm /ImpalaTest/TestPattern4/201412141300.csv
[impala:21000] > select * from test_pattern_4 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 12:00:00 | Record3     | 300              | 45.66999816894531  | Record3 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

削除についても追加と同じく、テーブル定義後クエリ発行前に実施しておけば反映されるようです。

3-E.テーブル定義→クエリ発行→ファイル削除→クエリ発行(1→2→4→2)

> hadoop fs -copyFromLocal 20141214* /ImpalaTest/TestPattern5
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_5(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern5';
[impala:21000] > select * from test_pattern_5 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 12:00:00 | Record3     | 300              | 45.66999816894531  | Record3 Comment |
| 2014-12-14 13:00:00 | Record4     | 400              | 56.77999877929688  | Record4 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+
> hadoop fs -rm /ImpalaTest/TestPattern5/201412141200.csv
> hadoop fs -rm /ImpalaTest/TestPattern5/201412141300.csv
[impala:21000] > select * from test_pattern_5 ORDER BY record_time;
WARNINGS: Failed to open HDFS file hdfs://cluster1:8020/ImpalaTest/TestPattern5/201412141200.csv
Error(2): No such file or directory
Backend 1:Failed to open HDFS file hdfs://cluster1:8020/ImpalaTest/TestPattern5/201412141200.csv
Error(2): No such file or directory

クエリを実行した後にファイルを削除した場合、その後再度クエリを実行するとファイルが存在しない旨のエラーとなります。
結果も表示されませんでした。
やはりクエリの初回実行時にファイル単位のインデックス作成に近いものを行っていて、
以後クエリを実行した場合はそのインデックスにそって検索を行うようです。

・・と、とりあえずここまでで大体動きの予測はつきますが、念のため追加と削除を両方行うパターンについても確認しておきます。

3-F.テーブル定義→ファイル追加&削除→クエリ発行(1→3&4→2)

> hadoop fs -copyFromLocal 2014121410* /ImpalaTest/TestPattern6
> hadoop fs -copyFromLocal 2014121411* /ImpalaTest/TestPattern6
> hadoop fs -copyFromLocal 2014121412* /ImpalaTest/TestPattern6
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_6(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern6';
> hadoop fs -copyFromLocal 2014121413* /ImpalaTest/TestPattern6
> hadoop fs -rm /ImpalaTest/TestPattern6/201412141200.csv
[impala:21000] > select * from test_pattern_6 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 13:00:00 | Record4     | 400              | 56.77999877929688  | Record4 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

予想通り、ではありますが、クエリを初回実行前にファイルの追加削除を両方行っても問題なく検索可能です。

3-G.テーブル定義→クエリ発行→ファイル追加&削除→クエリ発行(1→2→3&4→2)

> hadoop fs -copyFromLocal 2014121410* /ImpalaTest/TestPattern7
> hadoop fs -copyFromLocal 2014121411* /ImpalaTest/TestPattern7
> hadoop fs -copyFromLocal 2014121412* /ImpalaTest/TestPattern7
[impala:21000] > CREATE EXTERNAL TABLE test_pattern_7(record_time TIMESTAMP, record_name STRING, record_value_int INT, record_value_float FLOAT, record_comment STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LOCATION '/ImpalaTest/TestPattern7';
[impala:21000] > select * from test_pattern_7 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 12:00:00 | Record3     | 300              | 45.66999816894531  | Record3 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+
> hadoop fs -copyFromLocal 2014121413* /ImpalaTest/TestPattern7
> hadoop fs -rm /ImpalaTest/TestPattern7/201412141200.csv
[impala:21000] > select * from test_pattern_7 ORDER BY record_time;
WARNINGS: Failed to open HDFS file hdfs://cluster1:8020/ImpalaTest/TestPattern7/201412141200.csv
Error(2): No such file or directory
Backend 1:Failed to open HDFS file hdfs://cluster1:8020/ImpalaTest/TestPattern7/201412141200.csv
Error(2): No such file or directory

予想通り、クエリを1回実行後にファイルの追加削除を行った場合、対応できませんでした。

4.テーブル定義後にファイルの追加削除を行った場合結果に反映させるには?

では、ファイルの追加削除を反映させるためにはどうすればいいのでしょうか。
Impalaクエリのマニュアルを見てみると「REFRESH」というコマンドがありました。
REFRESHコマンドを実行することで、追加削除の結果が反映され、現状のファイルに対して検索を行うことが可能でした。

[impala:21000] > REFRESH test_pattern_7;
[impala:21000] > select * from test_pattern_7 ORDER BY record_time;
+---------------------+-------------+------------------+--------------------+-----------------+
| record_time         | record_name | record_value_int | record_value_float | record_comment  |
+---------------------+-------------+------------------+--------------------+-----------------+
| 2014-12-14 10:00:00 | Record1     | 100              | 12.34000015258789  | Record1 Comment |
| 2014-12-14 11:00:00 | Record2     | 200              | 34.56000137329102  | Record2 Comment |
| 2014-12-14 13:00:00 | Record4     | 400              | 56.77999877929688  | Record4 Comment |
+---------------------+-------------+------------------+--------------------+-----------------+

5.確認結果まとめ

これまでの結果をまとめると下記のようになります。

  1. ImpalaはEXTERNALテーブル定義後、クエリを初回実行した際にインデックスのようなものを作成する。
  2. インデックス(?)を作成する前に行ったファイル追加削除はクエリ実行結果に反映される。
  3. インデックス(?)を作成した後に行ったファイル追加削除はクエリ実行時に反映されない。(削除の場合はエラーになる)
  4. REFRESHコマンドを実行することで現状の最新の状況にインデックス(?)が更新される。

テーブル定義後、データを投入しながらインクリメンタルにクエリを発行する場合、
データ投入時の事後処理かクエリ発行時の事前処理でREFRESHコマンドを実行すればOK
となりますね。
これで追加しながら検索、もデータ未反映を気にすることなく実行可能ですね。

小ネタでしたが、以上です。

Acroquest Technologyでは、キャリア採用を行っています。


  • 日頃勉強している成果を、Hadoop、Storm、NoSQL、HTML5/CSS3/JavaScriptといった最新の技術を使ったプロジェクトで発揮したい。
  • 社会貢献性の高いプロジェクトに提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や対外的な勉強会の開催を通した技術の発信や、社内勉強会での技術情報共有により、技術的に成長したい。
  • OSSの開発に携わりたい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。
 キャリア採用ページ

Hadoopのデザインパターン本を紹介。

どうもはじめまして、上田です。
今年も残すところ後2週間をきり、何か遣り残したことがないか気になる今日この頃。
年末に何か洋書の技術書を読んでみるのもいいのでは、と思っています。

最近は大量データ処理周りの本が目に付く気がします。
あまり大量データとは縁のない私ですが、
Hadoop周りのことは社内で使う人も多いので知っておきたいと思い、
ちょうど一ヶ月ほど前に出たこの本を先輩に薦められて買ってみました。

ついに出た、
Hadoopのデザインパターン
今日はこの本についてざっくり紹介したいと思います。

動物本でHadoopなので象が出てくるかと思いましたが、
見ての通りこれは鹿。一応、四不像という種類の鹿で、
角がシカ、頸部がラクダ、蹄がウシ、尾がロバに似ているが、
そのどれでもないと考えられたことが名前(四不像)の由来という説がある、
そうです。

残念ながら、は入っていません。

本の概要はこんな感じ。

File Size: 4466 KB
Print Length: 256 pages
Simultaneous Device Usage: Unlimited
Publisher: O'Reilly Media; 1 edition (November 21, 2012)
Sold by: Amazon Digital Services, Inc.
Language: English

amazonへのリンク↓($32くらいです)
http://www.amazon.com/MapReduce-Design-Patterns-ebook/dp/B00AB61JMQ/ref=sr_1_3?ie=UTF8&qid=1354289371&sr=8-3&keywords=mapreduce+design+patterns

それはさておき、デザインパターンは何が紹介されているのか、
パターンの種類全体を俯瞰するために表を作ってみました。
英訳に慣れていないので、何か直訳っぽい表現が多いのはご愛嬌で。


全23パターン
まとめや構成を説明した章をのぞくと、
ほぼ上記パターンの紹介がこの本の全てを占めています。

一見シンプルそうな集計計算にはかなり力を使って書かれていました。
個人的には、レコードのjoin処理系のパターンが気になっています。

これから気になるパターンをそれぞれ読んでみて、
何か面白いものがあればまた紹介したいと思います。

では今回はこの辺で。

Hadoop MapReduce Job の History ログの見方

今日は。
落合です。

ひしだまさんに引き続き、
2012年12月20日のHadoopアドベントカレンダー2012 #hadoopAC12jp投稿ということで、
Hadoop MapReduce Job の History ログの見方と、
Job実行状況のExcel表示例をご紹介します。

HadoopのMapReduce Jobを実行すると、
JobTrackerサーバのログディレクトリ

/var/log/hadoop/history

に、MapReduceのJob詳細ログがJobごとに作成されます。

WebUIで見ることができるような内容がこのファイルに詰まっています。

Hadoopのバージョンによって、history以下のディレクトリ構成や、
ファイル名の形式が変わっているのが曲者ですが、
とりあえず中を見てみましょう。

このように、一番上の行から順に、
Jobが進むに従って情報が追加されています。

ただ、このまま文字を追ってJobの内容を把握するのは大変ですよね。

赤で囲ったあたりから、
Job ID, Job名、Jobの開始時刻、終了時刻、Jobのステータス、
各 Map, Reduce Task Attempt のID、開始時刻、終了時刻、ステータス
を取り出して、
MapReduce Jobの様子を把握するためにExcelで情報を整形してみましょう。

テキストを解析するのも骨が折れますが、
とりあえず、以下のスクリプトを書いてみました。
引数でログファイル名を指定すると、
標準出力にカンマ区切りの整形済みデータを出力します。
(エラー処理を何もしていないです。悪しからず・・・)

MapReduceJobChart.py 直

#!/usr/bin/python
# -*- coding: utf-8 -*- 

import sys
import re
import datetime

class attempt:
    def __init__(self,attemptID):
        self.attemptID = attemptID
        self.attemptType = None
        self.attemptStart = None
        self.attemptEnd = None
        self.attemptStatus = None

if __name__ == "__main__":
    filepath = sys.argv[1]
    file = open(filepath,"r")
    
    jobID = ""
    jobName = ""
    jobStart = ""
    jobEnd = ""
    jobStatus = ""

    attempts = {}
    
    for row in file:
        if row.startswith("Job"):
            if jobID == "":
                search = re.search('JOBID="(\\w+)"', row)
                if search is not None:
                    jobID = search.group(1)
            if jobName == "":
                search = re.search('JOBNAME="([^"]+)"', row)
                if search is not None:
                    jobName = search.group(1)
            if jobStart == "":
                search = re.search('SUBMIT_TIME="(\\d+)"', row)
                if search is not None:
                    jobStart = datetime.datetime.fromtimestamp(float(search.group(1))/1000)
            if jobEnd == "":
                search = re.search('FINISH_TIME="(\\d+)"', row)
                if search is not None:
                    jobEnd = datetime.datetime.fromtimestamp(float(search.group(1))/1000)
            search = re.search('JOB_STATUS="(\\w+)"', row)
            if search is not None:
                jobStatus = search.group(1)

        if row.startswith("MapAttempt") or row.startswith("ReduceAttempt"):
            searchAttemptID = re.search('TASK_ATTEMPT_ID="(\\w+)"', row)
            attemptID = searchAttemptID.group(1)
            attemptItem = attempts.get(attemptID)
            if attemptItem is None:
                attemptItem = attempt(attemptID);
            if attemptItem.attemptType is None:
                search = re.search('TASK_TYPE="([^"]+)"', row)
                if search is not None:
                    attemptItem.attemptType = search.group(1)
            if attemptItem.attemptStart is None:
                search = re.search('START_TIME="(\\d+)"', row)
                if search is not None:
                    attemptItem.attemptStart = datetime.datetime.fromtimestamp(float(search.group(1))/1000)
            if attemptItem.attemptEnd is None:
                search = re.search('FINISH_TIME="(\\d+)"', row)
                if search is not None:
                    attemptItem.attemptEnd = datetime.datetime.fromtimestamp(float(search.group(1))/1000)
            search = re.search('TASK_STATUS="(\\w+)"', row)
            if search is not None:
                attemptItem.attemptStatus = search.group(1)
            attempts[attemptID] = attemptItem

    print jobID + ",,,," + jobStart.strftime("%Y/%m/%d %H:%M:%S") + "," + jobEnd.strftime("%Y/%m/%d %H:%M:%S") 
    print jobName
    print jobStatus

    for key in sorted(attempts.keys()):
        result = "," + attempts[key].attemptStatus + "," + attempts[key].attemptType + ","
        result += attempts[key].attemptID + "," + attempts[key].attemptStart.strftime("%Y/%m/%d %H:%M:%S") + ","
        result += attempts[key].attemptEnd.strftime("%Y/%m/%d %H:%M:%S")
        print result

    file.close()
    print ""

実行するとこんな感じで出力されます。

job_201210230540_0007,,,,2012/10/24 02:57:22,2012/10/24 03:02:06
Sleep job
SUCCESS
,SUCCESS,MAP,attempt_201210230540_0007_m_000000_0,2012/10/24 02:57:24,2012/10/24 02:59:26
,SUCCESS,CLEANUP,attempt_201210230540_0007_m_000001_0,2012/10/24 03:02:04,2012/10/24 03:02:05
,SUCCESS,SETUP,attempt_201210230540_0007_m_000002_0,2012/10/24 02:57:23,2012/10/24 02:57:24
,SUCCESS,REDUCE,attempt_201210230540_0007_r_000000_0,2012/10/24 02:59:26,2012/10/24 03:02:04

これを貼り付ける用のExcelシートを用意しました。

MapReduceJobChart_template.xlsx 直

A4 セルに、出力結果のテキストを貼り付けて、
テキストファイルウィザードでカンマ区切りを選択して出力すると、
準備完了です。

historyログをまとめて解析したい場合、
以下のようなシェルで対象ファイル全部について解析結果を出力させてもいいでしょう。

#!/bin/sh

for filepath in `find /var/log/hadoop/history/done/ -type f -name "job_*"`
do
    python MapReduceJobChart.py $filepath
done

出力例がこちらです。
Map、Reduceにそれぞれどれくらい時間がかかったか、
Map、Reduceがそれぞれ同時にいくつずつ動いていたか、
FailしていたMapReduce Taskはなかったか、
などが把握しやすくなったのではないでしょうか?

MapReduceJobChart.xlsx 直

JobTrackerサーバにちょうどいいログを持っている方は、
試してみてください。

ちなみに、
ここで可視化した内容は、
halook の MapReduce Job Arrow chart の劣化版です。
なんだかんだで、いちいちスクリプトを実行するのは手間なので、
ブラウザで見えるこっちのほうがいいですね。

スクリプト+Excelにせよ、可視化ツールにせよ、
実行状況の見える化手法をうまく使って、
トラブルの予防・解決に役立てましょう!

それでは。

インターンシップにて、「オープンソースコーディングサマー」を開催

こんばんは、ishidaです。

当社では、夏のインターンシップ
オープンソースコーディングサマー」を開催します。

オープンソースコーディングサマーでは、現在話題となっている
BigDataを操るHadoopを、当社のWGPを用いることで
可視化させてしまおう、という2週間の挑戦ができます。

プログラミングに自信のある学生の皆さん、
ぜひ最先端に挑戦してみてください。


では。

Flume+Hadoopを用いた「分散ログ収集ソリューション」の提供を開始

当社Acroquest Technologyは、Flume+Hadoopを用いた「分散ログ収集ソリューション」の提供を開始しました。

プレスリリースページ
http://www.acroquest.co.jp/company/press/2012/0329p1.html

ログ収集を効率的に行うオープンソース・プロダクト Flume に、
独自の拡張を行うことで、
下記の表のようにログ収集処理のパフォーマンスを約200%に向上させています。

Flumeの信頼性モード Flume(オリジナル) 分散ログ収集基盤(拡張) 比率
エンドツーエンド 2,532 MB/分 6,015 MB/分 238 %
ディスクフェイルオーバー 2,497 MB/分 5,575 MB/分 223 %
ベストエフォート 2,445 MB/分 5,321 MB/分 218 %

サーバ構成はスケールアウト可能であり、
要件に合わせてさらに大量のログを収集することも可能です。

今回、独自の拡張を行った Flume の変更部分は、
オープンソースとして開発元にフィードバックを行う方針で、
本ソリューションとしても無償ライセンスで提供する予定です。

当社では、本ソリューションを利用した、
分散ログ収集システム構築・導入のサポートサービスを提供します。

Hadoopソースコードリーディング第8回に参加しました

Hadoopソースコードリーディング第8回に参加しました。
会場提供、準備等、@hamakenさんを始め、
NTTデータの方々ありがとうございました。

以下、報告です。

(1)『オレオレMultipleInputを作る方法(仮)』
  - @muddydixon さん
(2)『MapRってどうよ? - 実際に使ってみた感触を紹介します』
  - リクルート 中野さん、高林さん、大坪さん

(1)『オレオレMultipleInputを作る方法(仮)』

発表資料が公開されています
http://www.slideshare.net/muddydixon/multipleinput

HadoopのMultipuleInputs を自作して、
Hadoopからmongodbを使えるようにした、という発表です。

HadoopのMultipulInputsは、
MapReduceジョブのinputを柔軟に指定して、

・異なる形式のデータを同一ジョブで扱う
 →Textファイル、Sequenceファイルなど
・ファイルごとに違うMapperを割り当てる
 →「元データはこのMapper」「差分データはこのMapper」など、振り分ける

といったことができるようにするものです。
@muddydixon さんが作ったMongoMultipleInputs によって、
mongodbのデータを入力として扱うことができます。
(分散環境に振り分けられたチャンクデータのローカリティには対応していない)

会の中で最も話題になったのは、
org.apache.hadoop.mapreduce.lib.input.TaggedInputSplitGenerator
がなぜpublicなクラスでないのか?
と言う話でした。
ソースコードリーディング」の名にふさわしいコアな内容だった感じがします。
mongodbをよく知らない私にはきつかった(^^;

参考:TaggedInputSplit が public でなくなった原因のパッチ。
https://issues.apache.org/jira/browse/HADOOP-3853

(2)『MapRってどうよ? - 実際に使ってみた感触を紹介します』

Recruit の検証内容の発表でした。

最初に、@nagixさんから「MapRとは」の説明でした。
内容は、多分ここで私が説明するより、@nagixさんの記事を見てもらった方が早いです。
参考:草薙さんの記事「インサイド MapR (1) 」
http://d.hatena.ne.jp/nagixx/20111216/1324006829

その後は、
高林さんによるHiveを使った性能検証の結果発表、
大坪さん(NSSOL)によるマルチテナント向け機能検証発表、
高林さんによるまとめ(!)
と続きました。

Hiveを使った性能検証の結果

2倍、3倍の性能というが、Hiveを用いた集計で、
 パーティション・圧縮で1.3倍、
 非パーティション・非圧縮非で1.76倍
の性能が出た。そこまでではない?
 

マルチテナント向け機能検証

1. 権限
Jobの実行権限等をつけられるかと思ったら、そうではなかった、とのことでした。
Cluster Permission, Volume Permission はカスタマイズできますが、
Jobの実行権限等を設定しようと思ったら、
Hadoopでやっていたのと同じことをする必要があります。

2. ジョブの同時実行
FairSchedulerが標準で使えます。
Map Slot数だけでなく、Prefetch Slot数の設定も可能。
ジョブの同時実行のパターンを、
最大Slot数、最小Slot数を指定したいくつかのパターンで試し、
うまい具合に割り振られている事を確認。
課金にも重要なこの機能がうまく動いていて、これは使えそう。

3. Load, Job実行
台数増加でうまくスケールしていることを確認。
また、
Master, Slaveを区別しないクラスタ構成(全8ノードにCLDBを乗せる)で、
変わらぬ性能が出ていたため、
Master, Slaveのノードを区別しない、効率的な運用が可能では、という期待。

EMCが3台にCLDBを乗せるパターンしかサポートしないと言っているのが難点か。

4. Direct NFS
MapRの機能で、
クライアント側にNFSマウントすることができます。

そうすると、
・サーバ側にデータを送る前にクライアント側で必要な圧縮が行える
・サーバ側のNICが複数枚ある場合全てを使って通信する事ができる
の2点より、データロードが高速化します。
ただし、
クライアント側に入れる場合もライセンス費用が発生するので注意が必要です。

高林さんによるまとめ

データさんのサポートのあるCDHは安心。
MapRはサポート面が心配。


CDH vs MapR の、
会場の雰囲気をいまいち伝えきれないところが残念です。

Flumeでセキュアな通信を

Hadoopアドベントカレンダーの14日目。
阪本です。

セキュア通信でFlumeを使う方法について書こうと思います。



現行のFlumeには、セキュア通信を行うための仕組みは用意されていません。
完全ローカルで使用するのであれば、それでもよいかもしれませんが、
やはり通信経路のセキュリティは気になるもの。
特に日本ではこのあたり、気にされるんですよねぇ。

そこで、SSHポートフォワード+Flumeによるセキュア通信をやってみました。


まずは、flume configでの設定。

exec unmapAll
exec purgeAll
exec map host1 node1_1
exec map host1 node1_2
exec map host2 node2_1
exec map host2 node2_2
exec map host3 collector1
exec map host3 collector2
exec config node1_1 flow1 'execPeriodic("/usr/local/test/collect1.sh", 60000)'
        'agentSink("localhost", 10001)'
exec config node1_2 flow2 'execPeriodic("/usr/local/test/collect2.sh", 60000)'
        'agentSink("localhost", 10002)'
exec config node2_1 flow1 'execPeriodic("/usr/local/test/collect1.sh", 60000)'
        'agentSink("localhost", 10001)'
exec config node2_2 flow2 'execPeriodic("/usr/local/test/collect2.sh", 60000)'
        'agentSink("localhost", 10002)'
exec config collector1 flow1 'collectorSource(35851)' 'collectorSink("file:///home/test/output1")'
exec config collector2 flow2 'collectorSource(35852)' 'collectorSink("file:///home/test/output2")'

上は、論理ノード4つと論理コレクタ4つを、計3台の物理マシンに割り当てる例。
(見やすいように改行しています。)
collect1/2.shは、転送する内容を標準出力に出力します。
node1_1とnode2_1はcollector1に、node1_2とnode2_2はcollector2にデータを転送します。

ここでポイントとなるのは、ノードのagentSinkで指定するアドレスとポート。
当然ながら、SSHポートフォワードでポートを開けているアドレスとポートを指定しなければなりません。
Agentはlocalhostの10001/10002番ポートへデータを送信し、
Collectorは

そしてsshコマンド。各ノード上で以下を実行。

ssh -L 10001:host3:35851 -L 10002:host3:35852 -L 10003:host3:35872 user@host3

こうすることで、ノードの10001番ポートに来たデータはhost1の35851番ポートへ、
ノードの10002番ポートに来たデータはhost1の35852番ポートへ、
そしてノードの10003番ポートに来たデータはhost1の35872番ポートへ、
SSH経由で転送されることになります。
Agent <-> Master間のHeartbeat通信もSSH経由を忘れずに。
公開鍵認証できるようにしておけば、パスワード要らず。

あとは、Agent側のflume.master.heartbeat.portに10003を設定して、
普通にFlumeを実行するだけ。


プログラムに手を加えることなく、セキュア通信できるようにするのは
SSHの強みですね。

余談

SSHポートフォワードで通信させるときの注意。

sshコマンド1つでポートフォワードできる最大数は100です。
設定などで変えることはできません。
#これにはまりました^^;

100個以上の通信経路を用意する場合は、
以下のように複数個のsshコマンドを実行すれば回避できます。

ssh -L 10001:host3:35851 -L 10002:host3:35852 user@host3
ssh -L 10003:host3:35872 user@host3

それでは。