The Magical Rebalance Protocol of Apache Kafka

  • Very useful (and dense) talk that goes into the details of Kafka consumer groups.

  • Production/consumption always occurs against the leader replica (for a partition); all other replicas are simply standbys.

  • The internal __consumer_offsets topic stores the committed offsets for each consumer group; this is what provides fault tolerance, since a restarting consumer resumes from its group's last committed offsets.
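
  • A minimal sketch of a consumer whose commits land in that topic (the broker address, topic "events", and group id "demo-group" are all made up):

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class CommittingConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // hypothetical group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("events")); // hypothetical topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                records.forEach(r ->
                    System.out.printf("%s-%d@%d: %s%n", r.topic(), r.partition(), r.offset(), r.value()));
                // The commit is a write to __consumer_offsets, keyed by (group, topic, partition),
                // so a restarted member of "demo-group" picks up from here.
                consumer.commitSync();
            }
        }
    }
}
```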

  • Consumers in a group figure out which partitions each of them should consume using a homegrown protocol.

    • v1 used ZooKeeper directly, which couldn't scale (the expected load is 30-50k consumer groups, some with 100+ consumers per group)

    • v2 was a custom protocol in which a broker made all the assignments; this proved unsuitable because adding a new assignment strategy required changing the brokers

    • v3 (current) moves this responsibility over to a consumer in each group

  • Protocol steps:

    • FindCoordinator: hash the group id, modulo the number of partitions in __consumer_offsets, to get a partition; the leader of that partition is the group coordinator (guaranteed to be live, since partition leadership fails over to another replica when a leader dies). See the first sketch after this list.

    • JoinGroup: agree on an assignment algorithm, receive a member id, and choose one consumer as the group leader (the coordinator keeps a linked list of consumers in the order they joined; the head of the list is the leader, and a consumer is removed from the list when it dies)

    • SyncGroup: the leader decides the partition assignment and sends it to the coordinator, which hands this assignment to all the other consumers

    • Heartbeat: consumers periodically tell the coordinator they're still alive. The response can be a plain OK or a command to rebalance, which is triggered when a consumer enters/leaves the group, when topic metadata changes, etc. (see the rebalance-listener sketch after this list)

    • LeaveGroup: consumer deliberately leaves the group
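
  • A sketch of the FindCoordinator arithmetic described above (the group id is made up; 50 is the default partition count of __consumer_offsets, and the bit mask mirrors how Kafka keeps the hash non-negative):

```java
public class CoordinatorPartition {
    public static void main(String[] args) {
        String groupId = "demo-group";   // hypothetical group id
        int offsetsPartitions = 50;      // default offsets.topic.num.partitions

        // Mask off the sign bit so the hash is non-negative, then take it
        // modulo the partition count of __consumer_offsets.
        int partition = (groupId.hashCode() & 0x7fffffff) % offsetsPartitions;

        // The broker leading this partition is the group's coordinator.
        System.out.println("Coordinator = leader of __consumer_offsets-" + partition);
    }
}
```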
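
  • On the client side, a rebalance triggered through the heartbeat response surfaces via ConsumerRebalanceListener. A minimal sketch (the topic name is made up, and `consumer` is assumed to be an already-configured client):

```java
import java.util.Collection;
import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class RebalanceLogger {
    static void subscribe(KafkaConsumer<String, String> consumer) {
        consumer.subscribe(List.of("events"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // Called before a rebalance takes these partitions away:
                // a good place to commit offsets for them.
                System.out.println("Revoked: " + partitions);
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // Called once SyncGroup delivers the new assignment.
                System.out.println("Assigned: " + partitions);
            }
        });
    }
}
```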

  • Consumers can specify multiple assignment strategies when joining a group, in priority order. This is useful during upgrades, when each consumer may support a different set of strategies: the coordinator settles on the highest-priority strategy that all members support.
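
  • A sketch of advertising strategies in priority order via partition.assignment.strategy, here using the built-in cooperative-sticky and range assignors (the combination used for rolling upgrades to cooperative rebalancing):

```java
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class AssignorPriority {
    static Properties consumerProps() {
        Properties props = new Properties();
        // Listed in priority order: the coordinator picks the first strategy
        // that every member of the group supports.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, String.join(",",
                CooperativeStickyAssignor.class.getName(),
                RangeAssignor.class.getName()));
        return props;
    }
}
```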

  • As far as I can tell, consumers don't talk directly to each other

  • Extend AbstractCoordinator to implement a custom coordination strategy (at the client level, not the broker), essentially customizing consumer behavior for each of the five steps above

  • Implement a new ConsumerPartitionAssignor to customize just the partition assignment step
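
  • A minimal sketch of a custom ConsumerPartitionAssignor; the round-robin logic is naive and purely illustrative (it ignores per-member subscription differences):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;

public class NaiveRoundRobinAssignor implements ConsumerPartitionAssignor {

    @Override
    public String name() {
        return "naive-round-robin"; // advertised to the coordinator during JoinGroup
    }

    @Override
    public GroupAssignment assign(Cluster metadata, GroupSubscription groupSubscription) {
        Map<String, Subscription> subscriptions = groupSubscription.groupSubscription();
        List<String> members = new ArrayList<>(subscriptions.keySet());
        Collections.sort(members); // deterministic member order

        // Gather every partition of every topic any member subscribed to.
        SortedSet<String> topics = new TreeSet<>();
        subscriptions.values().forEach(s -> topics.addAll(s.topics()));
        List<TopicPartition> all = new ArrayList<>();
        for (String topic : topics) {
            Integer count = metadata.partitionCountForTopic(topic);
            if (count == null) continue; // topic missing from metadata
            for (int p = 0; p < count; p++) all.add(new TopicPartition(topic, p));
        }

        // Deal partitions out to members in turn.
        Map<String, List<TopicPartition>> perMember = new HashMap<>();
        members.forEach(m -> perMember.put(m, new ArrayList<>()));
        for (int i = 0; i < all.size(); i++) {
            perMember.get(members.get(i % members.size())).add(all.get(i));
        }

        Map<String, Assignment> assignments = new HashMap<>();
        perMember.forEach((member, parts) -> assignments.put(member, new Assignment(parts)));
        return new GroupAssignment(assignments);
    }
}
```

    Such an assignor is registered the same way as the built-in ones, by passing its class name in partition.assignment.strategy.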

  • Towards the end there was some explanation of ways this protocol is used outside the core use case, but it went by too fast for me to actually grasp:

    • The schema registry extends AbstractCoordinator to provide a leader election system (does this depend on a Kafka broker or is it entirely standalone?)

    • Kafka Connect uses this to assign generic tasks (copy table X from a given source database, for example) to workers, so there's some scope to use the protocol more generically, not just with partitions.

    • Kafka Streams uses a custom assignor to have a consumer join data from two source topics. This is possible because the protocol lets members attach arbitrary user metadata to their subscriptions, which this strategy (ab)uses.

      • RocksDB for local state, but I didn't really understand the context/motivation

  • I'm curious about fault-tolerance strategies for when the coordinator goes down, which this talk didn't cover.
