Tracking queue metrics with the ruby-kafka gem

Kafka may be used as a queue to send messages between different systems; when it is used that way, the relevant queue metrics should be collected.

The ruby-kafka gem has excellent support for tracking the most important queue metrics. It ships with out-of-the-box instrumentation for Statsd and Datadog, and it also exposes instrumentation hooks built on Active Support Notifications.

In Kafka, a single topic may be consumed by multiple consumer groups, each with its own message offset. It is as if each consumer group had a separate queue. Therefore, all consumer-side metrics (i.e. queue latency, consume rate, processing latency) should track each consumer group/topic pair separately.

Below you will find how to track some important queue metrics using ruby-kafka.

Built-in instrumentation for Statsd and Datadog

In order to get the built-in instrumentation for Statsd/Datadog you need to require kafka/statsd or kafka/datadog and configure the reporter. The project’s readme explains how to do that.
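For Datadog, the configuration looks roughly like this (each setting is optional and the values below are illustrative; the Statsd reporter exposes the same options under Kafka::Statsd):

require "kafka/datadog"

# All settings are optional; the defaults are "ruby_kafka", "127.0.0.1" and 8125.
Kafka::Datadog.namespace = "custom-namespace"
Kafka::Datadog.host = "statsd.example.com"
Kafka::Datadog.port = 8125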

You can find everything that is instrumented for those reporters by checking their source code (Statsd, Datadog).

Queue latency

ruby-kafka provides built-in queue latency tracking when each message is processed individually (i.e. using #each_message). The relevant metric is “consumer.time_lag” (Statsd, Datadog).
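For instance, with one of those reporters configured, a loop like the one below is all it takes for the metric to be reported (process stands in for your own handling code):

# consumer.time_lag is emitted automatically for every message
# processed through #each_message.
consumer.each_message do |message|
  process(message)
end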

If you are processing messages in batches you’ll need to instrument latency yourself. You can do that by publishing the latency of each message as it is consumed. This is also how you track queue latency if you can’t use Statsd/Datadog.

consumer_group_name = "your-kafka-queue-consumer-group"

consumer.each_batch do |batch|
  topic = batch.topic

  batch.messages.each do |message|
    # create_time may be nil if the producer didn't set it.
    create_time = message.create_time
    # Queue latency in milliseconds. Time.current comes from Active Support;
    # use Time.now outside of Rails.
    latency = create_time && ((Time.current - create_time) * 1000).to_i

    if latency
      $statsd.gauge("kafka.queue.latency", latency, tags: ["consumer_group:#{consumer_group_name}", "topic:#{topic}"])
    end
  end
end

Note that queue latency is being tracked on the consumer side here. That means that if the consumer crashes, latency will stop being reported. You’ll have to monitor the consumer’s health to ensure you don’t have any blind spots.

The consumer’s health can be tracked using something like Burrow.

Consume rate

Consume rate can be tracked via the built-in Statsd and Datadog integrations through the consumer.messages metric. It is available for both individual and batch message processing.

This can also be instrumented manually through Active Support Notifications, which is useful if you can’t use Statsd/Datadog.

# when processing messages individually
ActiveSupport::Notifications.subscribe("process_message.consumer.kafka") do |_, _, _, _, payload|
  group_id = payload[:group_id]
  topic = payload[:topic]

  $statsd.increment("kafka.queue.message_count", tags: ["consumer_group:#{group_id}", "topic:#{topic}"])
end

# when processing messages in batches
ActiveSupport::Notifications.subscribe("process_batch.consumer.kafka") do |_, _, _, _, payload|
  message_count = payload[:message_count]
  group_id = payload[:group_id]
  topic = payload[:topic]

  $statsd.count("kafka.queue.message_count", message_count, tags: ["consumer_group:#{group_id}", "topic:#{topic}"])
end

Produce rate

Produce rate can be tracked via the built-in Statsd and Datadog integrations through the producer.produce.messages metric.

This can also be instrumented manually through Active Support Notifications.

ActiveSupport::Notifications.subscribe("produce_message.producer.kafka") do |_, _, _, _, payload|
  topic = payload[:topic]

  $statsd.increment("kafka.queue.message_count", tags: ["topic:#{topic}"])
end

Processing latency

Processing latency can be tracked using the built-in Statsd/Datadog integration. The relevant metrics are process_message.latency when processing messages individually (Statsd, Datadog) and process_batch.latency when processing messages in batches (Statsd, Datadog).

This can also be instrumented manually within the consumer’s main loop.

consumer_group_name = "your-kafka-queue-consumer-group"

# when processing messages individually
consumer.each_message do |message|
  topic = message.topic

  $statsd.time("kakfa.processing_latency", tags: ["consumer_group:#{consumer_group_name}", "topic:#{topic}"]) do
    process(message)
  end
end

# when processing messages in batches
consumer.each_batch do |batch|
  topic = batch.topic

  $statsd.time("kakfa.processing_latency", tags: ["consumer_group:#{consumer_group_name}", "topic:#{topic}"]) do
    process(batch)
  end
end

Note that when processing batches of messages, the processing latency may increase because the consumers fetch higher numbers of messages per batch. In these cases, to adequately diagnose a degradation in processing latency you’ll also need to track the batch size. With the batch size you’ll be able to assess the processing latency per message.

In order to track batch size you can use the same metric as in the “Consume rate” section for batch message processing. For consume rate you graph the sum of messages processed over time, while for batch size you graph the p95 of the received “message_count” over time.
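If you’d rather record batch size as its own distribution, something like the following also works (the kafka.queue.batch_size metric name is illustrative, and $statsd is assumed to be a Datadog statsd client that supports histograms):

ActiveSupport::Notifications.subscribe("process_batch.consumer.kafka") do |_, _, _, _, payload|
  group_id = payload[:group_id]
  topic = payload[:topic]

  # A histogram lets you graph percentiles (such as the p95) of the batch size.
  $statsd.histogram("kafka.queue.batch_size", payload[:message_count], tags: ["consumer_group:#{group_id}", "topic:#{topic}"])
end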


You can find the other posts in my series on queues here. I can send my posts straight to your e-mail inbox if you sign up for my newsletter.