Always manually send heartbeats when processing message batches in ruby-kafka

I have found it useful to always send heartbeats manually when processing message batches with ruby-kafka. Skipping them often leads to instability in the consumer group at the worst possible time: when performance degrades inside the consumer loop. An unstable consumer group rebalances frequently, and frequent rebalances prevent your consumers from making progress, which leads to a severe increase in Kafka consumer lag.

The consumer loop is the heart of a Kafka consumer. ruby-kafka provides very convenient interfaces for users of the library. A topic can be processed in batches as shown below:

consumer.subscribe("topic_name")

# consumer loop
consumer.each_batch do |batch|
  batch.messages.each do |message|
    # process each message
  end
end

The method Kafka::Consumer#each_batch abstracts the complexity of interacting with Kafka:

  • Fetching messages from Kafka
  • Committing the processed messages
  • Coordination between consumers of the same group
  • Instrumentation

Kafka consumer groups require coordination between the consumers and the Kafka broker. The consumers of the group divide the topic’s partitions among themselves. A Kafka broker – the group coordinator – monitors the health of the consumers in the group. Whenever a consumer is unhealthy, the group coordinator will trigger a rebalance – it will redistribute the partitions among the remaining healthy consumers. During a rebalance your consumers experience a short period of unavailability since they are not making progress in consuming the topic.

Consumers send “heartbeats” to the group coordinator to indicate that they are healthy. A rebalance is triggered whenever the group coordinator hasn’t received a heartbeat from a consumer for longer than the configured session.timeout.ms (e.g. 30 seconds).

In ruby-kafka’s consumer loop the heartbeat is triggered after the whole batch is processed. This means that whenever processing a batch takes longer than session.timeout.ms, the group coordinator stops hearing from your consumer, considers it unhealthy and triggers a rebalance.

“My consumer loops always take less than 30 seconds!” you may say. But production systems behave dynamically. If your database latency degrades by 100x tomorrow, your consumer loops may take longer. If your consumers suddenly fetch batches of 5 000 messages instead of 5, your consumer loops may take longer. Once a batch exceeds the session timeout, the resulting rebalances turn a service degradation into an outage.
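To put rough numbers on this, here is a small back-of-the-envelope sketch. The latencies and batch sizes are made-up illustration values, and batch_duration_ms and exceeds_session_timeout? are hypothetical helpers, not part of ruby-kafka. It assumes messages are processed one after another and that no heartbeat is sent until the batch is done.

```ruby
# Illustrative numbers only; nothing here talks to Kafka.
SESSION_TIMEOUT_MS = 30_000 # the 30 second example used in this post

# Hypothetical helper: time needed to process a batch sequentially.
def batch_duration_ms(batch_size, per_message_ms)
  batch_size * per_message_ms
end

# Hypothetical helper: would the batch outlive the session timeout?
def exceeds_session_timeout?(batch_size, per_message_ms)
  batch_duration_ms(batch_size, per_message_ms) > SESSION_TIMEOUT_MS
end

scenarios = {
  "normal day (5 messages, 10 ms each)"        => [5, 10],
  "large batch (5 000 messages, 10 ms each)"   => [5_000, 10],
  "slow database (50 messages, 1 000 ms each)" => [50, 1_000]
}

scenarios.each do |name, (size, latency)|
  verdict = exceeds_session_timeout?(size, latency) ? "rebalance risk" : "ok"
  puts "#{name}: #{batch_duration_ms(size, latency)} ms -> #{verdict}"
end
```

With these assumed values, only the first scenario stays under the timeout. The other two each need 50 seconds per batch.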

I prefer to manually trigger heartbeats after each message is processed in order to make consumers resilient against unexpected degradations in processing latency:

consumer.each_batch do |batch|
  batch.messages.each do |message|
    # process each message

    trigger_heartbeat_if_necessary(consumer)
  end
end

def trigger_heartbeat_if_necessary(consumer)
  consumer.trigger_heartbeat
rescue StandardError
  # Swallow heartbeat errors here. They are typically handled by
  # Kafka::Consumer#consumer_loop, but we are calling trigger_heartbeat
  # from inside the processing block, where that handling does not apply.
end

This solution still runs into problems if processing a single message takes longer than session.timeout.ms, but that is a rarer issue.

Kafka::Consumer#trigger_heartbeat will only send a heartbeat if necessary. The frequency at which heartbeats need to be sent is configured using the heartbeat_interval (e.g. every 10 seconds). For example: we may call trigger_heartbeat multiple times within a period of 10 seconds and it will only actually send the heartbeat once.
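The throttling can be pictured with a small stand-alone sketch. ThrottledHeartbeat is a hypothetical class written for illustration, not ruby-kafka's implementation. It only assumes the behavior described above: a heartbeat goes out at most once per interval. The clock is injected so the example runs without sleeping.

```ruby
# Hypothetical stand-in, not ruby-kafka code: sends at most one
# heartbeat per interval, no matter how often trigger is called.
class ThrottledHeartbeat
  attr_reader :sent_count

  def initialize(interval:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @interval = interval
    @clock = clock
    @last_sent_at = @clock.call
    @sent_count = 0
  end

  # Cheap to call after every message; does nothing until the interval has passed.
  def trigger
    now = @clock.call
    return false if now - @last_sent_at < @interval

    @sent_count += 1 # a real client would send the heartbeat request here
    @last_sent_at = now
    true
  end
end

# Simulate 25 seconds of work, one message per second, with a fake clock.
fake_now = 0
heartbeat = ThrottledHeartbeat.new(interval: 10, clock: -> { fake_now })

25.times do
  fake_now += 1 # pretend processing a message took one second
  heartbeat.trigger
end

puts "trigger called 25 times, heartbeats sent: #{heartbeat.sent_count}"
```

In this simulation trigger is called 25 times, but only the calls at seconds 10 and 20 actually count as a sent heartbeat. That is why calling it after every message is cheap.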

Note that within trigger_heartbeat_if_necessary I had to rescue all errors. The trigger_heartbeat method may raise exceptions related to group coordination (e.g. RebalanceInProgress, HeartbeatError). Usually these exceptions are handled within the consumer loop of #each_batch when the heartbeats are triggered by the gem. But in this specific case we are calling the method directly within the processing block and need to avoid raising a ProcessingError, because a ProcessingError will trigger a rebalance. Ideally I should only be rescuing Kafka::RebalanceInProgress or Kafka::HeartbeatError, but I decided to “rescue all” just in case new consumer loop exceptions are added to the library.
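For comparison, a narrower variant could look like the sketch below. To keep it runnable without a broker it uses stand-ins: SketchErrors::RebalanceInProgress and SketchErrors::HeartbeatError are hypothetical placeholders for the Kafka:: errors named above, FlakyConsumer is a fake consumer whose trigger_heartbeat always fails, and trigger_heartbeat_narrowly is a made-up helper name. In real code the rescue clause would list the library's own error classes.

```ruby
# Stand-ins so the sketch runs without ruby-kafka or a broker.
module SketchErrors
  class RebalanceInProgress < StandardError; end
  class HeartbeatError < StandardError; end
end

# Fake consumer whose heartbeat always fails with the given error class.
class FlakyConsumer
  def initialize(error)
    @error = error
  end

  def trigger_heartbeat
    raise @error, "simulated failure"
  end
end

# Rescue only the coordination errors we expect; anything else propagates.
def trigger_heartbeat_narrowly(consumer)
  consumer.trigger_heartbeat
  true
rescue SketchErrors::RebalanceInProgress, SketchErrors::HeartbeatError => e
  warn "heartbeat skipped: #{e.class}"
  false
end

# A coordination error is swallowed and processing can continue.
trigger_heartbeat_narrowly(FlakyConsumer.new(SketchErrors::HeartbeatError))

# Anything outside the list still escapes the helper.
begin
  trigger_heartbeat_narrowly(FlakyConsumer.new(ArgumentError))
rescue ArgumentError
  puts "unexpected errors still surface"
end
```

The trade-off is visible in the second call: an error outside the list escapes into the processing block, which is exactly what the “rescue all” version guards against.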

The Kafka Java client library takes a different approach to solve this issue. A consumer has a separate thread to send heartbeats. In ruby-kafka the same thread processes the batch and sends heartbeats.

