Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache FlinkでCEPを実現 ~パターン指定~

こんにちは、阪本です。

Apache FlinkでCEP(Complex Event Processing)の実現方法について、前回は時刻とグルーピングについて説明しました。
今回は、イベントのパターン定義を見てみましょう。これで、簡単なイベント処理ができるようになります。

パターンを定義する

ここでは、「15秒以内に温度が30℃を2回以上超える」を表すパターンを定義します。
パターンは、CEPクラスのpatternメソッドで次のように定義します。
なお、keyedStreamは、前回作成したKeyedStream<Map<String, Object>, String>型のストリームです。

PatternStream<Map<String, Object>> patternStream = CEP.pattern(keyedStream,
    Pattern.<Map<String, Object>>
         begin("first")
            .where(value -> value.get("temperature") > 30)
        .followedBy("second")
            .where(value -> value.get("temperature") > 30)
        .within(Time.seconds(15)));

条件指定に使用する代表的なメソッドを紹介します。

メソッド 意味
begin(ラベル) 処理条件の開始を表すラベル定義です。ラベルの値は任意です。必ずbeginから始まります。
where(条件式) ラベル定義に紐づくイベントの条件を指定します。
next(ラベル) 1つ前のラベル定義にマッチするイベントのすぐ次のイベントを表すラベル定義です。ラベルの値は任意です。
followedBy(ラベル) 1つ前のラベル定義にマッチするイベントから、そのイベントの後ろに続くイベントを表すラベル定義です。nextと異なり、間に別のイベントが入ることを許します。ラベルの値は任意です。
within(時間) イベントの時間間隔の制約です。

もう少し詳しく見ていきましょう。
beginメソッドで、最初の条件を指定しています。value変数(Map型)のtemperatureの値が30を超えたイベントが来た場合、そのイベントに「first」というラベルを付与します。

begin("first").where(value -> value.get("temperature") > 30)

次に、「first」というラベルが付与されたイベントに続くイベントの中で、value変数(Map型)のtemperatureの値が30を超えたイベントが来た場合、そのイベントに「second」というラベルを付与します。

.followedBy("second").where(value -> value.get("temperature") > 30)

最後に、「first」のイベントと「second」のイベントの発生間隔が15秒以内であることを定義します。

.within(Time.seconds(15)));

この定義では、次のようなイベントがマッチします。

{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 29.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 30.1}
{"timestamp": "2017-05-10T10:02:00Z", "sensorId": 1, "temperature": 31.5}
{"timestamp": "2017-05-10T10:03:00Z", "sensorId": 1, "temperature": 29.8}
{"timestamp": "2017-05-10T10:04:00Z", "sensorId": 1, "temperature": 29.7}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 29.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 30.1}
{"timestamp": "2017-05-10T10:02:00Z", "sensorId": 1, "temperature": 29.5}
{"timestamp": "2017-05-10T10:03:00Z", "sensorId": 1, "temperature": 29.8}
{"timestamp": "2017-05-10T10:04:00Z", "sensorId": 1, "temperature": 30.7}

なお、「first」や「second」といったラベルは、次に説明する、条件にマッチしたイベントを取得する際に使用します。

条件にマッチしたイベントを取得する

先ほど、パターンを定義して、条件にマッチするイベントにラベルを付与しました。
イベントの取得には、ラベルを使用します。
次のコードでは、「second」のラベルが付与されたイベントをストリームとして取得します。

DataStream<Map<String, Object>> alertStream = patternStream.select(pattern -> pattern.get("second"));

取得したイベントは通常のストリームなので、そのままSinkに流し込むことができます。

alertStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
        new MyElasticsearchSinkFunction<>()));

上記では、ラベルが付与されたイベントはまとめて取得することもできます。

class Alert {
    public final Integer first;
    public final Integer second;

    public Alert(Integer first, Integer second) {
        this.first = first;
        this.second = second;
    }
}

