Tuesday, August 26, 2014

Streaming Twitter tweets to HBase with Apache Flume

Apache HBase is a great NoSQL database for storing enormous amounts of data, and it can scale along three axes. HBase is modeled on Google's Bigtable, which was built to store web-scale data such as crawled web pages. Given a row key and a column, we can retrieve the value in a matter of milliseconds.
HBase runs on top of HDFS and works well with MapReduce jobs, so it scales together with Hadoop.
One apparent disadvantage is that HBase depends on ZooKeeper, while other Bigtable-inspired databases such as Cassandra do not. Nevertheless, I have not faced any problems with it.
Apache HBase is really fast. I am currently using it for TF-IDF-based keyword retrieval, and it can retrieve results from 2 million tweets in a few seconds.
Anyway, let me get back to the topic.

My plan was to stream Twitter data directly into HBase using Apache Flume. Fortunately, Flume ships with an HBase sink plugin in its lib folder. It offers two kinds of sinks: HBaseSink and AsyncHBaseSink. The latter is highly recommended.
To use this plugin, we need a serializer that takes Events from Flume, serializes the data, and puts it into an HBase table.
There are some default serializers, such as SimpleAsyncHbaseEventSerializer, but they do not suit us because our data is not simple: it is Twitter data.
That is why we need to use the AsyncHbaseEventSerializer interface to create our own serializer.

Our serializer simply takes the Twitter events (statuses) and puts them into HBase.
You can check the source code at: https://github.com/ahikmat85/flume2basesplitter
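
To make the serializer's job more concrete, here is a minimal, library-free sketch of its two core steps: finding a row key and mapping the event body onto columns. It assumes the serializer follows the pattern of the SplittingSerializer in the Flume blog post listed under Reference, where the row key comes from an event header and the comma-separated body is split across the configured column names. The header name "rowKey" and the names SplitterSketch, rowKeyFrom, and toColumns are assumptions made for illustration. A real implementation would implement AsyncHbaseEventSerializer and return HBase put requests instead of a map.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Library-free sketch of the row-key and column-splitting logic (hypothetical names). */
public class SplitterSketch {

    /** Reads the row key from the event headers; "rowKey" is an assumed header name. */
    public static String rowKeyFrom(Map<String, String> headers) {
        String rowKey = headers.get("rowKey");
        if (rowKey == null) {
            // A Flume serializer would typically throw FlumeException here;
            // IllegalStateException keeps this sketch free of Flume dependencies.
            throw new IllegalStateException("No row key found in headers!");
        }
        return rowKey;
    }

    /** Splits a comma-separated event body and pairs each value with a column name. */
    public static Map<String, String> toColumns(byte[] body, String[] columnNames) {
        String[] values = new String(body, StandardCharsets.UTF_8).split(",");
        Map<String, String> cells = new LinkedHashMap<>();
        // Stop at the shorter array so a short body cannot cause an out-of-bounds error.
        int count = Math.min(values.length, columnNames.length);
        for (int i = 0; i < count; i++) {
            cells.put(columnNames[i], values[i]);
        }
        return cells;
    }

    public static void main(String[] args) {
        // Hypothetical event: one header carrying the row key, and a two-field body.
        Map<String, String> headers = Collections.singletonMap("rowKey", "tweet-1");
        byte[] body = "alice,hello world".getBytes(StandardCharsets.UTF_8);
        String[] columns = {"user", "text"};
        System.out.println(rowKeyFrom(headers) + " -> " + toColumns(body, columns));
    }
}
```

If the source never sets such a header, the lookup fails on every event. In that case the header has to be added upstream (for example by a Flume interceptor), or the serializer has to derive the row key from the event body instead.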

Flume conf:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = hbaseSink

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
TwitterAgent.sources.Twitter.keywords =

TwitterAgent.sinks.hbaseSink.type = org.apache.flume.sink.hbase.AsyncHBaseSink
TwitterAgent.sinks.hbaseSink.channel = MemChannel
TwitterAgent.sinks.hbaseSink.table = hbtweet
TwitterAgent.sinks.hbaseSink.columnFamily = tweet
TwitterAgent.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.SplittingSerializer
TwitterAgent.sinks.hbaseSink.serializer.columns = tweet:nothing

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000
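
Every component in this file is tied to the others by name (both the source and the sink refer to MemChannel), so a typo is easy to make. The sketch below is one way to sanity-check that wiring before starting the agent. FlumeConfCheck and isWiredCorrectly are hypothetical helpers, not part of Flume. The fallback value of 100 for both channel settings and the rule that transactionCapacity should not exceed capacity are assumptions about the memory channel, so check them against your Flume version.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/** Sketch of a wiring check for a Flume agent config (hypothetical helper, not part of Flume). */
public class FlumeConfCheck {

    /** Returns true if the sink's channel is declared and the channel sizes are consistent. */
    public static boolean isWiredCorrectly(Properties conf, String agent, String sink) {
        // Channels are declared as a whitespace-separated list, e.g. "MemChannel".
        String declared = conf.getProperty(agent + ".channels", "").trim();
        List<String> channels = Arrays.asList(declared.split("\\s+"));
        String sinkChannel = conf.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
        if (sinkChannel.isEmpty() || !channels.contains(sinkChannel)) {
            return false; // the sink points at a channel the agent never declares
        }
        String prefix = agent + ".channels." + sinkChannel + ".";
        // Assumed fallback of 100 when a setting is absent.
        int capacity = Integer.parseInt(conf.getProperty(prefix + "capacity", "100").trim());
        int txCapacity = Integer.parseInt(conf.getProperty(prefix + "transactionCapacity", "100").trim());
        return txCapacity <= capacity;
    }

    public static void main(String[] args) throws IOException {
        // The relevant lines from the config above, loaded as a standard properties file.
        String text = "TwitterAgent.channels = MemChannel\n"
                + "TwitterAgent.sinks = hbaseSink\n"
                + "TwitterAgent.sinks.hbaseSink.channel=MemChannel\n"
                + "TwitterAgent.channels.MemChannel.capacity = 10000\n"
                + "TwitterAgent.channels.MemChannel.transactionCapacity =1000\n";
        Properties conf = new Properties();
        conf.load(new StringReader(text));
        System.out.println(isWiredCorrectly(conf, "TwitterAgent", "hbaseSink"));
    }
}
```

The check only covers the sink side. The same lookup could be repeated for TwitterAgent.sources.Twitter.channels, which may list more than one channel.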


Reference: https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

3 comments:

  2. Hi,

    I am getting the following error while running Flume.

    16/03/13 17:16:55 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.FlumeException: No row key found in headers!
    at org.apache.flume.sink.hbase.SplittingSerializer.setEvent(SplittingSerializer.java:36)
    at org.apache.flume.sink.hbase.AsyncHBaseSink.process(AsyncHBaseSink.java:223)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
    16/03/13 17:16:57 INFO twitter.TwitterSource: Processed 100 docs
    16/03/13 17:16:59 INFO twitter.TwitterSource: Processed 200 docs
    16/03/13 17:17:00 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
    org.apache.flume.FlumeException: No row key found in headers!
    at org.apache.flume.sink.hbase.SplittingSerializer.setEvent(SplittingSerializer.java:36)
    at org.apache.flume.sink.hbase.AsyncHBaseSink.process(AsyncHBaseSink.java:223)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)




    I am new to both Java and Flume. Can you help me figure out what I should change to make it run?
