Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache FlinkでCEPを実現 ~パターン指定~

こんにちは、阪本です。

Apache FlinkでCEP(Complex Event Processing)の実現方法について、前回は時刻とグルーピングについて説明しました。
今回は、イベントのパターン定義を見てみましょう。これで、簡単なイベント処理ができるようになります。

パターンを定義する

ここでは、「15秒以内に温度が30℃を2回以上超える」を表すパターンを定義します。
パターンは、CEPクラスのpatternメソッドで次のように定義します。
なお、keyedStreamは、前回作成したKeyedStream<Map<String, Object>, String>型のストリームです。

PatternStream<Map<String, Object>> patternStream = CEP.pattern(keyedStream,
    Pattern.<Map<String, Object>>
         begin("first")
            .where(value -> value.get("temperature") > 30)
        .followedBy("second")
            .where(value -> value.get("temperature") > 30)
        .within(Time.seconds(15)));

条件指定に使用する代表的なメソッドを紹介します。

メソッド 意味
begin(ラベル) 処理条件の開始を表すラベル定義です。ラベルの値は任意です。必ずbeginから始まります。
where(条件式) ラベル定義に紐づくイベントの条件を指定します。
next(ラベル) 1つ前のラベル定義にマッチするイベントのすぐ次のイベントを表すラベル定義です。ラベルの値は任意です。
followedBy(ラベル) 1つ前のラベル定義にマッチするイベントから、そのイベントの後ろに続くイベントを表すラベル定義です。nextと異なり、間に別のイベントが入ることを許します。ラベルの値は任意です。
within(時間) イベントの時間間隔の制約です。

もう少し詳しく見ていきましょう。
beginメソッドで、最初の条件を指定しています。value変数(Map型)のtemperatureの値が30を超えたイベントが来た場合、そのイベントに「first」というラベルを付与します。

begin("first").where(value -> value.get("temperature") > 30)

次に、「first」というラベルが付与されたイベントに続くイベントの中で、value変数(Map型)のtemperatureの値が30を超えたイベントが来た場合、そのイベントに「second」というラベルを付与します。

.followedBy("second").where(value -> value.get("temperature") > 30)

最後に、「first」のイベントと「second」のイベントの発生間隔が15秒以内であることを定義します。

.within(Time.seconds(15)));

この定義では、次のようなイベントがマッチします。

{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 29.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 30.1}
{"timestamp": "2017-05-10T10:02:00Z", "sensorId": 1, "temperature": 31.5}
{"timestamp": "2017-05-10T10:03:00Z", "sensorId": 1, "temperature": 29.8}
{"timestamp": "2017-05-10T10:04:00Z", "sensorId": 1, "temperature": 29.7}
{"timestamp": "2017-05-10T10:00:00Z", "sensorId": 1, "temperature": 29.3}
{"timestamp": "2017-05-10T10:01:00Z", "sensorId": 1, "temperature": 30.1}
{"timestamp": "2017-05-10T10:02:00Z", "sensorId": 1, "temperature": 29.5}
{"timestamp": "2017-05-10T10:03:00Z", "sensorId": 1, "temperature": 29.8}
{"timestamp": "2017-05-10T10:04:00Z", "sensorId": 1, "temperature": 30.7}

なお、「first」や「second」といったラベルは、次に説明する、条件にマッチしたイベントを取得する際に使用します。

条件にマッチしたイベントを取得する

先ほど、パターンを定義して、条件にマッチするイベントにラベルを付与しました。
イベントの取得には、ラベルを使用します。
次のコードでは、「second」のラベルが付与されたイベントをストリームとして取得します。

DataStream<Map<String, Object>> alertStream = patternStream.select(pattern -> pattern.get("second"));

取得したイベントは通常のストリームなので、そのままSinkに流し込むことができます。

alertStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
        new MyElasticsearchSinkFunction<>()));

上記では、ラベルが付与されたイベントはまとめて取得することもできます。

class Alert {
    public final Integer first;
    public final Integer second;

    public Alert(Integer first, Integer second) {
        this.first = first;
        this.second = second;
    }
}

DataStream<Alert> alertStream = patternStream.select(
        pattern -> new Alert(pattern.get("first"), pattern.get("second")));

つなげる

今まで説明してきた内容をつなげたコードは、次の通りです。

// 動作設定
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L);
env.setParallelism(2);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

// Kafkaから受け取った文字列をJSON解析する
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "192.168.0.1");
kafkaProps.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
        "topic", new SimpleStringSchema(), kafkaProps));

// Watermarkを付与する
stream.assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

// sensorId毎にグルーピングする
KeyedStream<Map<String, Object>, String> keyedStream = stream.map(FlinkExecutor::parseJson)
                                                          .keyBy(value -> value.get("sensorId"));

// 条件を定義する
PatternStream<Map<String, Object>> patternStream = CEP.pattern(keyedStream,
    Pattern.<Map<String, Object>>
         begin("first")
            .where(value -> value.get("temperature") > 30)
        .followedBy("second")
            .where(value -> value.get("temperature") > 30)
        .within(Time.seconds(15)));

// 条件にマッチするイベントストリームを作成する
DataStream<Map<String, Object>> alertStream = patternStream.select(pattern -> pattern.get("second"));

// イベントを保存する
alertStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
        new MyElasticsearchSinkFunction<>()));

まとめ

Flinkで簡単なCEPについて見てきました。
当然ではありますが、条件でマッチしたイベントをストリームで取得できるのが便利ですね!
イベントのパターンに指定できる条件は他にもありますので、より複雑なイベントの条件指定が可能です。
ぜひ触ってみてください。

それでは。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com