Flume or Kafka? Try both!

In the last few months, I’ve spent a lot of time answering one question: Flume or Kafka?

It’s a pretty tough choice.
Flume has been around for a while: it has many users and happy production systems, it’s well supported by the major Hadoop vendors, there are well-known best practices for configuring and tuning it, and its integration with both Hadoop and common data sources is unparalleled. If you are not a developer, it’s easy to configure a good data pipeline with Flume without writing a single line of code.

Kafka is slightly newer, but it offers impressive latency, throughput, scalability and high availability. It’s a very well-designed system. In addition, Kafka makes it easy to pull data out of it into a variety of systems – batch, streaming, applications, dashboards and even the command line.
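
For example, you can tail a topic straight from a terminal with the console consumer that ships with Kafka (the ZooKeeper address and topic name below are just placeholders):

 bin/kafka-console-consumer.sh --zookeeper zookeeper-host:2181 --topic mytopic --from-beginning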

In many cases the decision comes down to specific priorities:
Do you prefer the possibility of integrating with many data targets, or an easy way to send data to Hadoop?
Do you prefer the flexibility of writing your own data producers or consumers, or a configuration-only solution without any programming involved?
Is it important for you to control every aspect of performance, availability and scalability, or do you want a system that is mostly well tuned out of the box, even if it doesn’t squeeze out every last bit of performance?

After a few of those discussions, we realized that this is silly. Why do we need to choose between a system that is great for developers and a system that is great for administrators? Why choose between fantastic scalability and flexibility on one hand and a great collection of data integration sources and sinks on the other?

Why can’t we have it all?

This is why we helped write Flafka (an unofficial code name) – a Kafka source and sink for Flume.

Flafka serves two important use-cases:

1. You have Kafka. You have apps pushing data into Kafka. But you need to get the data to Hadoop – and not just HDFS; you need HBase and Solr too. Maybe you even need to tweak the data a bit on the way – mask sensitive data, for example, or flag suspicious events. Use the Kafka source with HDFS, HBase or Solr sinks, and add an interceptor to massage the data for you.

2. You need to land data from HTTP, log files, syslog or vmstat in Kafka. You don’t want to write your own producer; you’d rather use an existing, tried-and-true solution. Use a Flume source with a Kafka sink to land data in Kafka without having to figure out how to write a producer.

Since Flume is configuration-only, Flafka allows non-developers to easily use Kafka and to integrate Kafka with Hadoop.
But we didn’t want to sacrifice flexibility for simplicity, so the Kafka source supports any configuration that a Kafka consumer will accept, and the Kafka sink can be configured just like any producer. Out of the box, we tuned Flafka for reliability rather than pure speed, reasoning that preventing data loss is our most important mission. If you have different trade-offs and preferences, it is easy to test them with Flafka.
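
For example, here is a minimal sketch of passing extra consumer and producer settings straight through the agent configuration by prefixing them with “kafka.” (the specific properties below are just illustrative Kafka 0.8-era settings – use whatever your Kafka version accepts):

 # pass any Kafka consumer property to the source with the "kafka." prefix
 tier1.sources.source1.kafka.auto.offset.reset = smallest
 tier1.sources.source1.kafka.consumer.timeout.ms = 100

 # pass any Kafka producer property to the sink the same way
 tier1.sinks.sink1.kafka.producer.type = async
 tier1.sinks.sink1.kafka.compression.codec = snappy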

Like all other Flume sources and sinks, Flafka is designed to work in batches: you can configure batch sizes, the Kafka source will write data in batches to the Flume channel, and the Kafka sink will read data in batches from the channel and send them to Kafka. This design increases throughput and improves resource utilization, but it also increases latency. You can tune the balance between throughput and latency to match your requirements by experimenting with different batch sizes.
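
As a rough sketch, these are the batch-related knobs (property names as they appear in Flume trunk at the time of writing – verify them against the user guide for the version you build):

 # Kafka source: write up to 1000 events per batch to the channel,
 # or whatever has arrived after 1000 ms, whichever comes first
 tier1.sources.source1.batchSize = 1000
 tier1.sources.source1.batchDurationMillis = 1000

 # Kafka sink: take up to 100 events from the channel per batch sent to Kafka
 tier1.sinks.sink1.batchSize = 100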

You can even configure multiple Kafka sources to read from the same topic. If you configure all of them with the same groupId, each source will read a different set of partitions, which leads to improved throughput. This is true even if the sources are configured on different agents. If one of the agents crashes, the remaining agents will automatically rebalance the partitions between them, which reduces delays and data loss.
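
A minimal sketch of two sources sharing a groupId on one agent might look like this (the topic and ZooKeeper address are reused from the full example further down):

 # both sources join the same consumer group, so each claims
 # a different subset of the topic's partitions
 tier1.sources = source1 source2
 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = shapira-1:2181
 tier1.sources.source1.topic = shapira
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1

 tier1.sources.source2.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source2.zookeeperConnect = shapira-1:2181
 tier1.sources.source2.topic = shapira
 tier1.sources.source2.groupId = flume
 tier1.sources.source2.channels = channel1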

At the moment, there is no Flume release that includes Flafka, so you will need to build it from Flume’s source repository:

git clone https://github.com/apache/flume.git flume-local
cd flume-local
git checkout trunk
mvn package -DskipTests

After building, simply copy the jars from flume-ng-dist/target/apache-flume-1.6.0-SNAPSHOT-bin/apache-flume-1.6.0-SNAPSHOT-bin/lib  to /usr/lib/flume-ng/lib and restart your Flume agent.
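
For example (/usr/lib/flume-ng/lib is simply where a package-based install typically keeps the agent’s jars – adjust the destination for your environment):

 cp flume-ng-dist/target/apache-flume-1.6.0-SNAPSHOT-bin/apache-flume-1.6.0-SNAPSHOT-bin/lib/*.jar /usr/lib/flume-ng/lib/
 # then restart the Flume agent so it picks up the new classes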

To get you started, here is an example of how I configure Flume to read data from a Kafka topic and write it to an HDFS directory. The directory will have the same name as the topic, with subdirectories for each day of data:

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks    = sink1
 
 tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
 tier1.sources.source1.zookeeperConnect = shapira-1:2181
 tier1.sources.source1.topic = shapira
 tier1.sources.source1.groupId = flume
 tier1.sources.source1.channels = channel1
 # add a timestamp header so the HDFS sink can bucket events by day
 tier1.sources.source1.interceptors = i1
 tier1.sources.source1.interceptors.i1.type = timestamp
 tier1.sources.source1.kafka.consumer.timeout.ms = 100
 
 tier1.channels.channel1.type   = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 
 tier1.sinks.sink1.type         = hdfs
 # %{topic} is filled in from the event header set by the Kafka source;
 # %y-%m-%d needs a timestamp header, which the interceptor above provides
 tier1.sinks.sink1.hdfs.path    = /tmp/shapira/kafka/%{topic}/%y-%m-%d
 # roll files every 5 seconds; disable size- and count-based rolling
 tier1.sinks.sink1.hdfs.rollInterval = 5
 tier1.sinks.sink1.hdfs.rollSize = 0
 tier1.sinks.sink1.hdfs.rollCount = 0
 # write raw event data rather than SequenceFiles
 tier1.sinks.sink1.hdfs.fileType = DataStream
 tier1.sinks.sink1.channel      = channel1 

And here’s a Flume configuration for getting data from vmstat to Kafka:

 tier1.sources  = source1
 tier1.channels = channel1
 tier1.sinks    = sink1
 
 # run vmstat every second and turn each line of output into a Flume event
 tier1.sources.source1.type = exec
 tier1.sources.source1.command = /usr/bin/vmstat 1
 tier1.sources.source1.channels = channel1
 
 tier1.channels.channel1.type   = memory
 tier1.channels.channel1.capacity = 10000
 tier1.channels.channel1.transactionCapacity = 1000
 
 
 tier1.sinks.sink1.type         = org.apache.flume.sink.kafka.KafkaSink
 tier1.sinks.sink1.topic = sink1
 # list of Kafka brokers to bootstrap from
 tier1.sinks.sink1.brokerList = kafkagames-1:9092,kafkagames-2:9092
 tier1.sinks.sink1.channel = channel1
 # send events to Kafka in batches of 20
 tier1.sinks.sink1.batchSize = 20

I hope you’ll enjoy the ease of use that Flume brings to Kafka. Let us know your experiences in the comments, and open Jira tickets if you run into issues, have ideas for improvements or want to contribute a patch.


'Flume or Kafka? Try both!' has 19 comments

  1. October 16, 2014 @ 11:15 pm James Cheng

    Gwen,

    The links for Flume and Kafka go to apache.com, instead of apache.org.

    -James


  2. October 27, 2014 @ 7:40 am nphung

    Hey, I almost tried to hack something together from the sources to install the Kafka source & sink for CDH 5.2.0. Please warn people using CDH 5.2.0 with Spark 1.1 that the jars are already available to them with the Labs parcels of Kafka in CDH!


  3. December 28, 2014 @ 11:46 am Vincent Murphy

    This comes up in Google results for “kafka flume” before the Flume repo, so leaving an update here:

    Hari Shreedharan’s Kafka Channel implementation (reviewed by Gwen) builds on the Kafka source and sink to allow a potent combination: the well-tested sources (syslog) and sinks (HDFS) from Flume with the durability, scalability and fault tolerance of Kafka. https://issues.apache.org/jira/browse/FLUME-2500

    Config example: https://git-wip-us.apache.org/repos/asf/flume/repo?p=flume.git;a=blobdiff;f=flume-ng-doc/sphinx/FlumeUserGuide.rst;h=0ab23fd33e7a04c726ac90e0476b9b8fcc56e37c;hp=e3aedeb1937404c260a1b81e7f9746a1a2a511d3;hb=fdd61e840b004ca1b5c23b28e8d9eeda300f56f7;hpb=aef02df10a26a6b6911b771a506994f2069857cf


  4. January 12, 2015 @ 10:45 pm Jason

    Thanks so much for the post.

    Did you try running this in distributed mode? We have a single-node Kafka installation and a separate HDFS cluster. Where would you suggest deploying the Flume agent, and would you suggest using multiple Flume agents (on the Kafka node, on a separate node or across the HDFS cluster) to take advantage of the potential parallelism?


    • March 14, 2015 @ 1:37 am Debu Sinha

      This sounds like a POC setup; for testing you can have the Flume agent on the same node as the Kafka broker. You can also move Flume to a separate node. In a production setup they would all have their own clusters with multiple nodes.


  5. February 9, 2015 @ 3:00 pm Parvesh

    Hi Gwen, I am following the instructions as mentioned in this article and running into the following error in the Flume log file. Any pointers to what could be the issue here?
    09 Feb 2015 08:48:13,495 ERROR [conf-file-poller-0] (org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run:149) – Unhandled error
    java.lang.NoSuchMethodError: org.apache.flume.instrumentation.SourceCounter.&lt;init&gt;(Ljava/lang/String;[Ljava/lang/String;)V
    at org.apache.flume.instrumentation.kafka.KafkaSourceCounter.&lt;init&gt;(KafkaSourceCounter.java:37)
    at org.apache.flume.source.kafka.KafkaSource.configure(KafkaSource.java:190)
    at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:331)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


    • February 9, 2015 @ 8:57 pm Gwen Shapira

      Sorry, we made changes in Flafka that made it depend on new additions to Flume-core. So you can no longer just build the source / sink and use Flume from an older version.
      If this is a test environment, you can build a new Flume from sources.
      If this is production, I’d wait for Flume 1.6 to ship (or use CDH 5.2 / 5.3 where Flafka is included).


  6. February 27, 2015 @ 6:44 am colman madden

    When using a Kafka channel and a Kafka sink I get the errors below. Another issue I am struggling with is that I use a C# producer to write an Avro record to a Kafka topic. When I then consume the message using Flume and sink it to HDFS, it writes the Avro records with a schema after every individual record. Is the only way around this to get the C# producer to generate the Avro message with a schema of AvroFlumeEvent? And if so, can this message still be consumed by other consumers such as Spark and Storm?

    15/02/27 06:39:14 WARN kafka.KafkaChannel: Error while getting events from Kafka
    java.lang.IndexOutOfBoundsException
    at java.io.ByteArrayInputStream.read(ByteArrayInputStream.java:180)
    at org.apache.avro.io.DirectBinaryDecoder.doReadBytes(DirectBinaryDecoder.java:184)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:348)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:341)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:243)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:177)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:139)
    at org.apache.flume.channel.kafka.KafkaChannel$KafkaTransaction.doTake(KafkaChannel.java:317)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:382)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
    15/02/27 06:39:14 ERROR hdfs.HDFSEventSink: process failed
    org.apache.flume.ChannelException: Error while getting events from Kafka


  7. March 6, 2015 @ 7:16 pm Sergio Moreno

    Gwen, great article! Thank you so much for sharing.
    It also sounds like Flafka might give enough flexibility to overcome some limitations Camus imposes. I have a couple of questions: is the Kafka channel available as well in the current Flume trunk? (I think using just a channel to dump data into hadoop is great!).


    • March 8, 2015 @ 1:29 am Gwen Shapira

      I also think that using just a channel to dump data to Hadoop is awesome :)

      Yes, the channel is committed to current Flume trunk.


  8. March 14, 2015 @ 1:27 am Debu Sinha

    Gwen, thanks a lot for the post. The Flafka concept is really awesome and simple. Really enjoyed your talk comparing various data ingestion frameworks. Your reasoning to start learning Spark was hilarious. I am in the process of architecting a data processing pipeline and was evaluating my options between Camus and Flume for the final delivery of messages to HDFS, and this article gave valuable insights into it. :) Cheers!


  9. April 13, 2015 @ 9:55 pm Mani

    Hello there – thanks for this post. We ran into this issue and any input would be of great help.

    Kafka and Flume are running on a different server, trying to push the data into an HDFS cluster on a different node (mycluster.host.net) which is running HDP 2.2 (HDFS 2.6.0.2.2.0.0).

    2015-04-13 16:26:54,206 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO – org.apache.flume.sink.hdfs.BucketWriter.open(BucketWriter.java:234)] Creating hdfs://mycluster.host.net/tmp/truckevent/15-04-13/FlumeData.1428960414127.tmp
    2015-04-13 16:26:54,585 (SinkRunner-PollingRunner-DefaultSinkProcessor) [WARN – org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:455)] HDFS IO error
    org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
    at org.apache.hadoop.ipc.Client.call(Client.java:1113)
    at org.apache.hadoop.ipc.RPC$Invoker.invoke(RPC.java:229)
    at com.sun.proxy.$Proxy5.getProtocolVersion(Unknown Source)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:85)
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:62)
    at com.sun.proxy.$Proxy5.getProtocolVersion(Unknown Source)
    at org.apache.hadoop.ipc.RPC.checkVersion(RPC.java:422)
    at org.apache.hadoop.hdfs.DFSClient.createNamenode(DFSClient.java:183)
    at org.apache.hadoop.hdfs.DFSClient.&lt;init&gt;(DFSClient.java:281)
    at org.apache.hadoop.hdfs.DFSClient.&lt;init&gt;(DFSClient.java:245)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:100)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1446)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:67)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1464)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:263)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:187)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)


  10. April 29, 2015 @ 5:39 am Jonathan

    Gwen, I am trying to install Flafka on CDH 5.1.3 (I cannot upgrade to 5.3 due to other incompatibility issues). I have a Flume agent running on a separate node, managed through Cloudera Manager. I have built apache-flume-1.7.0-SNAPSHOT and copied kafka_2.10-0.8.1.1.jar into the plugin directory. When I restart the Flume agent I get the following error:

    ERROR org.apache.flume.node.PollingPropertiesFileConfigurationProvider
    Failed to load configuration data. Exception follows.
    org.apache.flume.FlumeException: Unable to load source type: org.apache.flume.source.kafka.KafkaSource, class: org.apache.flume.source.kafka.KafkaSource
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:67)
    at org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:40)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:327)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:102)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.flume.source.kafka.KafkaSource
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:190)
    at org.apache.flume.source.DefaultSourceFactory.getClass(DefaultSourceFactory.java:65)
    … 11 more


    • May 12, 2015 @ 10:09 am Gwen Shapira

      Do you have a KafkaSource jar in your classpath? Looks like it is missing, although if you built Flume from trunk, it should have been there.


  11. May 8, 2015 @ 2:22 pm Alex Mc

    Will this code be incorporated back into the official Flume sources? I use Hortonworks and would much rather have a version that they compiled and tested than one I built myself.


    • May 12, 2015 @ 9:59 am Gwen Shapira

      It is already in the official Apache Flume trunk.

      Flume 1.6.0 will include Flafka.

      I can’t comment on when Hortonworks will choose to upgrade to Flume 1.6.0.


      • May 14, 2015 @ 3:07 am yangzhongjie

        Hello:
        I want to know whether the KafkaChannel can be added to Flume 1.5 (for example, by adding the KafkaChannel jar and properties to the classpath) and work with it. Thank you.


  12. September 9, 2015 @ 12:54 pm Alex

    Nice article! I am not very familiar with Flume. In the case where I send all my messages to Kafka and use Flume as a way to land the data in HDFS (use case number 1), can exactly-once processing be achieved, or only at-least-once processing?

    HDFS being the “source of truth”, preventing data loss when writing to it is indeed very important, but so is avoiding duplicates :)


