こんにちは、阪本です。
IoTやらデータ分析やらに関わる機会が増えて、大量のストリームデータに埋もれる(もとい、処理する^^;)ことが増えてきました。
しかも、関わっている業務的に、大概において、データ欠損は許されないという。。
そんなストリームデータを、高速かつ障害に強いアーキテクチャで処理しれくれるのが、Apache Flinkです。
アーキテクチャはこんな感じになっています↓
Apache Flink 1.2.0 Documentation: Distributed Runtime Environment
Flinkの動きをざっくり書くと、
- JobManagerはジョブを受け付け、1つ以上のTaskManagerに処理の実行を指示する。
- JobManagerは処理中、各TaskManagerにチェックポイントトリガーを発行する。
- TaskManagerはチェックポイントトリガーを受信すると、状態のスナップショットを保存し、JobManagerにAckを返す。
- 障害が発生すると、チェックポイントに戻り、再度実行を行う。
となっていて、このチェックポイントの仕組みが、耐障害性を高めています。
でも、障害に強いと言っても、実際に障害が発生したらどのような挙動になるの?
Exactly onceと書いてあるけど、本当にそうなの?
そんな疑問を解消すべく、こんな条件で試してみました。
2台のKafka Brokerを立て、2台のFlink TaskManagerがConsumerとして並列処理でデータを取得し、Elasticsearchに書き込みます。
動作中に、TaskManagerのうちの1台を強制停止したらどうなるか?をやってみました。
まずは何も気にせずやってみる
Flinkアプリケーションのコードは次のような感じです。文字列をJSONとして解析し、Mapに変換してElasticsearchに流しています。
public class FlinkExecutor { public static void main(String... args) { new FlinkExecutor().start(); } public void start() { // 5秒おきにCheckPointを発行、並行処理数を2とする StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(5000L); env.setParallelism(2); // Kafkaから受け取った文字列をJSON解析する Properties kafkaProps = new Properties(); kafkaProps.setProperty("bootstrap.servers", "192.168.0.1"); kafkaProps.setProperty("group.id", "test"); DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>( "topic", new SimpleStringSchema(), kafkaProps)); SingleOutputStreamOperator<Map<String, Object>> outputStream = stream .map(FlinkExecutor::parseJson); // Elasticsearchに書き込む Map<String, String> elasticsearchConfig = new HashMap<>(); elasticsearchConfig.put("bulk.flush.max.actions", "1"); elasticsearchConfig.put("cluster.name", "elasticsearch"); List<InetSocketAddress> transports = Arrays.asList(new InetSocketAddress("192.168.0.4", 9300)); outputStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports, new MyElasticsearchSinkFunction<>())); env.execute() } @SuppressWarnings("unchecked") private static Map<String, Object> parseJson(String arg) { try { return mapper.readValue(arg, Map.class); } catch (IOException ex) { ex.printStackTrace(); return null; } } static class MyElasticsearchSinkFunction<T extends Map<String, Object>> implements ElasticsearchSinkFunction<T> { private static final long serialVersionUID = -3596543908572672509L; @Override public void process(T element, RuntimeContext ctx, RequestIndexer indexer) { IndexRequest request = Requests.indexRequest() .index("flink-sample-index").type("sample-type").source(element); indexer.add(request); } } }
Kafkaに対して20000件のデータを連続して送信中に、2つあるFlink TaskManagerの1つをkillしたところ、TaskManagerと通信できなくなった旨のメッセージがJobManagerのログに出ていました。
2017-03-15 18:56:50,648 WARN flink-akka.actor.default-dispatcher-3 akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-taskmanager1:36911] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager1:36911]] Caused by: [ホストへの経路がありません
何回か上記のWARNログが出た後、タスクとジョブの状態が「FAILED」となりました。
2017-03-15 18:57:33,615 WARN flink-akka.actor.default-dispatcher-15 akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-taskmanager1:36911] 2017-03-15 18:57:33,620 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@flink-taskmanager1:36911/user/taskmanager terminated. 2017-03-15 18:57:33,621 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> Sink: Unnamed (2/2) (103335fecb10487207ede8049ef5771c) switched from RUNNING to FAILED.java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='953329bfa928e188c95f623efe90bd2f'} @ flink-taskmanager1 (dataPort=43427) ... 2017-03-15 18:57:33,626 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RUNNING to FAILING.
ジョブが失敗しても、生きているTaskManagerが残っていれば、自動的にジョブが再開されました(状態がRUNNINGになっています)。
2017-03-15 18:57:33,692 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state FAILING to RESTARTING ... 2017-03-15 18:57:43,694 INFO jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RESTARTING to CREATED. ... 2017-03-15 18:57:43,700 INFO jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state CREATED to RUNNING
ジョブはめでたく再開されました。が、Elasticsearchには、欠損はなかったものの、一部重複するレコードが格納されてしまいました。。
それはある意味自明で、最後のチェックポイント(次の図のチェックポイント2)以降の状態は失われ、再実行されるからです。
Elasticsearchは、トランザクションやスナップショットの仕組みがないため、重複して実行された部分はレコードも重複してしまうことになります。
重複しないようにする
じゃあ、どうすれば重複せずにレコードをElasticsearchに書き込めるでしょうか?
答えは、冪等性を保つような書き込み方にしてあげればOKです。
MyElasticsearchSinkFunctionクラスでElasticsearchに書き込むデータを作っていましたが、
ここにidを指定して、同じデータは同じ一意のidになるようにすることで、重複した(同じidの)レコードがElasticsearchに格納されなくなります。
(厳密には、こっそり見えないところに保存されているようですが。)
@Override public void process(T element, RuntimeContext ctx, RequestIndexer indexer) { // timestampとcategoryの組み合わせで一意となる場合 String id = element.get("timestamp") + "_" + element.get("category"); IndexRequest request = Requests.indexRequest() .index("flink-sample-index").type("sample-type").id(id).source(element); indexer.add(request); }
今度こそめでたく、重複せずにElasticsearchに格納することができました。
まとめ
Exactly onceといっても、挙動はSink(実はSourceも)の特性に影響されます。
ほとんどのSinkは冪等性を持たず、ゆえにExactly onceではないので、上記のように自分で工夫することが必要です。
とはいいつつ、障害が発生しても途中から自動で勝手にやり直してくれるのは、便利ですね!
今回は単純に各イベントをElasticsearchに保存する処理について調べてみました。
次回は、複数イベント間の関係を考慮して処理するCEP(Complex Event Processing)について調べてみます。
ではでは。
Acroquest Technologyでは、キャリア採用を行っています。
- ビッグデータ(Hadoop/Spark、NoSQL)、データ分析(Elasticsearch、Python関連)、Web開発(SpringCloud/SpringBoot、AngularJS)といった最新のOSSを利用する開発プロジェクトに関わりたい。
- マイクロサービス、DevOpsなどの技術を使ったり、データ分析、機械学習などのスキルを活かしたい。
- 社会貢献性の高いプロジェクトや、顧客の価値を創造するようなプロジェクトで、提案からリリースまで携わりたい。
- 書籍・雑誌等の執筆や、対外的な勉強会の開催・参加を通した技術の発信、社内勉強会での技術情報共有により、エンジニアとして成長したい。
少しでも上記に興味を持たれた方は、是非以下のページをご覧ください。Elasticsearchを仕事で使いこみたいデータ分析エンジニア募集中! - Acroquest Technology株式会社のエンジニア中途・インターンシップ・契約・委託の求人 - Wantedlywww.wantedly.com