DataStream<Alert> alertStream = patternStream.select(
        pattern -> new Alert(pattern.get("first"), pattern.get("second")));

つなげる

今まで説明してきた内容をつなげたコードは、次の通りです。

// 動作設定
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L);
env.setParallelism(2);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Kafkaから受け取った文字列をJSON解析する
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "192.168.0.1");
kafkaProps.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
        "topic", new SimpleStringSchema(), kafkaProps));

// Watermarkを付与する
stream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

// sensorId毎にグルーピングする
KeyedStream<Map<String, Object>, String> keyedStream = stream.map(FlinkExecutor::parseJson)
                                                          .keyBy(value -> value.get("sensorId"));

// 条件を定義する
PatternStream<Map<String, Object>> patternStream = CEP.pattern(keyedStream,
    Pattern.<Map<String, Object>>
         begin("first")
            .where(value -> value.get("temperature") > 30)
        .followedBy("second")
            .where(value -> value.get("temperature") > 30)
        .within(Time.seconds(15)));

// 条件にマッチするイベントストリームを作成する
DataStream<Map<String, Object>> alertStream = patternStream.select(pattern -> pattern.get("second"));

// イベントを保存する
alertStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
        new MyElasticsearchSinkFunction<>()));

まとめ

Flinkで簡単なCEPについて見てきました。
当然ではありますが、条件でマッチしたイベントをストリームで取得できるのが便利ですね!
イベントのパターンに指定できる条件は他にもありますので、より複雑なイベントの条件指定が可能です。
ぜひ触ってみてください。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com

Apache FlinkでCEPを実現 ~イベント時刻とグルーピング~

こんにちは、阪本です。
前回に引き続き、Apache FlinkでCEP(Complex Event Processing)の実現方法について紹介します。

イベントの時刻は何を基準にするか

CEPをやるからには、イベントの時刻はとっても重要です。
Flinkでは、次の3つのいずれかの基準を、StreamExecutionEnvironmentに対して指定します。

意味
ProcessingTime Flinkがそれぞれの操作でイベントを処理した時刻
EventTime イベントが発生した時刻(別途、どの値を使用するか指定が必要)
IngestionTime Flinkがイベントを受信した時刻

これらはいずれもTimeCharacteristic enumのメンバです。
EventTimeは、Sourceで発生した時刻を基準にすることができるため、現実世界に忠実ではありますが、その分レイテンシは大きくなります。
その分、ProcessingTimeは忠実性に欠けますが、シンプルで高速です。
場合によって使い分けましょう。
今回はProcessingTimeを使うことにします。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

ウィンドウ処理のためのWatermark

イベントの時刻をEventTimeにすると、Flinkが処理するイベントが「イベントの時刻」順になっていない可能性があります。
そうすると、ウィンドウ処理を行う際、どこまで読み込めばウィンドウの区切りを行ってよいかが判別できなくなってしまいます。
その問題を解消するために、FlinkではWatermarkという仕組みを入れています。

f:id:acro-engineer:20170511092838p:plain
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.htmlから引用)

Watermarkをどのように入れるかは、DataStreamのassignTimestampsAndWatermarksメソッドで指定します。
たとえば、Flinkに含まれているIngestionTimeExtractorクラスを指定すると、イベントを処理したマシンの時刻をベースにWatermarkを入れます。

stream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

ストリームのグルーピング

1つのストリームに複数の種類のイベントが混ざって流れてくるパターン、たとえばIoTならいろんな場所に設置された複数の温度センサーから時々刻々と温度の情報が送られてくる状況において、場所ごとに温度の異常判定を行いたいとなると、温度センサー毎にイベントを分ける必要があります。
このような場合はDataStreamのkeyByを使うと、イベントをグルーピングできます。

たとえば次のようなイベントが1つのストリームに順次流れてくる場合、sensorIdをキーにして、センサー毎の温度のストリームにグルーピングします。

