Kafka High Level Consumer – Frequently Missing Details

Apache Kafka is distributed with two Java consumers. The high level consumer is very convenient to work with, providing clean and simple abstractions. The simple consumer gives the developer much more control over partition assignment and access to the data, but requires significantly more effort to use correctly.

There is a very detailed example of how to use the high level consumer in the Kafka documentation. This example is a great starting point and you should read it before reading anything else in this blog post.

However, the example is missing a few details regarding the high level consumer API – omissions that lead developers to use the low level consumer instead, which results in more complex code and more bugs.

So let's look at a slightly different example. I'm a big fan of micro-batches, a design pattern that allows adjusting the latency and throughput tradeoffs to match requirements. Suppose we want to do micro-batch processing on data from Kafka. This will require reading as many events from Kafka as possible in 200ms, and then sending the events to processing. If the send was successful, we can trust our processing pipeline to reliably handle the data; but if we failed to send the data on, we'd like to keep reading the same events from Kafka rather than moving the offset forward.

In terms of the high level consumer, we want to turn off the auto-commit feature and instead commit offsets only when we are certain the events were successfully sent to the next stage of processing. This also means that we need to limit how long we wait for the next event, or we'll miss our micro-batch window.

Let's start with the basic high level consumer loop shown in the Kafka wiki:

while (it.hasNext())
    System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
What we actually want is to loop while we still have time, collecting events in a buffer, and then send them off to processing:

while ((System.currentTimeMillis() < batchEndTime) && hasNext()) {
    events.add(it.next().message());
}

(hasNext() here is a small wrapper around the iterator; it is defined below.)

After we have collected enough events, we can send the list to processing. We only want to commit the Kafka offsets if we successfully sent the events, so the first thing we need to do is set auto.commit.enable=false in the consumer configuration.
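As a sketch, the configuration might look like the following. The property names are those of the old (0.8.x) high level consumer, and the class and method names here are hypothetical scaffolding, not from the original post; the resulting Properties would typically be wrapped in a ConsumerConfig and passed to Consumer.createJavaConsumerConnector().

```java
import java.util.Properties;

// Sketch: consumer configuration for manual offset commits.
// Property names follow the old (0.8.x) high level consumer;
// check the docs for your Kafka version.
public class BatchConsumerConfig {

    public static Properties build(String zookeeperConnect, String groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeperConnect); // ZooKeeper quorum the old consumer uses
        props.put("group.id", groupId);                   // consumer group, used for rebalancing
        props.put("auto.commit.enable", "false");         // we commit ourselves, after processing
        props.put("consumer.timeout.ms", "10");           // low timeout keeps the batch window accurate
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:2181", "flume");
        System.out.println(props.getProperty("auto.commit.enable")); // false
    }
}
```

The two properties that matter for this pattern are auto.commit.enable (so offsets only move when we say so) and consumer.timeout.ms (so hasNext() returns control in time for the batch deadline).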

Then let's look at the commit code:

if (events.size() > 0) {
    getProcessor().processEventBatch(events);
    events.clear();
    consumer.commitOffsets();
}

The order in which we do things here is important. First we send the events to the processor, then we clear the event list, and finally we commit the offsets to Kafka.

If we fail to send events to processing, this will throw an exception. The event list will not get cleared, which means that we can retry sending the same events again and again. It's important to note that the high level consumer tracks the offset independently – this means that if we clear the event list, as long as we don't close the consumer, we will not be able to read the same events again even if we did not commit the offsets. That is, committing the offset protects against consumer crashes, but not against losing the events while the consumer is still iterating.

We commit the offsets to Kafka after clearing the event list, because if Kafka throws an exception while committing offsets, we don't need to process the events again: they were already processed successfully. This order of actions protects us both against data loss and against some cases of data duplication.

Note that we do not use the multi-threaded consumer. Rather, we have a separate consumer for each processing thread. We do that because the high level consumer currently commits the offsets of all threads at once. Since each thread processes data independently, committing offsets for all threads at once can lead to data loss.
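The shape of the one-consumer-per-thread arrangement can be sketched in plain Java. The BatchConsumer interface below is a hypothetical stand-in for the slice of the high level consumer that this pattern touches; in the real API, the factory would call Consumer.createJavaConsumerConnector() with a topic map of {topic -> 1} so that each thread owns its own connector.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class PerThreadConsumers {

    // Minimal, hypothetical stand-in for the high level consumer surface
    // used by this pattern. In the real API this would be a ConsumerConnector
    // created per thread via Consumer.createJavaConsumerConnector(...).
    public interface BatchConsumer extends AutoCloseable {
        Iterable<byte[]> poll();      // one micro-batch of messages
        void commitOffsets();         // commits only this connector's offsets
        @Override void close();
    }

    // One thread per consumer: because each thread owns its own connector,
    // committing here never advances offsets for data another thread
    // is still processing.
    public static ExecutorService start(int threads, Supplier<BatchConsumer> factory) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.submit(() -> {
                try (BatchConsumer consumer = factory.get()) {
                    for (byte[] event : consumer.poll()) {
                        // process(event) would go here
                    }
                    consumer.commitOffsets();
                }
            });
        }
        return pool;
    }
}
```

The design point is simply that the commit scope equals the connector scope, so giving each thread its own connector gives each thread its own commit scope.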

The last piece of the puzzle is making sure our loop doesn't hang when there are no more events to consume. After all, we want to send events to processing every 200ms regardless of how many events we collected in that time. This requires first setting a rather low timeout on the consumer (consumer.timeout.ms = 10) and then converting the timeout exception into a boolean value that allows us to use hasNext() in a loop:

boolean hasNext() {
    try {
        // Blocks for up to consumer.timeout.ms waiting for the next event.
        return it.hasNext();
    } catch (ConsumerTimeoutException e) {
        // No event arrived within the timeout; end this micro-batch.
        return false;
    }
}

The timeout value can be tuned: lower values use more CPU but allow more accurate micro-batches.
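Putting the pieces together, the whole cycle can be sketched in plain Java. This is a hedged sketch, not the post's actual code: the Kafka iterator is abstracted as an ordinary java.util.Iterator<byte[]>, hasNextQuietly() plays the role of the hasNext() wrapper above (ConsumerTimeoutException is a RuntimeException in the old consumer, so a plain RuntimeException stands in for it), and the commitOffsets callback stands in for consumer.commitOffsets().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Sketch of one micro-batch cycle: collect for up to batchMs, process,
// clear, then commit. Kafka-specific types are abstracted away so the
// control flow stands on its own.
public class MicroBatch {

    // Stand-in for the hasNext() wrapper: a timeout simply ends the batch.
    static boolean hasNextQuietly(Iterator<byte[]> it) {
        try {
            return it.hasNext();
        } catch (RuntimeException timeout) {
            return false;
        }
    }

    // Collect events until the micro-batch window closes or the
    // iterator times out, whichever comes first.
    public static List<byte[]> collectBatch(Iterator<byte[]> it, long batchMs) {
        List<byte[]> events = new ArrayList<>();
        long batchEndTime = System.currentTimeMillis() + batchMs;
        while (System.currentTimeMillis() < batchEndTime && hasNextQuietly(it)) {
            events.add(it.next());
        }
        return events;
    }

    public static void runCycle(Iterator<byte[]> it, long batchMs,
                                Consumer<List<byte[]>> processor, Runnable commitOffsets) {
        List<byte[]> events = collectBatch(it, batchMs);
        if (!events.isEmpty()) {
            processor.accept(events); // may throw; then we neither clear nor commit
            events.clear();           // processed successfully, safe to drop
            commitOffsets.run();      // commit last, only after processing succeeded
        }
    }
}
```

Note that the process-clear-commit ordering inside runCycle() is exactly the ordering argued for above: a processing failure throws before the clear, and a commit failure happens after the events were already handled.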

To see a complete example of a high level consumer using this pattern, I’ll refer you to the Flume Kafka Source.

One of the best things about the high level consumer is that it takes care of partition rebalancing for you. If you have multiple consumers in the same consumer group reading from a topic and a consumer crashes or is added, the partitions of the topic are automatically rebalanced between the consumers in the group.

Let's take a look at how this works. Here's an example (created using the ConsumerOffsetChecker tool) of one topic (t1) and one consumer group (flume). Each of the three partitions of t1 is being read by a different machine running the Flume consumer:

Group           Topic                          Pid Offset     logSize         Lag            Owner
flume           t1                             0   50172068   100210042       50037974       kafkacdh-1
flume           t1                             1   49914701   49914701        0              kafkacdh-2
flume           t1                             2   54218841   82733380        28514539       kafkacdh-3

If kafkacdh-1 crashes, another consumer picks up its partition:

Group           Topic                          Pid Offset     logSize         Lag            Owner
flume           t1                             0   59669715   100210042       40540327       kafkacdh-2
flume           t1                             1   49914701   49914701        0              kafkacdh-2
flume           t1                             2   65796205   82733380        16937175       kafkacdh-3

Then I can start kafkacdh-4 and watch things rebalance again:

Group           Topic                          Pid Offset     logSize         Lag            Owner
flume           t1                             0   60669715   100210042       39540327       kafkacdh-2
flume           t1                             1   49914701   49914701        0              kafkacdh-3
flume           t1                             2   66829740   82733380        15903640       kafkacdh-4

Using the full power of the Kafka high level consumer can save you a lot of work, low level headaches, bugs, code complexity and data loss. I recommend learning what it can do, and using it rather than the lower level consumer, if at all possible.
Of course, Kafka will soon release another type of consumer, and life will become even easier.


'Kafka High Level Consumer – Frequently Missing Details' has 24 comments

  1. October 18, 2014 @ 7:42 pm Shlomi

    Great tip! Thank you Gwen.
    Two questions:
1. In what way is life going to be simpler?
    2. What exactly do you mean by separate consumer? What API type is instantiated per thread?
    This post should really be a kafka wiki doc

    Reply

    • October 20, 2014 @ 4:14 am Gwen Shapira

      1. You mean how would the new consumer make life easier? The new consumer API would give you more flexibility to choose between control and automation. Currently the high level consumer gives you too little control and the simple consumer requires too much work.

      2. The high level consumer can be threaded (i.e. return a map of iterators and let you use an iterator per thread) or you can instantiate a new high level consumer per thread. I’m instantiating new high level consumer per thread, otherwise committing offsets would not work correctly.

      Reply

      • October 21, 2014 @ 10:17 pm Miracle

Hi Gwen, by saying “instantiate new high level consumer per thread”, I assume you are talking about a scenario like this: you have a topic with 5 partitions, you create 5 threads, and each thread instantiates a new high level consumer that reads from a particular partition only. Am I right (or, is this doable)? If so, could you share sample code? Would love to see it! Thanks.

        Reply

        • October 22, 2014 @ 11:43 pm Miracle

          Never mind – I went through the Kafka high level consumer source code, and I came to a conclusion that it is not possible to create high level consumers on per-partition basis like I asked above. To do that I must use SimpleConsumer.

          Reply

          • October 23, 2014 @ 12:00 am Miracle

…and as suggested in this great blog post, creating multiple consumers in the same consumer group will allow me to get multi-threaded message consumption, effective offset management, and also automated rebalancing, which is very nice. At the same time, the trade-off is that I do not have control over which partition each consumer retrieves data from (only the SimpleConsumer API gives this level of control).

          • October 29, 2014 @ 6:24 pm Gwen Shapira

            True. High level API doesn’t let you choose which partition each consumer reads.

  2. October 19, 2014 @ 11:41 am Miracle

    Very helpful! I was about to write some code around SimpleConsumer but this article helped me avoid that.

    Reply

  3. October 28, 2014 @ 9:38 pm Johan

    “If kafkacdh-1 crashed, another broker will pick up the partition” should really be: /…another consumer will…/ right?

    Reply

  4. January 19, 2015 @ 7:22 pm hari

    Thanks for the detailed article Gwen. Very helpful.

    One thing I could not figure out from the docs and APIs is detecting a lost connection in high level consumer.
    Irrespective of whether kafka broker was shutdown or there was no message written to kafka, I see the same ConsumerTimeOut exception after a time determined by “consumer.timeout.ms” option.

    I do not see flume source handling broken connections. Do you happen to know a way to detect lost connection?

    Reply

    • February 5, 2015 @ 1:00 am HARI

      I am quoting the reply from Guozhang and Jun:

      “For high level consumer the fetching logic is handled by a background
      fetcher thread and is hidden from user, for either case of 1) broker down
      or 2) no message is available the fetcher thread will keep retrying while
      the user thread will wait on the fetcher thread to put some data into the
      buffer until timeout. So in a sentence the high-level consumer design is to
      not let users worry about connect / reconnect issues.”

      Reply

  5. March 21, 2015 @ 4:52 am Lan

    First of all, great article!

I am trying to validate my understanding. If we use the high level consumer API as you describe in the article, the message delivery guarantee is at least once, not exactly once. If the consumer crashes after events are processed and cleared, but before the offset is committed, then when a new consumer takes over the partition it will actually process the events in the batch again. In that case, the business logic has to deal with the duplicated-events scenario. If we want to achieve an exactly-once guarantee, we have to use the simple consumer API.

    Thanks.

    Reply

    • March 22, 2015 @ 2:09 am Gwen Shapira

      You are right, the high level consumer can implement at-least-once but not exactly-once, because you can’t “commit offsets” inside a transaction.

      Reply

      • April 6, 2015 @ 12:09 pm Shantanu Deshmukh

        Hello, I am very new to kafka. I am working on high level consumer only. I disabled auto commit property and I am trying to do consumer.commitOffsets() after I process 100 messages. There is only one thread running. But my process hangs up when commitOffsets() function is executed. I have verified using offset checker utility that all partitions are on lag. Which means there are messages still pending in queue. When I run consumer again, it processes 100 messages again and then hangs up again. So what is happening here? Please help!!

        Reply

        • April 7, 2015 @ 1:56 am Gwen Shapira

          commitOffsets() may need to call Zookeeper. You should validate that your consumer can connect to Zookeeper properly.
          Depending on your consumer and broker version, you may also want to try committing offsets to Kafka itself. Search the Kafka documentation for offsets.storage to see how to do that.

          Reply

  6. April 21, 2015 @ 4:53 pm Pete Prokopowicz

    while ((System.currentTimeMillis() < batchEndTime) && hasNext()) {
    should read
    while ((System.currentTimeMillis() < batchEndTime) && it.hasNext()) {

    Reply

    • April 21, 2015 @ 4:54 pm pete

      nevermind – my mistake

      Reply

  7. May 20, 2015 @ 1:58 pm Marina Popova

    Hi, Gwen,
    Great post, thank you – and a much needed clarification of offset management.
    I am still somewhat confused by the “topic-> partition-> processing thread -> Consumer” mapping and how commitOffsets() works for some configurations.
    In your post you note: “Note that we do not use multi-threaded consumer. Rather we have a separate consumer for each processing thread. We do that because currently the high-level consumer commits offsets of all threads at once. Since each thread processes data independently, committing offsets for all threads can lead to data loss.”

Now, what exactly do you mean by a “processing thread”? Is it equivalent to a partition?

    If we take a look at the example code of the High-level consumer here:
    https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example

    Is this the correct implementation of what you call ” Rather we have a separate consumer for each processing thread “?

The number of threads is an input parameter here; what is this number? Let's say we have 3 partitions as in your example (Pids 0, 1, 2): should we pass a_numThreads = 3 into the run() method below?

       public void run(int a_numThreads) {
            Map topicCountMap = new HashMap();
            topicCountMap.put(topic, new Integer(a_numThreads));
            Map<String, List<KafkaStream>> consumerMap = consumer.createMessageStreams(topicCountMap);
            List<KafkaStream> streams = consumerMap.get(topic);
     
            // now launch all the threads
            //
            executor = Executors.newFixedThreadPool(a_numThreads);
     
            // now create an object to consume the messages
            //
            int threadNumber = 0;
            for (final KafkaStream stream : streams) {
                executor.submit(new ConsumerTest(stream, threadNumber));
                threadNumber++;
            }
        }

    What I’m really confused about is how the list of KAfkaStreams returned from this method: consumer.createMessageStreams(topicCountMap);
    Is mapped to partitions? It seems to return you as many streams as the a_numThreads number you specify…. So, if we have 3 partitions, but I set a_numThreads = 5 – I will get 5 KafkaStreams – and how are they mapped to each partition?

Let's consider what happens in your code in this case:

    We will have 5 ConsumerTest threads – which is 5 “processing threads” in your terminology, right?
    So, each thread will be processing data from some partition (right ?).
    Now, one of the ConsumerTest threads [lets say ‘kafkacdh-1′ in your example] decides to commit its offset and executes your code:

    events.clear();
    consumer.commitOffsets(); <– 'consumer' is the 'ConsumerTest' in this case, right?

    Which partition/KafkaStream / ConsumerTest this offset will be committed to? How can you tell?
    And quoting your note again: "currently the high-level consumer commits offsets of all threads at once" – which threads are we talking about here – the ConsumerTest threads? And if so – does not it mean that all other consumers' offsets will be committed [incorrectly] as well? Does it mean these offsets are committed also for all partitions?? So in your example, if it was the 'kafkacdh-1' consumer that issued the commitOffsets() command, as a result, the other two consumers 'kafkacdh-2' and 'kafkacdh-3' , would also have their Offset = 501720 ?

    Would be great to see one full example of creating multiple consumers for one topic with multiple partitions and managing their offsets/commits/batch reads…

    Thanks again!
    Marina

    Reply

    • July 2, 2015 @ 12:02 pm Peter

      Great post Gwen and great comment Marina!

      I am very much new to Kafka and I have the same doubts commented by Marina. Could someone explain or show me a full example of creating multiple consumers for one topic with multiple partitions and managing their offsets/commits/batch reads?

      Thanks in advance.

      Reply

    • July 13, 2015 @ 12:34 pm Stefan Miklosovic

      Read my comment below.

      Reply

  8. July 13, 2015 @ 12:10 pm Stefan Miklosovic

From my experience this blog post is not quite complete. Well, all the information is indeed correct, but in my experience the results are a little bit different, and I have a feeling this blog post does not mention it.

    Let’s say I want to have 1 topic with 10 partitions and I want to have one consumer per partition. As it was correctly shown in this blog post, you just call Consumer.createJavaConsumer() 10 times with topic map like topicMap.put(“topic”,1). (as it is shown in linked source code) In this case you get list of streams and you just get the first one and you iterate over it.

    The fact that you used “1” in that topic map (1) just means that there will be 1 consumer thread for that topic. But it does not mean that such thread will operate on ONE partition. That is not true. Instead, such consumer starts to iterate on the first stream (2) which represents that 1 thread.

    So it means that it will indeed fetch messages from all three partitions.

Now you call this Consumer.createJavaConsumer() 3 times, and what you get are 3 consumers, each backed by 1 consumer thread. When a consumer asks for streams, automatic rebalancing of the consumer-partition assignment occurs in the background: when the second consumer connects, Kafka's automatic partition rebalancing takes a partition away from the first consumer and assigns it to the newly connected one. When the third consumer connects, rebalancing takes another partition from the first consumer and assigns it to the third.

    So it is something like this for consumers C1-3 and partitions P1-3

    1. C1-(P1,P2,P3)
    2. C1-(P1,P2),C2-(P3)
    3. C1-(P1),C2-(P3),C3-(P2)

    You see … just you instantiate 3 consumers with topicMap(“topic”,1) for each does not mean that there will ever be just one partition assigned to consumer. But it will automatically rebalance to it.

Now, what is interesting is that this rebalancing does not behave correctly in all cases. Calling Consumers.createJavaConsumer() 10 times on a topic with 10 partitions does not ensure that you will eventually have 10 consumers each covering 1 partition.

From my experience it is more like 3,2,1,1,1,1,1 partition-wise among 7 consumers, and the additional 3 consumers are idle.

    (1) https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java#L212
    (2) https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java#L222

    Reply

    • July 13, 2015 @ 12:24 pm Stefan Miklosovic

      To add to my previous comment:

It seems that automatic rebalancing does not occur evenly across all consumers, so you can end up with consumers, even though each is backed by one thread, which read messages from more than 1 partition. Hence if you commit with some particular consumer instance, the commit will indeed happen only in the context of one thread, but, to my understanding, it will be committed for all partitions that consumer has actually been assigned. So the idea that this technique ensures you can commit offsets per partition is not always true.

Yes, in your case you were lucky: each consumer got exactly one partition, so a commit on such a consumer affects only (accidentally) one partition. But in my case, in that 3,2,1,1,1,1,1 scenario, one consumer will commit offsets for all three of its partitions, the second one for 2, and all remaining 5 will commit for 1 partition.

While this does not have to be inherently wrong, I was at first expecting that if I do this, I am guaranteed a commit per partition, but this is simply not always the case.

I tried this on 0.8.1.1; in 0.8.3 there is going to be a new consumer which makes these things more interesting. In the old consumer you cannot say which partitions you want a consumer to fetch data from; in the new one, you can. So if you instantiate a consumer 10 times with concrete partition numbers in mind, you can be sure that committing occurs only for those partitions. However, this makes automatic partition rebalancing useless, because when a consumer fails, the other consumers do not take over the partitions the dead consumer was consuming.

      Reply

      • September 3, 2015 @ 7:58 pm Vasudha

        Hi Stefan,
I am also looking for the consumer failover and rebalancing mechanism as specified in the docs. But the actual implementation is still missing… lots of missing links…
Is there any concrete example for the explained scenario?

        Reply

  9. September 24, 2015 @ 2:03 pm Criss

Great post, thanks, but can you help me understand what happens in the following scenario:
Let's say that we use this implementation: start one consumer, read a batch of 10 messages (from 0 to 10), process them and commit the offset (offsetValue = 10).
Just before starting to read the next 10 messages, the application crashes. I restart the application, the consumer is re-initialized with the same id and group, and it starts to read another 10 messages. Which offset will it use, 0 or 10? Because from my understanding and my tests with the high level consumer, a restart will cause it to start reading again from offset = 0. We have offset control only with the low level consumer; this is what I understand from the documentation.
Thank you

    Reply

