Apache Kafka ships with two Java consumers. The high-level consumer is very convenient to work with, providing clean and simple abstractions. The simple consumer gives the developer much more control over partition assignment and access to the data, but requires significantly more effort to use correctly.
There is a very detailed example of how to use the high-level consumer in the Kafka documentation. That example is a great starting point, and you should read it before reading anything else in this blog post.
However, the example is missing a few details regarding the high-level consumer API, leading developers to fall back on the low-level consumer, which results in more complex code and more bugs.
So let's look at a slightly different example. I'm a big fan of micro-batches, a design pattern that lets you adjust the latency/throughput tradeoff to match requirements. Suppose we want to do micro-batch processing on data from Kafka. This will require reading as many events from Kafka as possible within 200ms, and then sending the events on for processing. If the send was successful, we can trust our processing pipeline to reliably handle the data; but if we failed to send the data on, we'd like to keep reading the same events from Kafka rather than moving the offset forward.
In terms of the high-level consumer, we want to turn off the auto-commit feature and instead commit events only when we are certain they were successfully sent to the next stage of processing. This also means that we need to limit how long we wait for the next event, or we'll miss our micro-batch window.
Let's start with the basic high-level consumer loop shown in the Kafka wiki:

while (it.hasNext())
    System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));

And modify it to collect messages into a batch until the micro-batch window closes:

while ((System.currentTimeMillis() < batchEndTime) && hasNext()) {
    events.add(it.next().message());
}
After we've collected enough events, we can send the list on for processing. We only want to commit the Kafka offsets if we successfully sent the events, so the first thing we need to do is set auto.commit.enable=false in the consumer configuration.
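As a minimal sketch of that setup, using the old (0.8-era) high-level consumer API, creating the consumer might look like this. The ZooKeeper address is a placeholder, and the group name is borrowed from the rebalancing example later in this post:

import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.javaapi.consumer.ConsumerConnector;

final Properties props = new Properties();
props.put("zookeeper.connect", "localhost:2181"); // placeholder address
props.put("group.id", "flume");                   // consumer group used in the examples below
props.put("auto.commit.enable", "false");         // commit manually, only after a successful send
props.put("consumer.timeout.ms", "10");           // low timeout; explained further down

final ConsumerConnector consumer =
    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));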
Then let's look at the commit code:
if (events.size() > 0) {
    getProcessor().processEventBatch(events); // send the batch to the next stage; throws on failure
    events.clear();                           // drop events we have successfully handed off
    consumer.commitOffsets();                 // only now mark them as consumed in Kafka
}
The order in which we do things here is important. First we send the events to the processor, then we clear the event list, and finally we commit the offsets to Kafka.
If we fail to send events to processing, processEventBatch() will throw an exception. The event list will not get cleared, which means we can retry sending the same events again and again. It's important to note that the high-level consumer tracks the offset independently: if we clear the event list, then as long as we don't close the consumer, we will not be able to read the same events again, even if we did not commit the offsets. That is, committing the offsets protects against consumer crashes, but not against losing events while the consumer is still iterating.
We commit the offsets to Kafka only after clearing the event list, because if Kafka throws an exception while committing offsets, we don't need to process those events again: they were already successfully processed. This order of operations protects us both against data loss and against some cases of data duplication.
Note that we do not use a multi-threaded consumer. Rather, we have a separate consumer for each processing thread. We do that because the high-level consumer currently commits the offsets of all threads at once. Since each thread processes data independently, committing offsets for all threads at once can lead to data loss.
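A minimal sketch of that arrangement, reusing the props configured above; the thread count and topic name are assumptions matching the three-partition example later in this post:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.consumer.KafkaStream;

// One ConsumerConnector per processing thread, each with a single stream,
// so every thread commits only the offsets of the data it processed itself.
int numThreads = 3; // assumption: one thread per partition of topic "t1"
ExecutorService executor = Executors.newFixedThreadPool(numThreads);
for (int i = 0; i < numThreads; i++) {
    final ConsumerConnector consumer =
        Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put("t1", 1); // one stream for this consumer
    final KafkaStream<byte[], byte[]> stream =
        consumer.createMessageStreams(topicCountMap).get("t1").get(0);
    executor.submit(new Runnable() {
        public void run() {
            // run the micro-batch loop from this post against
            // this thread's own stream and consumer
        }
    });
}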
The last piece of the puzzle is making sure our loop doesn't hang when there are no more events to consume. After all, we want to send events to processing every 200ms regardless of how many events we collected in that time. This requires first setting a rather low timeout on the consumer: consumer.timeout.ms = 10
and then converting the resulting timeout exception into a boolean value that lets us use hasNext() in a loop:
boolean hasNext() {
    try {
        it.hasNext(); // blocks until an event arrives or consumer.timeout.ms expires
        return true;
    } catch (ConsumerTimeoutException e) {
        return false; // no event arrived within the timeout
    }
}
The timeout value can be tuned: lower values use more CPU but allow more accurate micro-batches.
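Putting the pieces together, one thread's micro-batch loop could be sketched like this. It is a condensed sketch, not the full implementation: getProcessor() stands in for whatever hands the batch to the next stage, and it, events, and consumer are assumed to be fields of the thread, along with the hasNext() helper above:

// Assumed fields:
//   ConsumerIterator<byte[], byte[]> it = stream.iterator();
//   List<byte[]> events = new ArrayList<byte[]>();
while (true) { // loop forever; real code would have a shutdown condition
    long batchEndTime = System.currentTimeMillis() + 200; // 200ms micro-batch window
    while ((System.currentTimeMillis() < batchEndTime) && hasNext()) {
        events.add(it.next().message());
    }
    if (events.size() > 0) {
        getProcessor().processEventBatch(events); // throws on failure, so the batch is retried
        events.clear();
        consumer.commitOffsets();
    }
}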
To see a complete example of a high-level consumer using this pattern, I'll refer you to the Flume Kafka Source.
One of the best things about the high-level consumer is that it takes care of partition rebalancing for you. If you have multiple consumers in the same consumer group reading from a topic, and a consumer crashes or is added, the partitions in the topic will automatically be rebalanced between the consumers in the group.
Let's take a look at how this works. Here's an example (created using the ConsumerOffsetChecker tool) of one topic (t1) and one consumer group (flume). Each of the three partitions of t1 is being read by a different machine running the Flume consumer:
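(For reference, output like the one below can be produced with an invocation along the lines of bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group flume --topic t1. The ZooKeeper address is an assumption, and flag names vary between Kafka versions.)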
Group  Topic  Pid  Offset    logSize    Lag       Owner
flume  t1     0    50172068  100210042  50037974  kafkacdh-1
flume  t1     1    49914701  49914701   0         kafkacdh-2
flume  t1     2    54218841  82733380   28514539  kafkacdh-3
If kafkacdh-1 crashed, another consumer will pick up the partition:
Group  Topic  Pid  Offset    logSize    Lag       Owner
flume  t1     0    59669715  100210042  40540327  kafkacdh-2
flume  t1     1    49914701  49914701   0         kafkacdh-2
flume  t1     2    65796205  82733380   16937175  kafkacdh-3
Then I can start flume_kafkacdh-4 and see things rebalance again:
Group  Topic  Pid  Offset    logSize    Lag       Owner
flume  t1     0    60669715  100210042  39540327  kafkacdh-2
flume  t1     1    49914701  49914701   0         kafkacdh-3
flume  t1     2    66829740  82733380   15903640  kafkacdh-4
Using the full power of the Kafka high-level consumer can save you a lot of work, low-level headaches, bugs, code complexity, and data loss. I recommend learning what it can do, and using it rather than the lower-level consumer, if at all possible.
Of course, soon Kafka will release another type of consumer – and life will become even easier.
'Kafka High Level Consumer – Frequently Missing Details' has 24 comments
October 18, 2014 @ 7:42 pm Shlomi
Great tip! Thank you Gwen.
Two questions:
1. In what way is life going to be simpler?
2. What exactly do you mean by separate consumer? What API type is instantiated per thread?
This post should really be a Kafka wiki doc.
October 20, 2014 @ 4:14 am Gwen Shapira
1. You mean, how would the new consumer make life easier? The new consumer API will give you more flexibility to choose between control and automation. Currently the high-level consumer gives you too little control and the simple consumer requires too much work.
2. The high-level consumer can be threaded (i.e. return a map of iterators and let you use an iterator per thread), or you can instantiate a new high-level consumer per thread. I'm instantiating a new high-level consumer per thread; otherwise committing offsets would not work correctly.
October 21, 2014 @ 10:17 pm Miracle
Hi Gwen, by saying "instantiate a new high-level consumer per thread", I assume you are talking about a scenario like this: you have a topic with 5 partitions, you create 5 threads, and each thread instantiates a new high-level consumer that reads from a particular partition only. Am I right (or, is this doable)? If so, could you share sample code? Would love to see it! Thanks.
October 22, 2014 @ 11:43 pm Miracle
Never mind – I went through the Kafka high-level consumer source code, and I came to the conclusion that it is not possible to create high-level consumers on a per-partition basis like I asked above. To do that I must use SimpleConsumer.
October 23, 2014 @ 12:00 am Miracle
…and as suggested in this great blog post, creating multiple consumers in the same consumer group will give me both multi-threaded message consumption and effective offset management, plus automatic rebalancing, which is very nice. At the same time, the trade-off is that I do not have control over which partition each consumer retrieves data from (only the SimpleConsumer API gives this level of control).
October 29, 2014 @ 6:24 pm Gwen Shapira
True. The high-level API doesn't let you choose which partition each consumer reads.
October 19, 2014 @ 11:41 am Miracle
Very helpful! I was about to write some code around SimpleConsumer but this article helped me avoid that.
October 28, 2014 @ 9:38 pm Johan
“If kafkacdh-1 crashed, another broker will pick up the partition” should really be: /…another consumer will…/ right?
October 29, 2014 @ 5:47 pm Gwen Shapira
Yep. Good catch
January 19, 2015 @ 7:22 pm hari
Thanks for the detailed article Gwen. Very helpful.
One thing I could not figure out from the docs and APIs is how to detect a lost connection in the high-level consumer.
Regardless of whether the Kafka broker was shut down or there was simply no message written to Kafka, I see the same ConsumerTimeoutException after a time determined by the "consumer.timeout.ms" option.
I do not see the Flume source handling broken connections. Do you happen to know a way to detect a lost connection?
February 5, 2015 @ 1:00 am HARI
I am quoting the reply from Guozhang and Jun:
"For high level consumer the fetching logic is handled by a background fetcher thread and is hidden from user; for either case of 1) broker down or 2) no message is available, the fetcher thread will keep retrying while the user thread will wait on the fetcher thread to put some data into the buffer until timeout. So in a sentence, the high-level consumer design is to not let users worry about connect / reconnect issues."
March 21, 2015 @ 4:52 am Lan
First of all, great article!
I am trying to validate my understanding. If we use the high-level consumer API as you describe in the article, the message delivery guarantee is at-least-once, not exactly-once. If the consumer crashes after events are processed and cleared, but before the offset is committed, then when a new consumer takes over the partition, it will actually process the events in the batch again. In that case, the business logic has to deal with the duplicated-events scenario. If we want to achieve an exactly-once guarantee, we have to use the simple consumer API.
Thanks.
March 22, 2015 @ 2:09 am Gwen Shapira
You are right: the high-level consumer can implement at-least-once but not exactly-once, because you can't commit offsets inside a transaction.
April 6, 2015 @ 12:09 pm Shantanu Deshmukh
Hello, I am very new to Kafka. I am working with the high-level consumer only. I disabled the auto-commit property and I am trying to call consumer.commitOffsets() after I process 100 messages. There is only one thread running, but my process hangs when commitOffsets() is executed. I have verified using the offset checker utility that all partitions are lagging, which means there are messages still pending in the queue. When I run the consumer again, it processes 100 messages and then hangs again. So what is happening here? Please help!!
April 7, 2015 @ 1:56 am Gwen Shapira
commitOffsets() may need to call ZooKeeper. You should validate that your consumer can connect to ZooKeeper properly.
Depending on your consumer and broker version, you may also want to try committing offsets to Kafka itself. Search the Kafka documentation for offsets.storage to see how to do that.
April 21, 2015 @ 4:53 pm Pete Prokopowicz
while ((System.currentTimeMillis() < batchEndTime) && hasNext()) {
should read
while ((System.currentTimeMillis() < batchEndTime) && it.hasNext()) {
April 21, 2015 @ 4:54 pm pete
nevermind – my mistake
May 20, 2015 @ 1:58 pm Marina Popova
Hi, Gwen,
Great post, thank you – and a much needed clarification of offset management.
I am still somewhat confused by the "topic -> partition -> processing thread -> consumer" mapping and how commitOffsets() works for some configurations.
In your post you note: "Note that we do not use a multi-threaded consumer. Rather, we have a separate consumer for each processing thread. We do that because the high-level consumer currently commits the offsets of all threads at once. Since each thread processes data independently, committing offsets for all threads at once can lead to data loss."
So, what exactly do you mean by "processing thread"? Is it equivalent to a partition?
If we take a look at the example code of the High-level consumer here:
https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example
Is this the correct implementation of what you call "a separate consumer for each processing thread"?
The number of threads is an input parameter here – what is this number? Let's say we have 3 partitions as in your example (Pids 0, 1, 2) – should we pass a_numThreads = 3 into the run() method below?
public void run(int a_numThreads) {
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    topicCountMap.put(topic, new Integer(a_numThreads));
    Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
        consumer.createMessageStreams(topicCountMap);
    List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);

    // now launch all the threads
    executor = Executors.newFixedThreadPool(a_numThreads);

    // now create an object to consume the messages
    int threadNumber = 0;
    for (final KafkaStream<byte[], byte[]> stream : streams) {
        executor.submit(new ConsumerTest(stream, threadNumber));
        threadNumber++;
    }
}
What I'm really confused about is how the list of KafkaStreams returned from consumer.createMessageStreams(topicCountMap) is mapped to partitions. It seems to return as many streams as the a_numThreads number you specify… So, if we have 3 partitions but I set a_numThreads = 5, I will get 5 KafkaStreams – and how are they mapped to each partition?
Let's consider what happens in your code in this case then:
We will have 5 ConsumerTest threads – which is 5 "processing threads" in your terminology, right?
So, each thread will be processing data from some partition (right?).
Now, one of the ConsumerTest threads [let's say 'kafkacdh-1' in your example] decides to commit its offset and executes your code:
events.clear();
consumer.commitOffsets(); // <-- 'consumer' is the 'ConsumerTest' in this case, right?
Which partition/KafkaStream/ConsumerTest will this offset be committed for? How can you tell?
And quoting your note again: "the high-level consumer currently commits the offsets of all threads at once" – which threads are we talking about here, the ConsumerTest threads? And if so, doesn't that mean that all the other consumers' offsets will be committed [incorrectly] as well? Does it mean these offsets are committed for all partitions too?? So in your example, if it was the 'kafkacdh-1' consumer that issued the commitOffsets() command, would the other two consumers, 'kafkacdh-2' and 'kafkacdh-3', as a result also have their offset = 50172068?
Would be great to see one full example of creating multiple consumers for one topic with multiple partitions and managing their offsets/commits/batch reads…
Thanks again!
Marina
July 2, 2015 @ 12:02 pm Peter
Great post Gwen and great comment Marina!
I am very much new to Kafka and I have the same doubts commented by Marina. Could someone explain or show me a full example of creating multiple consumers for one topic with multiple partitions and managing their offsets/commits/batch reads?
Thanks in advance.
July 13, 2015 @ 12:34 pm Stefan Miklosovic
Read my comment below.
July 13, 2015 @ 12:10 pm Stefan Miklosovic
From my experience, this blog post is not quite complete. All the information in it is indeed correct, but in my experience the results are a little bit different, and I have a feeling this blog post does not mention that.
Let's say I want to have 1 topic with 10 partitions and I want to have one consumer per partition. As was correctly shown in this blog post, you just call Consumer.createJavaConsumerConnector() 10 times with a topic map like topicMap.put("topic", 1) (as it is shown in the linked source code). In this case you get a list of streams, you just take the first one, and you iterate over it.
The fact that you used "1" in that topic map (1) just means that there will be 1 consumer thread for that topic. But it does not mean that such a thread will operate on ONE partition. That is not true. Instead, such a consumer starts to iterate on the first stream (2), which represents that 1 thread.
So it means that a single consumer will in fact fetch messages from all of the topic's partitions (all three, in the blog's example).
Now, if you call Consumer.createJavaConsumerConnector() 3 times, what you get is 3 consumers, each backed by 1 consumer thread. When a consumer asks for streams, automatic rebalancing of the consumer-partition assignment occurs in the background: when the second consumer connects, Kafka's automatic partition rebalancing takes a partition away from the first consumer and assigns it to the one that just connected. Then the third consumer connects, so automatic rebalancing takes another partition from the first consumer and assigns it to the third one.
So it is something like this for consumers C1-3 and partitions P1-3
1. C1-(P1,P2,P3)
2. C1-(P1,P2),C2-(P3)
3. C1-(P1),C2-(P3),C3-(P2)
You see… just because you instantiate 3 consumers with topicMap("topic", 1) each does not mean that there will only ever be one partition assigned to each consumer. Rebalancing will merely converge toward that automatically.
Now, what is interesting is that this rebalancing does not behave correctly in all cases. Calling Consumer.createJavaConsumerConnector() 10 times on a topic with 10 partitions does not ensure that you will eventually have 10 consumers each covering 1 partition.
From my experience it is more like 3,2,1,1,1,1,1 partitions spread among 7 consumers, with the additional 3 consumers sitting idle.
(1) https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java#L212
(2) https://github.com/apache/flume/blob/trunk/flume-ng-sources/flume-kafka-source/src/main/java/org/apache/flume/source/kafka/KafkaSource.java#L222
July 13, 2015 @ 12:24 pm Stefan Miklosovic
To add to my previous comment:
It seems that automatic rebalancing does not spread partitions evenly across all consumers, so you can end up with consumers, even though each is backed by one thread, that read messages from more than 1 partition. Hence, if you commit with some particular consumer instance, the commit will indeed happen only in the context of one thread, but, to my understanding, it will be committed for all the partitions that consumer actually has assigned. So the idea that this technique ensures you can commit offsets per partition is not always true.
Yes, in your case you were lucky: you got one partition for every consumer, so a commit on such a consumer affects only (accidentally) one partition. But in my case, in that 3,2,1,1,1,1,1 scenario, one consumer will commit offsets for all three of its partitions, the second one for 2, and the remaining 5 will each commit for 1 partition.
While this does not have to be inherently wrong, I was at first expecting that if I did this, I would be guaranteed a commit per partition, but this is simply not always the case.
I tried this on 0.8.1.1. In 0.8.3 there is going to be a new consumer which makes these things more interesting. In the old consumer, you cannot say which partitions you want a consumer to fetch data from; in the new one, you can. So if you instantiate a consumer 10 times, each with a concrete partition in mind, you can be sure that commits occur only for that partition. However, this makes automatic partition rebalancing useless, because when a consumer fails, the other consumers do not take over the partitions that the dead consumer was consuming.
September 3, 2015 @ 7:58 pm Vasudha
Hi Stefan,
I am also looking for the consumer failover and rebalancing mechanism as specified in the docs. But the actual implementation is still missing… lots of missing links…
Is there any concrete example for the explained scenario?
September 24, 2015 @ 2:03 pm Criss
Great posts, thanks, but can you help me understand better what happens in the following scenario?
Let's say that we use this implementation: start one consumer, read a batch of 10 messages (from 0 to 10), process them, and commit the offset (offsetValue = 10).
Just before starting to read the next 10 messages, the application crashes. I restart the application and the consumer is re-initialized with the same id and group, and it starts to read another 10 messages. What offset will it use: 0 or 10? Because from my understanding and my tests with the high-level consumer, a restart will cause it to start reading again from offset = 0. We have offset control only with the low-level consumer; this is what I understand from the documentation.
Thank you