{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 20.3}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 2, "temperature": 21.0}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 3, "temperature": 18.9}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 20.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 2, "temperature": 21.1}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 3, "temperature": 18.8}
・・・
KeyedStream<Map<String, Object>, String> keyedStream = stream.map(FlinkExecutor::parseJson)
                                                          .keyBy(value -> value.get("sensorId"));

なお、FlinkExecutor::parseJsonは、前回の記事にある、JSON文字列をMapに変換するメソッドです。

グルーピングしたストリームに対してCEPの処理を記述することで、グループ毎に判定が行えるようになります。

続きは次回

ストリームの準備が整ったところで、いよいよCEPの処理に入ります。
長くなりそうなので、今回はいったんこの辺で。続きはこちら。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com

X-Pack Machine Learningを試してみました

みなさんこんにちは!
@です。
本日、待望のX-Pack Machine Learningがリリースされました。

X-Pack Machine Learning

X-Pack Machine Learningとは

X-Pack Machine LearningはElastic Stackで時系列の異常検知を行える機械学習の製品です。
特徴として、教師なし学習による異常検知モデルの生成や周期、トレンドの異常検知を行えます。
そして、2017年のElastic{ON}での注目機能の1つでもあります。

www.elastic.co

X-Pack Machine Learningは以前、Prelertとしてリリースされており、
弊社ブログでも説明させていただきました。

acro-engineer.hatenablog.com

早速、X-Pack Machine Learningを試してみます!

インストール

X-Packをインストールする手順と同じです。
そのため、公式のX-Packのインストール方法を確認し、インストールしてください。

www.elastic.co

実際にやってみた

解析準備

解析対象となるデータセットがX-Pack Machine Learningを試すために必要です。
今回は過去に弊社ブログで取り上げたfarequoteのサンプルを使います。
データの投入方法や形式は次のページを参考にしてください。

acro-engineer.hatenablog.com

X-Pack Machine Learningを利用して、2種類のJobを作成します。
1つ目がSingle Metric Job、2つ目がMulti Metric Jobです。

Single Metric Job

Single Metric Jobは1つのメトリックに対して、異常検知を行います。
今回はアクセス数全体のカウントを対象に異常検知を行います。

X-Pack Machine Learningでは、はじめにJobの種類を選択します。
本項目では、「Create a single metric job」を使います。
f:id:acro-engineer:20170505081718p:plain

次に解析対象となるElasticsearchのindexを設定します。indexは「farequote」を選択します。

f:id:acro-engineer:20170505100124p:plain

次の画面でSingle Metric Jobの設定を行います。
Aggregationをcount, Bucket spanを30mにします。
また、データの可視化を設定項目右のボタンを押すとできます。
この機能によって予め、解析対象がどのようなデータかを確認できます。

最後に、「Create Job」をクリックすると、Jobの生成を行います。

f:id:acro-engineer:20170505101745p:plain

Jobの生成が終わると、次の画面になります。
異常として検知された箇所に黄色い棒がグラフ内に表示されています。

f:id:acro-engineer:20170505101732p:plain

最後に解析結果を確認しましょう。View Resultsをクリックすると
次のような画面になります。
時系列グラフを確認しつつ、異常箇所を見られます。
今までできなかった嬉しい機能の1つです。また、正常と判定する領域も確認できます。

f:id:acro-engineer:20170505101639p:plain

Multi Metric Job

Multi Metric Jobはフィールドごとに異常検知を行います。
Single Metric Jobの構築でアクセス数の異常検知に成功しました。
しかし、本データに使われているairline(航空会社)全てが異常といえるのでしょうか?
特定のairlineに異常が発生しているかどうかをMulti Metric Jobで判定できます。

さて、早速試してみましょう。

今回はSingle Metric Jobを選択した画面で「Create a multi metric job」を選択しましょう。
まず、最初に次の画面が表示されます。この画面でJobの設定を行います。

f:id:acro-engineer:20170505081718p:plain

