読者です 読者をやめる 読者になる 読者になる

Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache FlinkでCEPを実現 ~イベント時刻とグルーピング~

こんにちは、阪本です。
前回に引き続き、Apache FlinkでCEP(Complex Event Processing)の実現方法について紹介します。

イベントの時刻は何を基準にするか

CEPをやるからには、イベントの時刻はとっても重要です。
Flinkでは、次の3つのいずれかの基準を、StreamExecutionEnvironmentに対して指定します。

意味
ProcessingTime Flinkがそれぞれの操作でイベントを処理した時刻
EventTime イベントが発生した時刻(別途、どの値を使用するか指定が必要)
IngestionTime Flinkがイベントを受信した時刻

これらはいずれもTimeCharacteristic enumのメンバです。
EventTimeは、Sourceで発生した時刻を基準にすることができるため、現実世界に忠実ではありますが、その分レイテンシは大きくなります。
その分、ProcessingTimeは忠実性に欠けますが、シンプルで高速です。
場合によって使い分けましょう。
今回はProcessingTimeを使うことにします。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

ウィンドウ処理のためのWatermark

イベントの時刻をEventTimeにすると、Flinkが処理するイベントが「イベントの時刻」順になっていない可能性があります。
そうすると、ウィンドウ処理を行う際、どこまで読み込めばウィンドウの区切りを行ってよいかが判別できなくなってしまいます。
その問題を解消するために、FlinkではWatermarkという仕組みを入れています。

f:id:acro-engineer:20170511092838p:plain
https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/event_time.htmlから引用)

Watermarkをどのように入れるかは、DataStreamのassignTimestampsAndWatermarksメソッドで指定します。
たとえば、Flinkに含まれているIngestionTimeExtractorクラスを指定すると、イベントを処理したマシンの時刻をベースにWatermarkを入れます。

stream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

ストリームのグルーピング

1つのストリームに複数の種類のイベントが混ざって流れてくるパターン、たとえばIoTならいろんな場所に設置された複数の温度センサーから時々刻々と温度の情報が送られてくる状況において、場所ごとに温度の異常判定を行いたいとなると、温度センサー毎にイベントを分ける必要があります。
このような場合はDataStreamのkeyByを使うと、イベントをグルーピングできます。

たとえば次のようなイベントが1つのストリームに順次流れてくる場合、sensorIdをキーにして、センサー毎の温度のストリームにグルーピングします。

{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 20.3}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 2, "temperature": 21.0}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 3, "temperature": 18.9}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 20.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 2, "temperature": 21.1}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 3, "temperature": 18.8}
・・・
KeyedStream<Map<String, Object>, String> keyedStream = stream.map(FlinkExecutor::parseJson)
                                                          .keyBy(value -> value.get("sensorId"));

なお、FlinkExecutor::parseJsonは、前回の記事にある、JSON文字列をMapに変換するメソッドです。

グルーピングしたストリームに対してCEPの処理を記述することで、グループ毎に判定が行えるようになります。

続きは次回

ストリームの準備が整ったところで、いよいよCEPの処理に入ります。
長くなりそうなので、今回はいったんこの辺で。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com