Kafkaコンシューマのリバランスをローカルで体験
TOC
目的
ローカルでコンシューマリバランスを発生させ、その際の挙動を確認します。
Kafkaにおいて、Topic上のPartitionは、Consumer Group上のConsumerに下図の様に紐付けされます。
リバランスが発生すると、PartitionとConsumerの紐付けが更新されます。
やってみた
概要
PartitionとConsumerの紐づけは、Brokerが管理しており、タイムアウトを契機にリバランスが発生します。
1 つのブローカーが コーディネーター に指定され、グループのメンバーおよびパーティション割り当ての管理を担います。...
セッションタイムアウトの期限までにハートビートを受信しなかった場合、コーディネーターはそのメンバーをグループから除外し、そのメンバーのパーティションを別のメンバーに割り当て直します。
Kafka コンシューマー | Confluent Documentationより
以降では、Javaでコンシューマを実装し、タイムアウトさせてみましょう。 使用したコードはこちらから参照可能です。
事前調査
リバランスが発生する契機となるタイムアウトに関して、以下の2つが存在します。今回は2つ目の方法でタイムアウトを発生させます。
- session.timeout.ms: コンシューマからのheartbeatを待つ時間
- max.poll.interval.ms: コンシューマからのメッセージ取得を待つ時間
確認手順
Kafka
リポジトリのルートフォルダで、以下手順で準備を行います。
$ docker-compose -f docker/docker-compose.yml up -d $ docker exec -it kafka /bin/bash // 以降、コンテナ内での作業 // testトピックをパーティション数1で作成 # /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 1 --replication-factor 1 --topic test // プロヂューサー側でのデータ積み込み # /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test > 1 > 2 > 3 > 4 > 5
コンシューマ(Java)
メッセージの取得に関して、以下の通り設定を行いました。
- 1度のpollメソッドで、1つのメッセージを取得
- max.poll.interval.msに、2000を設定
そして、1メッセージを受信する毎に、1秒づつ待ち時間が増える様にしました。
Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L36
あとは、mainメソッドを実行します。
結果確認
Consumer(Java)のログ
まず、Thread-1で3つ目のメッセージまで行われ、オフセットのコミット前にタイムアウトが発生します。
次に、Thread-0で3つ目のメッセージを再度取得し、5つ目のメッセージを取得した後、オフセットのコミット前にタイムアウトが発生します。
thread_name: Thread-1, message_key: null, message_value:1 thread_name: Thread-1, message_key: null, message_value:2 thread_name: Thread-1, message_key: null, message_value:3 thread_name: Thread-0, message_key: null, message_value:3 Exception in thread "Thread-1" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357) at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37) at java.base/java.lang.Thread.run(Thread.java:834) thread_name: Thread-0, message_key: null, message_value:4 thread_name: Thread-0, message_key: null, message_value:5 Exception in thread "Thread-0" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357) at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37) at java.base/java.lang.Thread.run(Thread.java:834)
Kafkaのログ
5行目のログで2つのコンシューマが接続したことが記録されています。
6-9行目でリバランスの実施が記録されています。
kafka_1 | [2022-05-29 07:49:55,315] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 210 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 with group instance id None) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,320] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 211 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,322] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 211 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e with group instance id None) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,338] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 212 (__consumer_offsets-6) with 2 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:49:55,344] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 212. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:00,459] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:00,460] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 212 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:01,322] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 213 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:01,326] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 213. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,438] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 213 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator) kafka_1 | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Group consumer-1 with generation 214 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)