Bucket spanを30m、Key Fieldsをairline.keyword、Job Detailsに名前や説明を書きます。
Key Fieldsで設定した属性をベースに分割し、各々のデータで異常検知します。
必要な設定を行った例は次のとおりです。設定が完了し次第、Create Jobを実行しましょう。

f:id:acro-engineer:20170505081732p:plain

実行後、Anomaly Explorer画面に遷移できます。
どの航空会社のリクエスト数が増加しているかをAnomaly Explorerを使って、確認できます。

また、モデルが異常と判定した赤い四角をクリックすると、
クリックした対象のデータを表示できます。
そして、表示された時系列グラフからも急激にアクセス数が増加した様子を確認できます。

f:id:acro-engineer:20170505081743p:plain

おまけ

Elastic Stack5.4で実装されたVisual Builderを使って
データを可視化しました。これまでのVisualizeよりリッチに感じます。

f:id:acro-engineer:20170505082221p:plain

PrelertとX-Pack Machine Learningの違い

私が試してみて感じたX-Pack Machine LearningとPrelertの違いは以下、3点です。

より簡単に設定ができる

Prelertも簡単に異常検知の設定ができました。
しかし、X-Pack Machine Learningは更に簡単です。
X-Pack Machine Learningは利用ケース別(Single Metricなど)で
必要最低限の項目を設定を行えば使えます。

これまでに必要だったElasticsearchのアクセス先や
Prelertに見られた上級者向けの設定を記述・確認する必要がありません。

解析途中や解析後に生データを確認できる

X-Pack Machine Learningでは、Jobの設定途中に解析データを確認できます。
Prelertでは、モデルの解析途中や解析後にデータを確認する場合、
Kibanaなどのソフトを使ってデータ確認する必要がありました。

データを逐一、確認できることにより、
どのようなデータを解析しようとしているか確認し、適切な設定へ変更できます。

モデルが可視化できる

X-Pack Machine Learningはモデルを可視化できます。
X-Pack Machine Learningでは、解析している領域を可視化できます。
この可視化は、Prelert時にはindexを解析し、Timelionを利用して表示しなければなりませんでした。

Prelertのベータ版でMachine Learningを体験する | Elastic

X-Pack Machine Learningでは、モデルがどうデータを解析しているかを
標準で見えるようになっています。
そのため、本来ここはこうあるべきだったが、こうなったから異常と判定されたことを確認できます。

最後に

Prelertよりも非常に簡単に異常検知ができました!
また、生データを解析時に見れるといった痒い部分にも手が届いており、
非常に期待できる機能となっています!

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com

#Java本格入門 ついに発売! 日本よ、これが #アクロ本 だ!

最近まで寒い寒いと思っていたら急に暑くなってきましたが、お風邪など召さず健やかにお過ごしでしょうか。
ただ季節の話をしているだけの @ です☺️


さて、
Acroquestの同僚たちと執筆したJava本格入門が、ついに、ついに発売されました!!
gihyo.jp
電子書籍も同日発売で、Amazon Kindle楽天Koboのほか、GihyoのサイトでPDF/EPUB版を購入することもできます。

また、正誤表などを掲載しているサポートサイトも用意しました。
github.com
issueはどなたでも作成いただけるようしてあります。・・・既に柴田先生からのありがたいissueが!😇


またこの本は、入門書の次に読む「2冊目」を目指して書いたものであり、編集者の傳さんも書かれている通り、裏テーマが「35歳からのJava再入門」だったりします。


たとえば、
1〜2年ぐらいJavaを書いてきたので、もう少し次のことを学ぶとか。
Javaの勉強が少し古いバージョンまでで止まってしまっているので、改めて学び直すとか。
概要を知ってはいるけど、知ったかぶりになってしまっているところを補強するとか。
そういうような所で使ってもらえる本にしたい、という想いで執筆を進めました。


・・・なんていう綺麗な話はさておき、裏話を少しだけ。

