この記事では、kafka-clientsライブラリ(バージョン2.6.0を検討)のリスナーに対して自動コミットメカニズムがどのように機能するかをもう少し詳しく説明したいと思います。
でドキュメント、我々はどのように自動コミット作品説明する以下の処方を見つけることができます。
自動コミットは基本的に、auto.commit.interval.ms構成プロパティで設定された期間を持つcronとして機能します。コンシューマーがクラッシュした場合、再起動またはリバランスの後、クラッシュしたコンシューマーが所有するすべてのパーティションの位置は、最後にコミットされたオフセットにリセットされます。
次に、KafkaConsumerのJavaドキュメントには、次の説明が含まれています。
コンシューマーは、オフセットを定期的に自動的にコミットできます。または、コミットAPI(commitSyncやcommitAsyncなど)の1つを呼び出して、このコミットされた位置を手動で制御することを選択できます。
これらの定式化から、非ブロッキング自動オフセットコミットがバックグラウンドで発生し、それが特定のコンシューマーによるメッセージの受信プロセスにどのように関連しているか、そして最も重要なことに、どのような配信保証があるかが完全に明確ではないという誤解が生じる可能性があります?
org.apache.kafkaライブラリのKafkaConsumerクラスの実装例を使用して、enable.auto.commit = true設定でリスナーがメッセージを受信するメカニズムを詳しく見てみましょう:kafka-clients:2.6.0
これを行うには、java docKafkaConsumerにある例を検討してください。
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("enable.auto.commit", "true");
props.setProperty("auto.commit.interval.ms", "1000");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
この場合、自動コミットはどのように行われますか?答えは、新しいメッセージを受信するためのメソッド自体にあるはずです。
consumer.poll(Duration.ofMillis(100));
. KafkaConsumer auto-commit enable.auto.commit auto.commit.interval.ms ConsumerCoordinator , auto-commit.
public void maybeAutoCommitOffsetsAsync(long now) {
if (autoCommitEnabled) {
nextAutoCommitTimer.update(now);
if (nextAutoCommitTimer.isExpired()) {
nextAutoCommitTimer.reset(autoCommitIntervalMs);
doAutoCommitOffsetsAsync();
}
}
}
enable.auto.commit = true auto.commit.interval.ms , , ( doAutoCommitOffsetsAsync)
private void doAutoCommitOffsetsAsync() {
Map<TopicPartition, OffsetAndMetadata> allConsumedOffsets = subscriptions.allConsumed();
log.debug("Sending asynchronous auto-commit of offsets {}", allConsumedOffsets);
commitOffsetsAsync(allConsumedOffsets, (offsets, exception) -> {
if (exception != null) {
if (exception instanceof RetriableCommitFailedException) {
log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
exception);
nextAutoCommitTimer.updateAndReset(rebalanceConfig.retryBackoffMs);
} else {
log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
}
} else {
log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
}
});
}
poll KafkaConsumer. updateAssignmentMetadataIfNeeded, poll ConsumerCoordinator, , maybeAutoCommitOffsetsAsync
poll KafkaConsumer:
offset
.
KafkaConsumer , .
.1 enable.auto.commit = true auto.commit.interval.ms. .. poll() 3 , auto.commit.interval.ms=6000, .
? “at least once delivery”, .