Kafka is well optimized for small messages. Around 10KB seems to be the sweet spot for getting the best throughput (as seen in LinkedIn’s famous benchmark). But sometimes we need to send larger messages – XML documents, and even JSON, can get bulky, and we end up with messages in the 10MB-100MB range. What do we do in this case?
Here are a few suggestions:
- The best way to send large messages is not to send them at all. If shared storage is available (NAS, HDFS, S3), placing large files on the shared storage and using Kafka just to send a message about the file’s location is a much better use of Kafka.
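A minimal sketch of this pattern in Python, using a local directory as a stand-in for the shared storage (the function name and the reference-message format are illustrative, not a standard API):

```python
import json
import os
import tempfile
import uuid

def store_and_build_reference(payload: bytes, storage_dir: str) -> bytes:
    """Write the large payload to shared storage and return the small
    message that will actually travel through Kafka (a "claim check")."""
    key = str(uuid.uuid4())
    path = os.path.join(storage_dir, key)
    with open(path, "wb") as f:
        f.write(payload)
    # Only this tiny pointer goes through Kafka, not the payload itself.
    return json.dumps({"location": path, "size": len(payload)}).encode("utf-8")

# Example: a 5MB payload turns into a Kafka message of a few hundred bytes.
storage = tempfile.mkdtemp()  # stand-in for NAS/HDFS/S3
ref = store_and_build_reference(b"x" * (5 * 1024 * 1024), storage)
```

The consumer reads the reference message, then fetches the payload from the shared storage itself.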
- The second-best way to send large messages is to slice and dice them: have the producing client split the message into small 10K portions, use a partition key to make sure all the portions are sent to the same Kafka partition (so order is preserved), and have the consuming client sew them back together into the original message.
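A sketch of the slice-and-reassemble logic, independent of any particular Kafka client library (the header layout with a shared message id, sequence number, and chunk count is one possible scheme, not a standard):

```python
import uuid

CHUNK_SIZE = 10 * 1024  # stay near Kafka's ~10K throughput sweet spot

def split_message(payload: bytes, chunk_size: int = CHUNK_SIZE):
    """Producer side: split a large payload into small (key, value) records.
    All chunks share one key, so they land in the same partition in order."""
    message_id = str(uuid.uuid4())
    chunks = [payload[i:i + chunk_size] for i in range(0, len(payload), chunk_size)]
    total = len(chunks)
    records = []
    for seq, chunk in enumerate(chunks):
        # Prefix each chunk with "message_id|seq|total|" so the
        # consumer can sew the pieces back together.
        header = f"{message_id}|{seq}|{total}|".encode("utf-8")
        records.append((message_id, header + chunk))
    return records

def reassemble(records):
    """Consumer side: strip the header and concatenate in sequence order."""
    parts = {}
    for _key, value in records:
        _message_id, seq, _total, chunk = value.split(b"|", 3)
        parts[int(seq.decode("utf-8"))] = chunk
    return b"".join(parts[i] for i in sorted(parts))
```

Each record would then be sent with `message_id` as the partition key, so all chunks of one message arrive on the same partition.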
- The Kafka producer can compress messages. If the original message is XML, there’s a good chance the compressed message will not be very large at all. Use the compression.codec and compressed.topics configuration parameters in the producer to enable compression; GZip and Snappy are both supported.
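With the 0.8-era Scala producer that these parameter names come from, enabling compression looks something like this (the topic name is illustrative):

```properties
# producer.properties
compression.codec=snappy
# optional: compress only specific topics (comma-separated list)
compressed.topics=large-xml-events
```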
But what if you really need to use Kafka with large messages? Well, in that case, there are a few configuration parameters you need to set:
- message.max.bytes (default: 1000000) – the maximum size of a message the broker will accept. This has to be smaller than the consumer’s fetch.message.max.bytes, or the broker will have messages that can’t be consumed, causing consumers to hang.
- log.segment.bytes (default: 1GB) – the size of a Kafka data file. Make sure it’s larger than one message. The default should be fine, since large messages probably shouldn’t exceed 1GB in any case (it’s a messaging system, not a file system).
- replica.fetch.max.bytes (default: 1MB) – the maximum size of data that a broker can replicate. This has to be larger than message.max.bytes, or a broker will accept messages it cannot replicate, leading to potential data loss.
- fetch.message.max.bytes (default: 1MB) – the maximum size of a message a consumer can read. This should be at least as large as message.max.bytes.
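Putting the four parameters together, a configuration for messages up to roughly 15MB might look like this (the exact sizes are illustrative; what matters is that both fetch sizes are at least message.max.bytes):

```properties
# broker: server.properties
message.max.bytes=15728640          # ~15MB, largest message the broker accepts
replica.fetch.max.bytes=16777216    # ~16MB, must exceed message.max.bytes
log.segment.bytes=1073741824        # 1GB default is fine

# consumer configuration
fetch.message.max.bytes=16777216    # must be >= message.max.bytes
```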
If you choose to configure Kafka for large messages, there are a few things to take into consideration. Large messages are not an afterthought; they affect the overall design of the cluster and its topics:
- Performance: As we’ve mentioned, benchmarks indicate that Kafka reaches maximum throughput with a message size of around 10K. Larger messages will show decreased throughput. It’s important to remember this when doing capacity planning for a cluster.
- Available memory and number of partitions: Brokers need to allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. So if replica.fetch.max.bytes = 1MB and you have 1000 partitions, that takes around 1GB of RAM. Do the math and make sure the number of partitions times the size of the largest message does not exceed available memory, or you’ll see OOM errors. The same goes for consumers and fetch.message.max.bytes – make sure there’s enough memory for the largest message for each partition the consumer reads. This may mean you end up with fewer partitions if you have large messages, or you may need servers with more RAM.
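The arithmetic above, as a quick sanity check you can adapt to your own numbers:

```python
# Replication buffers: one buffer of replica.fetch.max.bytes
# per partition the broker replicates.
replica_fetch_max_bytes = 1 * 1024 * 1024   # 1MB, the default
partitions_per_broker = 1000

buffer_bytes = replica_fetch_max_bytes * partitions_per_broker
buffer_gib = buffer_bytes / (1024 ** 3)
# 1000 partitions * 1MB per buffer is roughly 1GB of RAM,
# before the heap does anything else.
```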
- Garbage collection: I have not personally seen this issue, but I think it’s a reasonable concern. Large messages may cause longer garbage-collection pauses (as brokers need to allocate large chunks), so keep an eye on the GC log and on the server log. If long GC pauses cause Kafka to lose its ZooKeeper session, you may need to configure a longer zookeeper.session.timeout.ms.
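If you do see session losses during long GC pauses, the timeout is raised in the broker configuration; the value below is just an example starting point, not a recommendation:

```properties
# server.properties – default is 6000ms
zookeeper.session.timeout.ms=12000
```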
Weigh the pros, cons, and headaches – and decide which solution works best for you.
Ran into additional issues with large messages? Know about parameters that I missed? Let us know in the comments.