実は、この本が企画されたのは、Java8がリリースされるよりも前でした。
当時は、これからJava8が出るにもかかわらず、世の中にはまだまだJava7の使い方が広まっていないと感じており、そういう想いから from old Java to modern Java なんてスライドを作って発表したりもしました。
このスライドが編集者である傳さんの目に留まったのが、この本のきっかけでした。

Java8のリリースより前から書き始めて、発売になったのがJava9リリースを数ヶ月後に控えた今というのはもう、多方面の皆様にご迷惑をおかけしました感が凄いわけですが。。。


そんなわけで苦しんで生まれた一冊ですが、もし本屋などで見かけた際には、ぜひ手に取っていただいて、そのままレジに持っていただいて、会計待ちの間にスマホAmazonから配る用にもう一冊買っていただいて、おまけに便利な電子書籍版も購入いただければ、幸いです。


それでは!
See you!

Elastic Stack5.3の特に魅力的な新機能を紹介します! #elasticsearch

こんにちは、@です。
Elastic{ON} 2017から戻ってきてから初投稿になります。

先日、Elastic Stack 5.3がリリースされ、既に公式による日本語リリースノートも出ています。
Elastic StackにどんなアップグレードがあったのかはElastic Stack 5.3.0リリースを参考にすると良いと思います。

私はElastic Stack 5.3のKibanaとBeatsの機能を特に魅力的に感じたのでご紹介します。

Kibana

Top hits Aggregation

KibanaのVisualizeでTop hits Aggregationを使えるようになりました。

Top hits Aggregationはキーを元に並び替えを行った場合の上位、
もしくは下位のデータの取得が可能なAggregationです。
Top hits Aggregationの詳細は「Top hits Aggregation」を参照してください。

この機能によってバージョン5.3以前でKibana上で可視化できなかった
Top X番やWorst X番の可視化ができます。
例えば、@timestampをキーにソートすると、常に最新の情報をKibanaで可視化できます。

f:id:acro-engineer:20170329231934p:plain:w700

画面の改善

KibanaのUIが改善されています。
Kibanaで現在、見ている時間の前後を見たくなったことありませんか?
Kibana上で簡単に実現するために、時間を進めるボタンと戻るボタンがTimepickerに追加されています。

これまで、右上にある時間を選択し、更にQuickやAbsoluteから時間を選択して見たい時間を選んでいました。
今後はTimepickerにあるボタンを押すだけで、少し時間のずれているデータを確認できます。
今の時間から少しずれている時間のデータを見たい!と思った時にすぐに探せる便利な機能ですね。

f:id:acro-engineer:20170331085703j:plain

Beats

Filebeat Modules

Elastic{ON}のKeynoteでリリース予告があった
Filebeat modulesがリリースされました。

Filebeat modulesを使えば、より簡単にFilebeatを使えるようになります。
Metricbeatには、情報を取得する機能のみならず、
KibanaのDashboardをも自動的に生成する機能がありました。

しかし、Dashboardを生成する機能はFilebeatになかったため、Dashboardを作る必要がありました。

この機能はクラスメソッドさんのブログが詳しいです。

dev.classmethod.jp

最後に

バージョン5.3ではElastic{ON}で
リリース告知されていたFilebeat modulesが出ました!

Elastic Stack 5.3では、Elastic{ON}でも感じた製品の使いやすさを意識した
アップグレードがされていると感じています。

バージョン5.4ではいよいよTime Series Visual Builderと
X-Pack Machine Learningのリリースが予定されています。
私にとってどちらも期待が高い機能なので、楽しみにしています!

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com

Apache Flink の耐障害性はどんなもの? ~Exactly once の振る舞い~

こんにちは、阪本です。

IoTやらデータ分析やらに関わる機会が増えて、大量のストリームデータに埋もれる(もとい、処理する^^;)ことが増えてきました。
しかも、関わっている業務的に、大概において、データ欠損は許されないという。。

