Taste of Tech Topics

Acroquest Technology株式会社のエンジニアが書く技術ブログ

Apache Flink の耐障害性はどんなもの? ~Exactly once の振る舞い~

こんにちは、阪本です。

IoTやらデータ分析やらに関わる機会が増えて、大量のストリームデータに埋もれる(もとい、処理する^^;)ことが増えてきました。
しかも、関わっている業務的に、大概において、データ欠損は許されないという。。

そんなストリームデータを、高速かつ障害に強いアーキテクチャで処理しれくれるのが、Apache Flinkです。
アーキテクチャはこんな感じになっています↓
Apache Flink 1.2.0 Documentation: Distributed Runtime Environment

Flinkの動きをざっくり書くと、

  • JobManagerはジョブを受け付け、1つ以上のTaskManagerに処理の実行を指示する。
  • JobManagerは処理中、各TaskManagerにチェックポイントトリガーを発行する。
  • TaskManagerはチェックポイントトリガーを受信すると、状態のスナップショットを保存し、JobManagerにAckを返す。
  • 障害が発生すると、チェックポイントに戻り、再度実行を行う。

となっていて、このチェックポイントの仕組みが、耐障害性を高めています。

でも、障害に強いと言っても、実際に障害が発生したらどのような挙動になるの?
Exactly onceと書いてあるけど、本当にそうなの?
そんな疑問を解消すべく、こんな条件で試してみました。

f:id:acro-engineer:20170227171700p:plain

2台のKafka Brokerを立て、2台のFlink TaskManagerがConsumerとして並列処理でデータを取得し、Elasticsearchに書き込みます。
動作中に、TaskManagerのうちの1台を強制停止したらどうなるか?をやってみました。

まずは何も気にせずやってみる

Flinkアプリケーションのコードは次のような感じです。文字列をJSONとして解析し、Mapに変換してElasticsearchに流しています。

public class FlinkExecutor {
    public static void main(String... args) {
        new FlinkExecutor().start();
    }

    public void start() {
        // 5秒おきにCheckPointを発行、並行処理数を2とする
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000L);
        env.setParallelism(2);

        // Kafkaから受け取った文字列をJSON解析する
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", "192.168.0.1");
        kafkaProps.setProperty("group.id", "test");
        DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
                "topic", new SimpleStringSchema(), kafkaProps));
        SingleOutputStreamOperator<Map<String, Object>> outputStream = stream
                .map(FlinkExecutor::parseJson);

        // Elasticsearchに書き込む
        Map<String, String> elasticsearchConfig = new HashMap<>();
        elasticsearchConfig.put("bulk.flush.max.actions", "1");
        elasticsearchConfig.put("cluster.name", "elasticsearch");
        List<InetSocketAddress> transports = Arrays.asList(new InetSocketAddress("192.168.0.4", 9300));
        outputStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
                new MyElasticsearchSinkFunction<>()));

        env.execute()
    }

    @SuppressWarnings("unchecked")
    private static Map<String, Object> parseJson(String arg) {
        try {
            return mapper.readValue(arg, Map.class);
        } catch (IOException ex) {
            ex.printStackTrace();
            return null;
        }
    }

    static class MyElasticsearchSinkFunction<T extends Map<String, Object>> implements ElasticsearchSinkFunction<T> {
        private static final long serialVersionUID = -3596543908572672509L;

        @Override
        public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
            IndexRequest request = Requests.indexRequest()
                    .index("flink-sample-index").type("sample-type").source(element);
            indexer.add(request);
        }
    }
}

Kafkaに対して20000件のデータを連続して送信中に、2つあるFlink TaskManagerの1つをkillしたところ、TaskManagerと通信できなくなった旨のメッセージがJobManagerのログに出ていました。

2017-03-15 18:56:50,648 WARN  flink-akka.actor.default-dispatcher-3 akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-taskmanager1:36911] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager1:36911]] Caused by: [ホストへの経路がありません

何回か上記のWARNログが出た後、タスクとジョブの状態が「FAILED」となりました。

2017-03-15 18:57:33,615 WARN  flink-akka.actor.default-dispatcher-15 akka.remote.RemoteWatcher                                     - Detected unreachable: [akka.tcp://flink@flink-taskmanager1:36911]
2017-03-15 18:57:33,620 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.jobmanager.JobManager                - Task manager akka.tcp://flink@flink-taskmanager1:36911/user/taskmanager terminated.
2017-03-15 18:57:33,621 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: Custom Source -> Map -> Filter -> Sink: Unnamed (2/2) (103335fecb10487207ede8049ef5771c) switched from RUNNING to FAILED.java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='953329bfa928e188c95f623efe90bd2f'} @ flink-taskmanager1 (dataPort=43427)
...
2017-03-15 18:57:33,626 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RUNNING to FAILING.

ジョブが失敗しても、生きているTaskManagerが残っていれば、自動的にジョブが再開されました(状態がRUNNINGになっています)。

2017-03-15 18:57:33,692 INFO  flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state FAILING to RESTARTING
...
2017-03-15 18:57:43,694 INFO  jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RESTARTING to CREATED.
...
2017-03-15 18:57:43,700 INFO  jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state CREATED to RUNNING

ジョブはめでたく再開されました。が、Elasticsearchには、欠損はなかったものの、一部重複するレコードが格納されてしまいました。。
それはある意味自明で、最後のチェックポイント(次の図のチェックポイント2)以降の状態は失われ、再実行されるからです。
f:id:acro-engineer:20170317003026p:plain

Elasticsearchは、トランザクションやスナップショットの仕組みがないため、重複して実行された部分はレコードも重複してしまうことになります。

重複しないようにする

じゃあ、どうすれば重複せずにレコードをElasticsearchに書き込めるでしょうか?
答えは、冪等性を保つような書き込み方にしてあげればOKです。

MyElasticsearchSinkFunctionクラスでElasticsearchに書き込むデータを作っていましたが、
ここにidを指定して、同じデータは同じ一意のidになるようにすることで、重複した(同じidの)レコードがElasticsearchに格納されなくなります。
(厳密には、こっそり見えないところに保存されているようですが。)

@Override
public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
    // timestampとcategoryの組み合わせで一意となる場合
    String id = element.get("timestamp") + "_" + element.get("category");
    IndexRequest request = Requests.indexRequest()
            .index("flink-sample-index").type("sample-type").id(id).source(element);
    indexer.add(request);
}

今度こそめでたく、重複せずにElasticsearchに格納することができました。

まとめ

Exactly onceといっても、挙動はSink(実はSourceも)の特性に影響されます。
ほとんどのSinkは冪等性を持たず、ゆえにExactly onceではないので、上記のように自分で工夫することが必要です。

とはいいつつ、障害が発生しても途中から自動で勝手にやり直してくれるのは、便利ですね!

今回は単純に各イベントをElasticsearchに保存する処理について調べてみました。
次回は、複数イベント間の関係を考慮して処理するCEP(Complex Event Processing)について調べてみます。

ではでは。

Acroquest Technologyでは、キャリア採用を行っています。

  • ビッグデータHadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
  • マイクロサービスDevOpsなどの技術を使ったり、データ分析機械学習などのスキルを活かしたい。
  • 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
  • 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。

 
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。

Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com