Anarchy In the 1K

Kafkaコンシューマのリバランスをローカルで体験

TOC

目的

ローカルでコンシューマリバランスを発生させ、その際の挙動を確認します。
Kafkaにおいて、Topic上のPartitionは、Consumer Group上のConsumerに下図の様に紐付けされます。

PartitionとConsumerの関係
リバランスが発生すると、PartitionとConsumerの紐付けが更新されます。

やってみた

概要

PartitionとConsumerの紐づけは、Brokerが管理しており、タイムアウトを契機にリバランスが発生します。

1 つのブローカーが コーディネーター に指定され、グループのメンバーおよびパーティション割り当ての管理を担います。...
セッションタイムアウトの期限までにハートビートを受信しなかった場合、コーディネーターはそのメンバーをグループから除外し、そのメンバーのパーティションを別のメンバーに割り当て直します。
Kafka コンシューマー | Confluent Documentationより

以降では、Javaでコンシューマを実装し、タイムアウトさせてみましょう。 使用したコードはこちらから参照可能です。

事前調査

リバランスが発生する契機となるタイムアウトに関して、以下の2つが存在します。今回は2つ目の方法でタイムアウトを発生させます。

  1. session.timeout.ms: コンシューマからのheartbeatを待つ時間
  2. max.poll.interval.ms: コンシューマからのメッセージ取得を待つ時間

確認手順

Kafka

リポジトリのルートフォルダで、以下手順で準備を行います。

$ docker-compose -f docker/docker-compose.yml up -d
$ docker exec -it kafka /bin/bash

// 以降、コンテナ内での作業
// testトピックをパーティション数1で作成
# /opt/kafka/bin/kafka-topics.sh --zookeeper zookeeper:2181 --create --partitions 1 --replication-factor 1 --topic test
// プロヂューサー側でのデータ積み込み
# /opt/kafka/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
> 1
> 2
> 3
> 4
> 5

コンシューマ(Java)

メッセージの取得に関して、以下の通り設定を行いました。

  • 1度のpollメソッドで、1つのメッセージを取得
  • max.poll.interval.msに、2000を設定

Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L21-L22

そして、1メッセージを受信する毎に、1秒づつ待ち時間が増える様にしました。
Ref: https://github.com/U0326/samp-impl-kafka-api/blob/3876ddd285e7e853c722b796d34401dafac17585/src/main/java/work/garaku/code/example/ConsumerExample.java#L36
あとは、mainメソッドを実行します。

結果確認

Consumer(Java)のログ

まず、Thread-1で3つ目のメッセージまで行われ、オフセットのコミット前にタイムアウトが発生します。
次に、Thread-0で3つ目のメッセージを再度取得し、5つ目のメッセージを取得した後、オフセットのコミット前にタイムアウトが発生します。

thread_name: Thread-1, message_key: null, message_value:1
thread_name: Thread-1, message_key: null, message_value:2
thread_name: Thread-1, message_key: null, message_value:3
thread_name: Thread-0, message_key: null, message_value:3
Exception in thread "Thread-1" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357)
    at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37)
    at java.base/java.lang.Thread.run(Thread.java:834)
thread_name: Thread-0, message_key: null, message_value:4
thread_name: Thread-0, message_key: null, message_value:5
Exception in thread "Thread-0" org.apache.kafka.clients.consumer.CommitFailedException: Offset commit cannot be completed since the consumer is not part of an active group for auto partition assignment; it is likely that the consumer was kicked out of the group.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:1231)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1090)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1501)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1400)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1357)
    at work.garaku.code.example.ConsumerExample.lambda$static$0(ConsumerExample.java:37)
    at java.base/java.lang.Thread.run(Thread.java:834)

Kafkaのログ

5行目のログで2つのコンシューマが接続したことが記録されています。
6-9行目でリバランスの実施が記録されています。

kafka_1      | [2022-05-29 07:49:55,315] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 210 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,320] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 211 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,322] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 211 (__consumer_offsets-6) (reason: Adding new member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e with group instance id None) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,338] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 212 (__consumer_offsets-6) with 2 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:49:55,344] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 212. The group has 2 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:00,459] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:00,460] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 212 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-1-7bc4e636-3930-47e9-ac5d-efc96ee0f7a7 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:01,322] INFO [GroupCoordinator 1001]: Stabilized group consumer-1 generation 213 (__consumer_offsets-6) with 1 members (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:01,326] INFO [GroupCoordinator 1001]: Assignment received from leader for group consumer-1 for generation 213. The group has 1 members, 0 of which are static. (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,438] INFO [GroupCoordinator 1001]: Member[group.instance.id None, member.id consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e] in group consumer-1 has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Preparing to rebalance group consumer-1 in state PreparingRebalance with old generation 213 (__consumer_offsets-6) (reason: removing member consumer-consumer-1-2-04eb45f5-e301-45e5-b63e-60ced8dc616e on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)
kafka_1      | [2022-05-29 07:50:06,439] INFO [GroupCoordinator 1001]: Group consumer-1 with generation 214 is now empty (__consumer_offsets-6) (kafka.coordinator.group.GroupCoordinator)