そんなストリームデータを、高速かつ障害に強いアーキテクチャで処理しれくれるのが、Apache Flinkです。
アーキテクチャはこんな感じになっています↓
Apache Flink 1.2.0 Documentation: Distributed Runtime Environment

Flinkの動きをざっくり書くと、

  • JobManagerはジョブを受け付け、1つ以上のTaskManagerに処理の実行を指示する。
  • JobManagerは処理中、各TaskManagerにチェックポイントトリガーを発行する。
  • TaskManagerはチェックポイントトリガーを受信すると、状態のスナップショットを保存し、JobManagerにAckを返す。
  • 障害が発生すると、チェックポイントに戻り、再度実行を行う。

となっていて、このチェックポイントの仕組みが、耐障害性を高めています。

でも、障害に強いと言っても、実際に障害が発生したらどのような挙動になるの?
Exactly onceと書いてあるけど、本当にそうなの?
そんな疑問を解消すべく、こんな条件で試してみました。

f:id:acro-engineer:20170227171700p:plain

2台のKafka Brokerを立て、2台のFlink TaskManagerがConsumerとして並列処理でデータを取得し、Elasticsearchに書き込みます。
動作中に、TaskManagerのうちの1台を強制停止したらどうなるか?をやってみました。

まずは何も気にせずやってみる

Flinkアプリケーションのコードは次のような感じです。文字列をJSONとして解析し、Mapに変換してElasticsearchに流しています。

public class FlinkExecutor {
    public static void main(String... args) {
        new FlinkExecutor().start();
    }

    public void start() {
        // 5秒おきにCheckPointを発行、並行処理数を2とする
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L);
        env.setParallelism(2);

        // Kafkaから受け取った文字列をJSON解析する
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "192.168.0.1");
        kafkaProps.setProperty("group.id", "test");
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
                "topic", new SimpleStringSchema(), kafkaProps));
        SingleOutputStreamOperator<Map<String, Object>> outputStream = stream
                .map(FlinkExecutor::parseJson);

        // Elasticsearchに書き込む
        Map<String, String> elasticsearchConfig = new HashMap<>();
        elasticsearchConfig.put("bulk.flush.max.actions", "1");
        elasticsearchConfig.put("cluster.name", "elasticsearch");
        List<InetSocketAddress> transports = Arrays.asList(new InetSocketAddress("192.168.0.4", 9300));
        outputStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
                new MyElasticsearchSinkFunction<>()));

        env.execute()
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> parseJson(String arg) {
        try {
            return mapper.readValue(arg, Map.class);
        } catch (IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }

    static class MyElasticsearchSinkFunction<T extends Map<String, Object>> implements ElasticsearchSinkFunction<T> {
        private static final long serialVersionUID = -3596543908572672509L;

        @Override
        public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
            IndexRequest request = Requests.indexRequest()
                    .index("flink-sample-index").type("sample-type").source(element);
            indexer.add(request);
        }
    }
}

Kafkaに対して20000件のデータを連続して送信中に、2つあるFlink TaskManagerの1つをkillしたところ、TaskManagerと通信できなくなった旨のメッセージがJobManagerのログに出ていました。

2017-03-15 18:56:50,648 WARN  flink-akka.actor.default-dispatcher-3 akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-taskmanager1:36911] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager1:36911]] Caused by: [ホストへの経路がありません

何回か上記のWARNログが出た後、タスクとジョブの状態が「FAILED」となりました。

2017-03-15 18:57:33,615 WARN  flink-akka.actor.default-dispatcher-15 akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@flink-taskmanager1:36911]
2017-03-15 18:57:33,620 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@flink-taskmanager1:36911/user/taskmanager terminated.
2017-03-15 18:57:33,621 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Filter -> Sink: Unnamed (2/2) (103335fecb10487207ede8049ef5771c) switched from RUNNING to FAILED.java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='953329bfa928e188c95f623efe90bd2f'} @ flink-taskmanager1 (dataPort=43427)
...
2017-03-15 18:57:33,626 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RUNNING to FAILING.

