こんにちは、阪本です。
IoTやらデータ分析やらに関わる機会が増えて、大量のストリームデータに埋もれる(もとい、処理する^^;)ことが増えてきました。
しかも、関わっている業務的に、大概において、データ欠損は許されないという。。
そんなストリームデータを、高速かつ障害に強いアーキテクチャで処理しれくれるのが、Apache Flinkです。
アーキテクチャはこんな感じになっています↓
Apache Flink 1.2.0 Documentation: Distributed Runtime Environment
Flinkの動きをざっくり書くと、
- JobManagerはジョブを受け付け、1つ以上のTaskManagerに処理の実行を指示する。
- JobManagerは処理中、各TaskManagerにチェックポイントトリガーを発行する。
- TaskManagerはチェックポイントトリガーを受信すると、状態のスナップショットを保存し、JobManagerにAckを返す。
- 障害が発生すると、チェックポイントに戻り、再度実行を行う。
となっていて、このチェックポイントの仕組みが、耐障害性を高めています。
でも、障害に強いと言っても、実際に障害が発生したらどのような挙動になるの?
Exactly onceと書いてあるけど、本当にそうなの?
そんな疑問を解消すべく、こんな条件で試してみました。
2台のKafka Brokerを立て、2台のFlink TaskManagerがConsumerとして並列処理でデータを取得し、Elasticsearchに書き込みます。
動作中に、TaskManagerのうちの1台を強制停止したらどうなるか?をやってみました。
まずは何も気にせずやってみる
Flinkアプリケーションのコードは次のような感じです。文字列をJSONとして解析し、Mapに変換してElasticsearchに流しています。
public class FlinkExecutor {
public static void main(String... args) {
new FlinkExecutor().start();
}
public void start() {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000L);
env.setParallelism(2);
Properties kafkaProps = new Properties();
kafkaProps.setProperty("bootstrap.servers", "192.168.0.1");
kafkaProps.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
"topic", new SimpleStringSchema(), kafkaProps));
SingleOutputStreamOperator<Map<String, Object>> outputStream = stream
.map(FlinkExecutor::parseJson);
Map<String, String> elasticsearchConfig = new HashMap<>();
elasticsearchConfig.put("bulk.flush.max.actions", "1");
elasticsearchConfig.put("cluster.name", "elasticsearch");
List<InetSocketAddress> transports = Arrays.asList(new InetSocketAddress("192.168.0.4", 9300));
outputStream.addSink(new ElasticsearchSink<>(elasticsearchConfig, transports,
new MyElasticsearchSinkFunction<>()));
env.execute()
}
@SuppressWarnings("unchecked")
private static Map<String, Object> parseJson(String arg) {
try {
return mapper.readValue(arg, Map.class);
} catch (IOException ex) {
ex.printStackTrace();
return null;
}
}
static class MyElasticsearchSinkFunction<T extends Map<String, Object>> implements ElasticsearchSinkFunction<T> {
private static final long serialVersionUID = -3596543908572672509L;
@Override
public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
IndexRequest request = Requests.indexRequest()
.index("flink-sample-index").type("sample-type").source(element);
indexer.add(request);
}
}
}
Kafkaに対して20000件のデータを連続して送信中に、2つあるFlink TaskManagerの1つをkillしたところ、TaskManagerと通信できなくなった旨のメッセージがJobManagerのログに出ていました。
2017-03-15 18:56:50,648 WARN flink-akka.actor.default-dispatcher-3 akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://flink@flink-taskmanager1:36911] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://flink@flink-taskmanager1:36911]] Caused by: [ホストへの経路がありません
何回か上記のWARNログが出た後、タスクとジョブの状態が「FAILED」となりました。
2017-03-15 18:57:33,615 WARN flink-akka.actor.default-dispatcher-15 akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://flink@flink-taskmanager1:36911]
2017-03-15 18:57:33,620 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.jobmanager.JobManager - Task manager akka.tcp://flink@flink-taskmanager1:36911/user/taskmanager terminated.
2017-03-15 18:57:33,621 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source -> Map -> Filter -> Sink: Unnamed (2/2) (103335fecb10487207ede8049ef5771c) switched from RUNNING to FAILED.java.lang.Exception: TaskManager was lost/killed: ResourceID{resourceId='953329bfa928e188c95f623efe90bd2f'} @ flink-taskmanager1 (dataPort=43427)
...
2017-03-15 18:57:33,626 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RUNNING to FAILING.
ジョブが失敗しても、生きているTaskManagerが残っていれば、自動的にジョブが再開されました(状態がRUNNINGになっています)。
2017-03-15 18:57:33,692 INFO flink-akka.actor.default-dispatcher-15 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state FAILING to RESTARTING
...
2017-03-15 18:57:43,694 INFO jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state RESTARTING to CREATED.
...
2017-03-15 18:57:43,700 INFO jobmanager-future-1-thread-1 org.apache.flink.runtime.executiongraph.ExecutionGraph - Job Flink Streaming Job (888c1c6b9e910fddad08615b82bd0782) switched from state CREATED to RUNNING
ジョブはめでたく再開されました。が、Elasticsearchには、欠損はなかったものの、一部重複するレコードが格納されてしまいました。。
それはある意味自明で、最後のチェックポイント(次の図のチェックポイント2)以降の状態は失われ、再実行されるからです。
Elasticsearchは、トランザクションやスナップショットの仕組みがないため、重複して実行された部分はレコードも重複してしまうことになります。
重複しないようにする
じゃあ、どうすれば重複せずにレコードをElasticsearchに書き込めるでしょうか?
答えは、冪等性を保つような書き込み方にしてあげればOKです。
MyElasticsearchSinkFunctionクラスでElasticsearchに書き込むデータを作っていましたが、
ここにidを指定して、同じデータは同じ一意のidになるようにすることで、重複した(同じidの)レコードがElasticsearchに格納されなくなります。
(厳密には、こっそり見えないところに保存されているようですが。)
@Override
public void process(T element, RuntimeContext ctx, RequestIndexer indexer) {
String id = element.get("timestamp") + "_" + element.get("category");
IndexRequest request = Requests.indexRequest()
.index("flink-sample-index").type("sample-type").id(id).source(element);
indexer.add(request);
}
今度こそめでたく、重複せずにElasticsearchに格納することができました。