Kafka Simple Consumer Failure Recovery

June 21st, 2016

A modern data platform requires a robust Complex Event Processing (CEP) system, a cornerstone of which is a distributed messaging system. Today, many people use Kafka to fill this latter role. Kafka can be complicated, particularly when it comes to understanding how to recover from a failure during consumption of a message. There are numerous options for such recovery; in this post we’ll just be looking at one of them.

We’ll walk through a simple failure recovery mechanism, as well as a test harness that lets you verify the mechanism works as expected. First, we’ll create a test Kafka producer and consumer with failure recovery logic in Java. Then, we’ll discuss a bash script that starts up a local Kafka cluster using Docker Compose, sends a set of test messages through the producer, and finally kills the consumer and restarts it in order to simulate a recovery.

The script will also provide a way to see if the recovery logic worked correctly. With a few small changes, you can use this script as a template for your Kafka consumers in order to see if the consumer recovery will perform as expected in production.

Local Kafka Cluster

The first part of this project involves finding a way to create a realistic Kafka cluster locally. I did this by forking a Kafka Compose project that contains all the Confluent components and upgrading it to Kafka 0.9.0.0 (because of the fast pace of Kafka platform development, I haven’t had a chance to upgrade further to 0.10.0.0). You can find the project here.

Note: if you’re using OSX, spinning up the cluster requires a docker-machine install. I have added a small script that simplifies the installation of this cluster inside docker-machine.

All the below code is available in this repository.

After cloning, run the following script to bring up your development environment:
./dev_start.sh

Producer

The next step is to create your Kafka producer, which will be sending your sample events. Here is the relevant code:

     for (int i = 0; i < iterations; i++) {
         producer.send(new ProducerRecord<String, String>(topic,
                 Integer.toString(i), "Message_" + i));
         messagesSent++;
     }

     producer.flush();

As you can see, there is not much to this. We are simply creating a set of events that are easy to track in the output by their message number. Numbering the messages also allows us to see whether they were consumed in the correct order.
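The producer’s connection settings are loaded from a properties file in the repository and aren’t shown here. As a rough illustration, a minimal setup might look like the sketch below (the broker address and serializer choices are assumptions, not necessarily what the harness uses):

     // Minimal producer setup sketch -- values are illustrative assumptions,
     // not necessarily those in the harness's producer.properties.
     Properties producerProps = new Properties();
     producerProps.put("bootstrap.servers", "localhost:9092");
     producerProps.put("acks", "all");
     producerProps.put("key.serializer",
             "org.apache.kafka.common.serialization.StringSerializer");
     producerProps.put("value.serializer",
             "org.apache.kafka.common.serialization.StringSerializer");

     KafkaProducer<String, String> producer =
             new KafkaProducer<String, String>(producerProps);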

Consumer

Now for the consumer portion. We are going to use the new consumer API, introduced in Kafka 0.9.0.0. Whereas with Kafka 0.8.2.1 we had to consume every partition of a topic in a separate, manually created thread, that threading is now abstracted away by the consumer library. This makes the Kafka consumer logic a lot easier to implement and to understand. This consumer will implement an “at-least-once” recovery pattern. While we risk getting duplicate events after recovery, we are sure to ingest every single event that is produced.

To make sure that we’re doing “at-least-once” ingestion, we have to manually control how the latest consumed offset is committed back to Kafka. The key here is that we have to commit the latest offset only after we’re sure that we have persisted a set of pulled messages. This is done by setting the ‘enable.auto.commit’ consumer property to false, and adding logic that manually commits our offset using the ‘commitSync()’ function.
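In the harness these settings live in a properties file passed on the command line; expressed directly in Java for illustration, the relevant configuration might look roughly like this sketch (the broker address and group id are assumptions, not the repository’s actual values):

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");   // assumed broker address
     props.put("group.id", "failure-recovery-test");     // assumed consumer group
     props.put("enable.auto.commit", "false");           // commit manually after persisting a batch
     props.put("key.deserializer",
             "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer",
             "org.apache.kafka.common.serialization.StringDeserializer");

The logic that we use for the consumption is the following: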

     final int WRITE_BATCH_SIZE = 1000;
     Properties kafkaConsumerProps = new Properties();
     kafkaConsumerProps.load(new java.net.URL(args[0]).openStream());
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(
             kafkaConsumerProps);
     consumer.subscribe(Arrays.asList(topic));
     Path outFile = Paths.get(args[2] + "/" + topic + ".out");
     ArrayList<String> outputBuffer = new ArrayList<>();

Here, we set how many messages are written out to the file at a time. This also shows the new style of simple Kafka consumer logic: there is no longer any need to listen to every partition with a separate thread.

Now, let’s start listening to the topic. We will poll with a 1,000 ms timeout, fill our buffer, and write the records out to a file in batches using a synchronous helper method, allowing the write to finish before we commit the offset.

    try {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            if (records.count() > 0) {
                for (ConsumerRecord<String, String> record : records) {
                    // Write out some details about the message so we can
                    // understand how the consumer is behaving
                    outputBuffer.add("Partition: " + record.partition()
                            + ",Offset: " + record.offset()
                            + ",Key:" + record.key()
                            + ",Value:" + record.value());
                }
                if (outputBuffer.size() >= WRITE_BATCH_SIZE) {
                    writeBufferToFile(outFile, outputBuffer);
                    // Now that the records in the buffer are written,
                    // commit the latest offset:
                    consumer.commitSync();
                    outputBuffer.clear();
                }
            } else {
                if (outputBuffer.size() > 0) {
                    // Flush the non-empty buffer while the topic is quiet
                    writeBufferToFile(outFile, outputBuffer);
                    outputBuffer.clear();
                }
            }
        }
    } finally {
        if (outputBuffer.size() > 0) {
            // Flush any records still left in the buffer before closing
            writeBufferToFile(outFile, outputBuffer);
        }
        consumer.close();
    }

As you can see, instead of letting the consumer commit automatically, we’re manually committing the offset back to Kafka once we’re sure that the latest buffer got written to the output. In the ‘finally’ clause we make sure that any leftover buffer is flushed.
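The ‘writeBufferToFile()’ helper isn’t shown above. A minimal sketch, assuming it simply appends each buffered line to the output file, could look like this (the actual harness may handle errors differently):

     import java.io.IOException;
     import java.nio.charset.StandardCharsets;
     import java.nio.file.Files;
     import java.nio.file.Path;
     import java.nio.file.StandardOpenOption;
     import java.util.List;

     // Append the buffered lines to the output file, creating it if necessary.
     // The call blocks until the write completes, which is what makes it safe
     // to commit the offset immediately afterwards.
     private static void writeBufferToFile(Path outFile, List<String> buffer)
             throws IOException {
         Files.write(outFile, buffer, StandardCharsets.UTF_8,
                 StandardOpenOption.CREATE, StandardOpenOption.APPEND);
     }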

Test Harness

With the consumer and producer in place, let’s write the script which actually runs our test.

First, set some variables. Here, we’re passing the name of the test, the number of iterations to run through, and a flag that tells us whether to rebuild our consumer and producer.

RUN=$1
ITERATIONS=$2
REBUILD=$3

TOPIC=test_failure_$RUN
PARTITIONS=1
REPLICATION_FACTOR=2

Then we create the necessary topic:

kafka-compose/confluent-1.0.1/bin/kafka-topics --create --zookeeper $DOCKER_IP:2181 \
  --topic $TOPIC --partitions $PARTITIONS --replication-factor $REPLICATION_FACTOR

Next, we create the output folder for the current run, launch our consumer, and then producer:

CONSUMER_PROPS=file://$(pwd)/kafka-consumer-harness/src/main/resources/consumer.properties
PRODUCER_PROPS=file://$(pwd)/kafka-producer-harness/src/main/resources/producer.properties
if [ -d output_$RUN ]; then
 seconds=`date +%s`
 mv output_$RUN archive/output_${RUN}_${seconds}
fi

mkdir output_$RUN

java -jar kafka-consumer-harness/target/kafka-consumer-harness-$HARNESS_VERSION.jar \
  $CONSUMER_PROPS $TOPIC $__dir/output_$RUN >> consumer.out 2>&1 &
consumer_pid=$!

java -jar kafka-producer-harness/target/kafka-producer-harness-$HARNESS_VERSION.jar \
  $PRODUCER_PROPS $TOPIC $ITERATIONS >> producer.out 2>&1 &
producer_pid=$!

We use a cleanup function to stop the running producer and consumer when the script exits:

cleanup() {
  kill -1 $consumer_pid
  kill -1 $producer_pid
  exit 1
}

trap cleanup EXIT
trap cleanup INT

After 15 seconds, let’s kill our consumer in order to simulate a failure:

sleep 15
#kill consumer
kill -9 $consumer_pid

And then start it back up again:

java -jar kafka-consumer-harness/target/kafka-consumer-harness-$HARNESS_VERSION.jar \
  $CONSUMER_PROPS $TOPIC $__dir/output_$RUN >> consumer.out 2>&1 &
consumer_pid=$!

This loop allows us to watch the number of messages being output, to make sure that our recovery worked correctly:

while true
do
  echo messages processed: $(wc -l output_${RUN}/${TOPIC}.out)
  sleep 10
done

If the recovery worked correctly, we should see at least as many messages in the output as the number of iterations we passed in as an initial parameter to the script. The script can only work as intended if we pass enough messages through it. Typically, passing 30,000 or more messages has worked well for me, but that number will vary with the kind of Kafka cluster you’re spinning up and the type of message you’re passing.

Conclusion

You should now have a basic idea of how to write a harness that tests a Kafka consumer failure recovery scenario. Admittedly, the scenario here is a bit contrived, but the above can be used as a template and expanded to whatever needs you may have: scaling your consumers and producers, using a different-sized Kafka cluster, passing messages of different sizes and formats, or using different persistence mechanisms.

Consumer failure is a very real possibility in a production system. We hope that you can use the mechanism above to protect yourself from such failures, and to understand how your system will respond in this scenario. There are other failure recovery semantics, such as ‘at-most-once’ and ‘exactly-once’; we hope to cover these in future blog posts. Please share your experiences, or what you’d like to hear more about, in the comments.