ジョブが失敗しても、生きているTaskManagerが残っていれば、自動的にジョブが再開されました(状態がRUNNINGになっています)。

2017-03-15 18:57:33,692 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state FAILING to RESTARTING
...
2017-03-15 18:57:43,694 INFO  jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RESTARTING to CREATED.
...
2017-03-15 18:57:43,700 INFO  jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state CREATED to RUNNING

ジョブはめでたく再開されました。が、Elasticsearchには、欠損はなかったものの、一部重複するレコードが格納されてしまいました。。
それはある意味自明で、最後のチェックポイント(次の図のチェックポイント2)以降の状態は失われ、再実行されるからです。
f:id:acro-engineer:20170317003026p:plain

Elasticsearchは、トランザクションやスナップショットの仕組みがないため、重複して実行された部分はレコードも重複してしまうことになります。

重複しないようにする

じゃあ、どうすれば重複せずにレコードをElasticsearchに書き込めるでしょうか?
答えは、冪等性を保つような書き込み方にしてあげればOKです。

MyElasticsearchSinkFunctionクラスでElasticsearchに書き込むデータを作っていましたが、
ここにidを指定して、同じデータは同じ一意のidになるようにすることで、重複した(同じidの)レコードがElasticsearchに格納されなくなります。
(厳密には、こっそり見えないところに保存されているようですが。)

@Override
public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
    // timestampとcategoryの組み合わせで一意となる場合
    String id = element.get("timestamp") + "_" + element.get("category");
    IndexRequest request = Requests.indexRequest()
            .index("flink-sample-index").type("sample-type").id(id).source(element);
    indexer.add(request);
}

今度こそめでたく、重複せずにElasticsearchに格納することができました。

まとめ

Exactly onceといっても、挙動はSink(実はSourceも)の特性に影響されます。
ほとんどのSinkは冪等性を持たず、ゆえにExactly onceではないので、上記のように自分で工夫することが必要です。

とはいいつつ、障害が発生しても途中から自動で勝手にやり直してくれるのは、便利ですね!

今回は単純に各イベントをElasticsearchに保存する処理について調べてみました。
次回は、複数イベント間の関係を考慮して処理するCEP(Complex Event Processing)について調べてみます。

ではでは。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com

Elastic{ON} 2017 3日目 | Closing Keynote:Elasticsearch活用で、よりよい世界を作る #elasticon

Elastic{ON}2017 レポートのまとめはこちら!!

こんにちは、nakaです!

Opening Keynoteでの興奮から始まったElastic{ON}も
最終日となってしまいました。
楽しい時間はあっという間に過ぎてしまいますね!

私からは、最後のセッション
「Closing Keynote」セッションについて
フィードバックします。
f:id:acro-engineer:20170310154311j:plain:w800

Closing Keynoteセッションは他のセッション以上に、
メッセージ性の強いセッションでした。

言葉で伝えるのが難しいのですが、私が受け取ったメッセージは、
「世界で起きている社会問題に対し、
Elasticsearchを適用することで、今までと違う方法で改善アプローチができ、
世界をよりよくできる大きな可能性がある」
というものです。


なかなかこれだけでは伝わらないですよね。。
以下詳細をレポートします。

「Cause Award」受賞3社より、プレゼン

このセッションでは、「Cause Award」を受賞した3社それぞれが
自社の取り組みついてと、Elasticsearchを
どう活用しているのかのプレゼンをしてもらいました。

安全保障の課題をElasticsearchで改善する:IST Research

IST Researchはアフガンなどの危険地域の安全保障の課題に取り組む会社です。

f:id:acro-engineer:20170310075026j:plain:w500

特にPULSEというプラットフォームを提供しており、
これは、世界でまだ日の目が当たっていない情報に対し、
収集、分析、可視化するシステムです。
その情報プラットフォームの基盤にElasticsearchを用いています。
f:id:acro-engineer:20170310155510j:plain:w500

現地の人への取材などから、TwitterなどのSNSまで
様々な情報を一か所のElasticsearchデータセンタに集約し、
分析可能にしているとのことでした。

校内暴力やいじめをElasticsearchを用いて減らす:NoSchoolViolence.com

2社目は、NoSchoolViolence.com社からのプレゼン。

校内暴力やいじめに対し、実際に起きる前に防ぐこと、
また現状起きてしまっている暴力やいじめの数を
減らすための活動を行っている会社です。
f:id:acro-engineer:20170310155846j:plain:w500

Lanternというソフトウェアを開発し、課題に取り組んでいます。
f:id:acro-engineer:20170310160348j:plain:w500

Lanternはどのような行動をする人が、
犯罪となる行動をしやすいのかを明らかにする
検索プラットフォームです。

デモでは、以下左の写真にある検索窓に、
「レザージャケットを着ている」
「銃を持っている」
「彼女に振られた」
「家族が借金をしている」
という内容を以下の検索窓に入れて検索すると、
左の写真のように、「weapons use」のRisk scoreがhighであると出てきます。

f:id:acro-engineer:20170310173310p:plain:w300f:id:acro-engineer:20170310160633j:plain:w300


これを使うことで、
あらかじめ犯罪につながる行動を減らすよう教育し、
事前に犯罪を減らすことに取り組んでいます。

Elasticsearch活用で、エボラ出血熱撲滅:eHealth Africa

3社目はeHealth Africa社から、
エボラ出血熱の用の緊急時コールセンタを、
Elasticsearchを用いて実現したというプレゼンです。
f:id:acro-engineer:20170310082938j:plain:w500

以下がKibanaプラグインとして開発されている、
コールセンタの画面です。
f:id:acro-engineer:20170310161804p:plain:w500

この仕組みを用いることで、
500万件以上の電話に対し、対処することができています。


どれも、世界が抱える社会的問題に対し、
Elasticsearchという先端技術を用いて、
今まで解決できなかった問題を解決に導く取り組みでした。

この3社のプレゼンがあったので、
「世界で起きている社会問題に対し、
Elasticsearchを適用することで、今までと違う方法で改善アプローチができ、
世界をよりよくできる大きな可能性がある」
と感じたのですよね。

Elasticsearchの世界を変える力はすごいなと素直に思いました。

最後に

Closing Keynoteの最後には、
Steven氏、Uri氏、Simon氏へのフリー質問タイム。
※実はCTOのShay Banon氏は事情があり参加できませんでした。

f:id:acro-engineer:20170310164803j:plain:w500

質問は、各自が好きな機能や、質問が飛び交いました。

ただこれを見て感じたのは、
ShayBanon氏がカンファレンスにいられなくなった時に、
当然のように、周りがサポートし、
Elstic{ON}を成功に導いていたことです。

Elastic{ON}でのセッションの一つで、
リモート開発をしているのではない、
分散開発をしているんだというメッセージを
発していたセッションがありました。

その通りで、それぞれが考え、行動し、
お互いをサポートして分散して開発する文化があるからこそ、
成し遂げられたのではないかと思いました。

f:id:acro-engineer:20170310165803j:plain:w500


この3日間、生でElastic{ON}に触れられたことで、
新しい技術や機能はもちろん、
Elastic社の文化や、Elasticsearchの熱量を感じることができました。

ENdoSnipeはElasticsearchをベースにしているため、
Elasticsearchが進化する分、ENdoSnipeも進化させていきます。
今回得たヒントで進化するENdoSnipeを見せられるのが楽しみです

ぜひまた来年も来たいですね。
よりElastic{ON}を面白くするために、
日本でElasticsearchを盛り上げねば!頑張ります。
See you again!
f:id:acro-engineer:20170310165159j:plain:w500


Elastic{ON}2017 レポートのまとめはこちら!!

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com