Friday, December 26, 2014

Extending CentOS disk space (adding a new LVM volume)


How do you extend the filesystem capacity of a Hadoop data node?

First, attach the new hard drive to your PC (or add a virtual hard drive if you are running in VirtualBox).

Mount the GParted ISO on your CentOS node.
Boot from the GParted ISO.

Create a partition table for the new hard drive:
Device > Create Partition Table > GPT is recommended

Create a new partition: the "lvm2 pv" type is recommended

Boot back into CentOS.
Stop the Hadoop services on the node.

fdisk -l                                    # confirm the new disk and its partition (e.g. /dev/sdb1) are visible

vgextend vg_hadoop2 /dev/sdb1               # add the new physical volume to the existing volume group

lvdisplay                                   # check the logical volumes and the free space in the volume group

lvextend -L+8.48G /dev/vg_hadoop2/lv_root   # grow the root logical volume by the newly added capacity

resize2fs -F /dev/vg_hadoop2/lv_root        # grow the ext filesystem to fill the resized logical volume

df -h                                       # verify the new capacity is visible

That's it

Thursday, November 27, 2014

Tashkent House Price Analysis


Recently I got the idea to analyze house prices in Tashkent.
"www.zor.uz" is one of the most popular sites where people sell houses, cars, electronics, and so on.
I had to crawl it and collect data from it.
Unfortunately, "www.zor.uz" does not keep old data; the oldest listings I found there were just one month old. They seem to clean their DB every month.

I needed data for previous years, so I used the Wayback Machine, which keeps capture points of websites from all around the world. It is really a cool thing to try: https://archive.org/index.php
But the site is served over HTTPS, so I had to download its certificate and register it with my JRE.
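As a side note, a minimal sketch of an alternative to modifying the JRE's default cacerts is to point the crawler's JVM at a custom truststore via system properties; the path, password, and connectivity check below are hypothetical:

// Hypothetical sketch: import the archive.org certificate into your own
// truststore first (e.g. with keytool), then point the JVM at it.
public class TrustStoreSetup {
    public static void main(String[] args) throws Exception {
        System.setProperty("javax.net.ssl.trustStore", "/home/user/archive-truststore.jks");  // hypothetical path
        System.setProperty("javax.net.ssl.trustStorePassword", "changeit");                   // hypothetical password

        // Simple connectivity check before running the crawler.
        new java.net.URL("https://web.archive.org/").openStream().close();
        System.out.println("HTTPS connection to the Wayback Machine succeeded.");
    }
}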

I crawled that site and managed to get data going back to 2009.08 (the earliest capture point of www.zor.uz).
Interestingly, the Wayback Machine captures websites periodically, depending on how often a site changes; it has its own logic for deciding when to capture a site in order to optimize its storage.

Anyway, the data I crawled came from specific months of each year.
After crawling, for simplicity I exported the data to an Excel sheet.
Then I filtered the data (removed duplicates, meaningless entries, spam, and zero values) and sorted it by date.

Here is a graph showing the price change for 2-room houses:


You can see that the average price of 2-room houses rose from $30,000 to $42,000 between 2009.07 and 2014.08.

By drawing the moving average we can identify when prices were rising and falling:


Interestingly, house prices rise during the summer and drop during the winter.
More results could be extracted, but at the moment I am too busy with other projects; I will upload more stats later on.



Tuesday, November 18, 2014

Why it is better not to use VirtualBox (VMware, etc.)

Cons

1. A VirtualBox VM cannot use 100% of the PC's resources, so your Hadoop cluster will not be as fast as you expect.

2. You have to start the virtual nodes every time you reboot your PC (unless you set up auto-start scripts).

3. Running several virtual nodes on the same PC can lead to resource contention. If the combined RAM usage of the VirtualBox nodes exceeds what the host can provide, the PC simply crashes.


Still, VirtualBox is a good starting point for building an experimental Hadoop environment.

Pros

1. You can create several nodes on the same PC (if you lack physical machines).
2. VirtualBox's export/import features let you "do once, copy many".
3. You can experiment by changing hardware settings (add CPUs, reduce RAM, etc.).

For learning Hadoop, VirtualBox is a helpful way to build the environment.
However, for production purposes it is better to avoid VirtualBox and use the PC itself as a single node.


Tuesday, September 2, 2014

HBase: 'Put' Performance Tuning


When you have millions of rows to insert into HBase, use a buffered (bulk) approach to increase Put throughput.
Make sure to set the autoFlush option to false:


HTable tableTermMatrix = new HTable(conf,TABLE_Matrix);
tableTermMatrix.setAutoFlush(false);

Then inside your data insertion loop:

tableTermMatrix.put(put);
cnt++;
if (cnt >= 5000) {
    cnt = 0;
    tableTermMatrix.flushCommits();
}

This flushes the buffered Puts to the RegionServers once per 5,000 entries. Remember to call flushCommits() one final time after the loop so the last partial batch is written.
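For reference, here is a minimal self-contained sketch of the same buffered-Put pattern against the old HTable API used above; the table name, column family, and data loop are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class BufferedPutExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "TermMatrix");      // placeholder table name
        table.setAutoFlush(false);                          // buffer Puts on the client side

        byte[] family = Bytes.toBytes("cf");                // placeholder column family
        int cnt = 0;
        for (int i = 0; i < 1000000; i++) {                 // stands in for your real data loop
            Put put = new Put(Bytes.toBytes("row-" + i));
            put.add(family, Bytes.toBytes("q"), Bytes.toBytes("value-" + i));
            table.put(put);
            if (++cnt >= 5000) {                            // flush once per 5,000 Puts
                cnt = 0;
                table.flushCommits();
            }
        }
        table.flushCommits();                               // push the last partial batch
        table.close();
    }
}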


For me, this buffered approach reduced the total insertion time into HBase from 60 minutes to 2.5 minutes.



Tuesday, August 26, 2014

Streaming Twitter tweets to HBase with Apache Flume

            Apache HBase is a great NoSQL database for storing enormous amounts of data, scaling along three axes (rows, columns, and versions). It is modeled after Google's BigTable, the system Google built to store its crawled web content. Given a row key and a column, we can retrieve a value in a matter of milliseconds.
HBase runs on top of HDFS and works well with MapReduce jobs, so it scales together with Hadoop.
One apparent disadvantage is that HBase depends on ZooKeeper, while other BigTable-style databases such as Cassandra do not. Nevertheless, I have not had any problems with it.
Apache HBase is really fast. Currently I am using it for TF-IDF based keyword retrieval, and it can retrieve results from 2 million tweets in a few seconds.
Anyway, let me get back to the topic.

My plan was to stream Twitter data directly into HBase using Apache Flume. Fortunately, Flume ships with an HBase sink plugin in its lib folder. There are two kinds of sinks we can use: HBaseSink and AsyncHBaseSink; the latter is highly recommended.
To use this plugin we need a serializer that takes events from Flume, serializes the data, and puts it into an HBase table.
There are default serializers such as SimpleAsyncHbaseEventSerializer, but they don't suit our case because our data is not simple; it is Twitter data.
That is why we need to implement the AsyncHbaseEventSerializer interface and create our own serializer.

Our serializer simply takes the Twitter events (statuses) and puts them into HBase.
You can check the source code at : https://github.com/ahikmat85/flume2basesplitter
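For illustration, here is a minimal serializer sketch against the Flume 1.x AsyncHbaseEventSerializer interface. It just stores the raw tweet JSON from the event body under a single qualifier; the class name, row key choice, and qualifier are my own placeholders, and the real splitting logic lives in the GitHub project above.

import java.util.ArrayList;
import java.util.List;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.conf.ComponentConfiguration;
import org.apache.flume.sink.hbase.AsyncHbaseEventSerializer;
import org.hbase.async.AtomicIncrementRequest;
import org.hbase.async.PutRequest;

public class RawTweetSerializer implements AsyncHbaseEventSerializer {
    private byte[] table;
    private byte[] colFam;
    private Event currentEvent;

    @Override
    public void initialize(byte[] table, byte[] cf) {
        this.table = table;
        this.colFam = cf;
    }

    @Override
    public void setEvent(Event event) {
        this.currentEvent = event;
    }

    @Override
    public List<PutRequest> getActions() {
        List<PutRequest> actions = new ArrayList<PutRequest>();
        // Row key: the event's timestamp header if present, otherwise the current time.
        String ts = currentEvent.getHeaders().get("timestamp");
        byte[] rowKey = (ts != null ? ts : String.valueOf(System.currentTimeMillis())).getBytes();
        byte[] qualifier = "json".getBytes();
        actions.add(new PutRequest(table, rowKey, colFam, qualifier, currentEvent.getBody()));
        return actions;
    }

    @Override
    public List<AtomicIncrementRequest> getIncrements() {
        return new ArrayList<AtomicIncrementRequest>();   // no counters in this sketch
    }

    @Override
    public void cleanUp() {
        // nothing to release
    }

    @Override
    public void configure(Context context) {
        // the "serializer.columns" property from the config could be parsed here
    }

    @Override
    public void configure(ComponentConfiguration conf) {
    }
}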

Flume conf:

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = hbaseSink

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey =
TwitterAgent.sources.Twitter.consumerSecret =
TwitterAgent.sources.Twitter.accessToken =
TwitterAgent.sources.Twitter.accessTokenSecret =
TwitterAgent.sources.Twitter.keywords =

TwitterAgent.sinks.hbaseSink.type=org.apache.flume.sink.hbase.AsyncHBaseSink
TwitterAgent.sinks.hbaseSink.channel=MemChannel
TwitterAgent.sinks.hbaseSink.table=hbtweet
TwitterAgent.sinks.hbaseSink.columnFamily=tweet
TwitterAgent.sinks.hbaseSink.serializer=org.apache.flume.sink.hbase.SplittingSerializer
TwitterAgent.sinks.hbaseSink.serializer.columns=tweet:nothing

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity =1000


Reference:  https://blogs.apache.org/flume/entry/streaming_data_into_apache_hbase

Saturday, July 19, 2014

ThreeJS: South Korea's entry statistics from Asia

     By using ThreeJS (an HTML5/WebGL library) I have been modifying existing code to visualize the number of people entering South Korea from various Asian countries. Japan and China are at the top.


Tuesday, July 15, 2014

Building Term Matrix on HBase and Calculating TF-IDF

     Finding a term's related terms in real time is not easy when you have 2 million tweets and 10 million words. Although Hadoop can process huge amounts of data in parallel, it is not real-time.
What if we want to see the top related words for the term "축구" (soccer) in real time?
Top related words can be found by calculating TF (term frequency) values for each word over a certain set of documents. Then we calculate IDF (inverse document frequency) values by looking at all documents; IDF helps exclude meaningless words (이, 다, 나, 너, 니, 하고, 보다, and so on). Then we calculate TF-IDF for each word and sort by that value.
Check out TF-IDF on Wikipedia: http://en.wikipedia.org/wiki/Tf%E2%80%93idf

The process is quite involved, but I will try to explain it as simply as possible.
Here are the steps I went through:

1. Extracting the tweet id and term from Hive into a text file.

   Using Hive, I split each tweet into words and wrote each word on its own line together with its corresponding tweet id. The HiveQL query looks like this:

> select id_str, txt from twitter lateral view explode(split(extractNoun(text),' ')) words as txt where datehour=2014071023 and text not like '%RT %' and source not like '%bot%' and user.lang='ko';

The output looks something like this:

487069487825297408      농구덕
487069487825297408      무슨짓
487069487825297408      한
487069488517365761      오늘
487069488517365761      이상
487069488517365761      버스
487069488517365761      흑흑
487069488517365761      시
487069488517365761      경계
487069488517365761      때

Then I stored this output in a text file.

2. Creating tables in HBase

 We need three tables: "TermMatrix", "Docs", and "Stats". Here is a schema sample:

The "Stats" table stores the total number of tweets for each hour.
In the "TermMatrix" table above, the row key is the term, the columns represent each hour, and each cell contains the list of documents (tweets) that contain that term.
These tables let us calculate the TF and IDF values.

3. Inserting data into Hbase Tables.

    I insert the data into the HBase tables from the text file created in step 1.
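As a rough illustration of this step (not the actual code), here is a sketch that reads the tweetId/term file from step 1, groups the tweet ids by term, and writes them into the "TermMatrix" table with the term as the row key and the hour as the column qualifier. The file name, column family name ("hours"), and datehour value are assumptions:

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class TermMatrixLoader {
    public static void main(String[] args) throws Exception {
        String hour = "2014071023";                       // the datehour this file belongs to
        // term -> comma-separated list of tweet ids containing the term
        Map<String, StringBuilder> termToDocs = new HashMap<String, StringBuilder>();

        BufferedReader in = new BufferedReader(new FileReader("tweet_terms.txt")); // hypothetical file name
        String line;
        while ((line = in.readLine()) != null) {
            String[] parts = line.trim().split("\\s+");   // tweetId <whitespace> term, as shown above
            if (parts.length != 2) continue;
            StringBuilder docs = termToDocs.get(parts[1]);
            if (docs == null) {
                termToDocs.put(parts[1], new StringBuilder(parts[0]));
            } else {
                docs.append(',').append(parts[0]);
            }
        }
        in.close();

        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "TermMatrix");
        table.setAutoFlush(false);
        for (Map.Entry<String, StringBuilder> e : termToDocs.entrySet()) {
            Put put = new Put(Bytes.toBytes(e.getKey()));                  // row key = term
            put.add(Bytes.toBytes("hours"), Bytes.toBytes(hour),           // column = the hour
                    Bytes.toBytes(e.getValue().toString()));               // cell = doc id list
            table.put(put);
        }
        table.flushCommits();
        table.close();
    }
}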


4. Calculating TF-IDF

I made a simple example of how to compute TF-IDF:

The target term is the searched keyword; in our case it is "축구".
In the image above I intentionally removed the log function to simplify the explanation.
The actual IDF formula is IDF = log(NumberOfDocs / TermTotalCount).

From the TF-IDF calculation you can see that the TF-IDF value of "is" is drastically lower than that of "people", because "is" appears in many documents.
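To make the arithmetic concrete, here is a toy Java sketch of the same calculation; the document counts below are made up for illustration and are not taken from the real tables:

import java.util.HashMap;
import java.util.Map;

public class TfIdfToy {
    public static void main(String[] args) {
        long totalDocs = 1000000;                     // e.g. all tweets of the day (from the "Stats" table)
        // term -> {term count within the matching documents, number of documents containing the term}
        Map<String, long[]> stats = new HashMap<String, long[]>();
        stats.put("people", new long[]{40, 2000});    // rare term: high IDF
        stats.put("is",     new long[]{55, 700000});  // common term: IDF close to zero

        for (Map.Entry<String, long[]> e : stats.entrySet()) {
            double tf = e.getValue()[0];
            double idf = Math.log((double) totalDocs / e.getValue()[1]);
            System.out.printf("term:%s\ttfidf:%.2f%n", e.getKey(), tf * idf);
        }
    }
}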

My Final Results:


My second search keyword was "브라질" (Brazil):

getting tids took:25
building random list took:206
building freq List took :1562
sorting took :4
getting total took :130
final tfidf calcul took :1309

term:브라질           tfidf:10.10 
term:독일               tfidf:1.50 
term:월드컵           tfidf:1.33 
term:수니              tfidf:1.08 
term:축구             tfidf:0.98 
term:7                   tfidf:0.92 
term:독                tfidf:0.89 
term:1                tfidf:0.87 
term:경기          tfidf:0.86 
term:네덜란드 tfidf:0.81 
term:참패         tfidf:0.79 
term:4강전       tfidf:0.60 
term:콜롬비아 tfidf:0.59 
term:음주가무 tfidf:0.58 
term:회식        tfidf:0.57 
term:현지       tfidf:0.51 
term:우승      tfidf:0.50 
term:홍명보 tfidf:0.49 
term:응원     tfidf:0.49 

14/07/16 13:40:55 INFO client.HConnectionManager$HConnectionImplementation: Closed zookeeper sessionid=0x146ea39ad3fba67
14/07/16 13:40:55 INFO zookeeper.ZooKeeper: Session: 0x146ea39ad3fba67 closed
Done......
Total processing in Secs:5

It takes around 5 seconds to get the terms related to "브라질", sorted by TF-IDF, over one day of tweets.
I am working on optimizations to reduce this.

Monday, July 14, 2014

Solving the java.lang.ClassNotFoundException problem when exporting a jar file from Eclipse

While running my compiled jar files on the Hadoop cluster I hit this error many times:

Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration

Usually I don't use Maven; I attach libraries to the project manually.
The libraries I use are mostly Apache libraries related to Hadoop, Hive, HBase, and so on.
After a successful compilation I export the project as a jar file.
I tried exporting in many ways, but I hit this ClassNotFoundException every time.
So I realized that exporting my project with the dependency jars attached is not the right solution to the problem.
Then I found this solution on Stack Overflow:
http://stackoverflow.com/questions/2096283/including-jars-in-classpath-on-commandline-javac-or-apt

It describes how to put the required (dependency) jar files on the classpath when running the main jar.
It is simple: just put all the jars into the same folder as your main jar file and run:

>java -cp *:. com.kh.demo.RealtimeTFIDF

Note that you have to specify the fully qualified name of your main class.
The asterisk (*) tells Java to load all jar files from the current directory onto the classpath.

Now you don't need to worry about library paths when exporting your project as a jar file; that is, you don't need to bundle the dependency jars inside the main jar.

Thursday, June 19, 2014

ThreeJS - 3D data visualization on the web


     I have been into 3D since my high school years; my first experience was making 3D models with 3ds Max.
For the last few years Google Chrome has been trying to bring this technology to every desktop through the web. With the rise of HTML5 it became possible to render 3D on the client side using WebGL via JavaScript. However, using WebGL directly is not easy, so open-source communities developed various wrapper libraries to simplify it.
   Among them, ThreeJS (http://threejs.org/) is one of the best choices IMHO. It is more mature than the others, and there are many examples to follow.

It is better to see it than to read about it, so check out this ThreeJS example: http://www.georgeandjonathan.com/#6
Isn't it amazing?
Dynamic, interactive, open-source 3D visualization directly in your Chrome browser.
It runs on the latest Android tablets and phones too.
In a few years, I think everyone will be able to browse 3D sites on their devices.

So, using this library, I am trying to make some impressive data visualizations.
Here is how I visualized a word cloud in 3D:


  Unfortunately, this is just a screenshot from my browser; next time I am going to upload a live demo.

Friday, June 13, 2014

Korean NLP on Hive

These days I am building a small platform for doing topic modeling with Apache Mahout.
While working on this project I tried to make the NLP processing as fast as possible. Previously I ran the NLP on a single node after retrieving the tweet messages from Hive. Then I found out about UDFs (User Defined Functions), which make it possible to run custom libraries on the Hadoop nodes during the map phase of MapReduce.
Using a UDF, I plugged in the open-source Korean NLP library Hannanum and built a library that does the NLP and text processing in parallel.
Here is what the Hive query looks like:

>select extractNoun(text) from twitter;

I put the source code on GitHub.
Feel free to use it and ask questions.
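For readers curious about the general shape of such a UDF, here is a minimal old-style Hive UDF skeleton. The actual Hannanum calls are replaced with a placeholder helper, and the class name is my own; the real implementation is in the GitHub project.

import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.Text;

public class ExtractNounUDF extends UDF {

    public Text evaluate(Text input) {
        if (input == null) {
            return null;
        }
        // Placeholder: run the Hannanum morphological analyzer here and
        // collect the extracted nouns into a space-separated string.
        String nouns = extractNounsSomehow(input.toString());
        return new Text(nouns);
    }

    // Hypothetical helper standing in for the Hannanum workflow call.
    private String extractNounsSomehow(String sentence) {
        return sentence;   // no-op in this sketch
    }
}

After packaging the class into a jar, you would register it in Hive with something like "add jar /path/to/your-udf.jar;" followed by "create temporary function extractNoun as 'ExtractNounUDF';" (the jar path and class name here are placeholders).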

Here is my K-Means dumped cluster output (k=100 topics, one day of tweet data):
Output.txt

Wednesday, June 11, 2014

Hive: Creating External Table from existing one


   Apache Hive is an excellent tool for querying data on HDFS with SQL. I used a JSON SerDe library to map JSON files into a Hive table.
So initially I created the table like this:

CREATE EXTERNAL TABLE twitter (
  coordinates struct<coordinates:array<float>, type:string>,
  created_at string,
  entities struct<
    hashtags:array<struct<text:string>>,
    urls:array<struct<url:string>>,
    user_mentions:array<struct<name:string, screen_name:string>>>,
  favorite_count int,
  favorited boolean,
  filter_level string,
  geo struct<coordinates:array<float>, type:string>,
  id_str string,
  in_reply_to_screen_name string,
  in_reply_to_status_id_str string,
  in_reply_to_user_id_str string,
  lang string,
  retweet_count int,
  retweeted boolean,
  retweeted_status struct<
    entities:struct<
      hashtags:array<struct<text:string>>,
      urls:array<struct<url:string>>,
      user_mentions:array<struct<name:string, screen_name:string>>>,
    favorite_count:int, favorited:boolean,
    geo:struct<coordinates:array<float>, type:string>,
    id_str:string,
    in_reply_to_screen_name:string, in_reply_to_status_id_str:string, in_reply_to_user_id_str:string,
    lang:string, retweet_count:int, retweeted:boolean,
    source:string, text:string, truncated:boolean,
    user:struct<name:string, screen_name:string>>,
  source string,
  text string,
  truncated boolean,
  user struct<name:string, screen_name:string, friends_count:int, followers_count:int, statuses_count:int, verified:boolean, utc_offset:int, time_zone:string>)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/hive/warehouse/twitter';

Everything worked great until I realized that the mapping was not working so well: I was not able to retrieve the original tweets of retweets (retweeted_status) because of a data type mismatch.
So I simplified the table schema to this:

CREATE EXTERNAL TABLE twitternew (
  id BIGINT,
  created_at STRING,
  source STRING,
  favorited BOOLEAN,
  retweet_count INT,
  retweeted_status STRUCT<
    text:STRING,
    user:STRUCT<screen_name:STRING, name:STRING>>,
  entities STRUCT<
    urls:ARRAY<STRUCT<expanded_url:STRING>>,
    user_mentions:ARRAY<STRUCT<screen_name:STRING, name:STRING>>,
    hashtags:ARRAY<STRUCT<text:STRING>>>,
  text STRING,
  user STRUCT<
    screen_name:STRING,
    name:STRING,
    friends_count:INT,
    followers_count:INT,
    statuses_count:INT,
    verified:BOOLEAN,
    utc_offset:INT,
    time_zone:STRING>,
  in_reply_to_screen_name STRING
)
PARTITIONED BY (datehour INT)
ROW FORMAT SERDE 'com.cloudera.hive.serde.JSONSerDe'
LOCATION '/user/hive/warehouse/twitter';
Now, how can we move the already partitioned data from the previous table to the new one?
There were several options. The first was to execute "load data inpath" again for each hourly accumulated folder, but moving all the data from one folder to another would take a lot of time.
So I found an easier alternative:

"alter table twitternew add partition (datehour=2014010100);"

Call the above statement for each hour that has already been indexed. In my case I wrote a bash script that loops from "2014010100" to "2014061210". This updates the Hive metastore without moving any files.

Note that the new table "twitternew" has the same LOCATION path as the previous table, and the partition column name (datehour) must be the same as well.

You can drop the old table after your test SQL queries run successfully. Good luck!

Friday, May 30, 2014

Running Mahout K-Means example

   CDH4 comes with the Mahout library by default; you don't need to install Mahout unless you want to upgrade to the latest version.
Mahout is a scalable machine learning library that supports many algorithms for clustering, classification, topic modeling, prediction, and recommendation systems. It can take terabytes of input data and finish the clustering or classification in less than an hour, depending on how powerful your Hadoop cluster is. In my case I am using 4 powerful PCs with virtual nodes.
   I had a task to model topics from two months of Twitter data. I tried running LDA (Latent Dirichlet Allocation, a topic modeling algorithm) with R on one PC, and it took several hours just to build the document matrix for one day of tweets (around 800,000). To improve the results, the number of topics K should be larger than 500, which increases the LDA processing time exponentially. That is how I turned to Mahout.
   Besides Mahout there are other parallel machine learning libraries; notable ones are MLlib, SparkR, and hiveR. Initially I tried SparkR and MLlib, but since they depend on Apache Spark (an alternative to Hadoop MapReduce), they were not mature enough at the time to handle huge datasets. Apache Spark is a very fast and promising in-memory map-reduce framework, which also makes it something of a memory eater. There were many cases where I ran two queries in parallel with Apache Shark and Spark simply died; it could not handle the heavy load.
I have not tried hiveR yet; I will post about it later.
The three reasons I chose Mahout are stability, maturity, and good support.

Let us run the Mahout k-means example on a CDH4 node.
Go to the "/opt/cloudera/parcels/CDH/share/doc/mahout-doc-0.7+22/examples/bin" folder and list its contents. There is a "cluster-reuters.sh" script that downloads the input data set (news articles from Reuters), generates sequence files, and converts them into vectors (tf, tf-idf, word count, etc.).
Note that before running the shell script you need to define HADOOP_HOME, MAHOUT_HOME, and CLASSPATH.

Add this to the beginning of the "cluster-reuters.sh" file:

HADOOP_HOME="/opt/cloudera/parcels/CDH/lib/hadoop"
MAHOUT_HOME="/opt/cloudera/parcels/CDH/lib/mahout"
MAHOUT=$MAHOUT_HOME/bin/mahout
CLASSPATH=${CLASSPATH}:$MAHOUT_HOME/lib/hadoop/hadoop-common.jar
for f in $MAHOUT_HOME/lib/*.jar; do
        CLASSPATH=${CLASSPATH}:$f;
done

Make sure to run the script as the "hdfs" user, and every folder the script accesses must be accessible to that user:
>su hdfs
This ensures you won't mess up account privileges. After the dataset is downloaded, the sequence files are created in a local directory.
You can change the script to generate the sequence files on HDFS directly:
just remove "MAHOUT_LOCAL=true" from the script.

One last thing: change the "dfs" commands to "fs"; "dfs" is the old, deprecated form of the HDFS command.

Tuesday, May 27, 2014

How I visualized the issues related to Sewol for 04.16 ~ 05.05



세월호 이슈 타임라인 from Hikmat on Vimeo.

On April 16 there was a big tragedy in Korea: the ferry Sewol (세월호) sank in just a few hours, causing the deaths of hundreds of students.
After the tragedy there were millions of tweets about Sewol-related issues. By analyzing these tweets we can see how people felt about the tragedy, how their opinions changed over time, whom they blamed, and how much they hoped for the survival of the missing students.

  We calculated TF-IDF values for Sewol-related keywords, and from those we selected the top issues related to this tragedy. We exported the results to Excel, where the rows represented the date and time and the columns represented the words.
  I used 3ds Max for the visualization. It has the MaxScript programming environment, so I wrote a script to import the data from Excel, draw the words in 3D space, and animate their size according to their TF-IDF values. The positions of the words are distributed randomly. Watch it and feel free to comment on what you observe in the video.

Here is the MaxScript code.

Which devices are popular for tweeting in Korea?

I was wondering how many people use Android to tweet and how many use iPhone, so I did a little research on this.
Fortunately, there is a "source" field in the raw JSON tweet data, so I grouped tweets by that field for a random day.

hive>  select source, count(*) as cnt from twitter where datehour>=2014052000 and datehour<=2014052023 group by source order by cnt desc;  

Result:


Android 593671
Web 204334
Twittbot.net 153035
iPhone 144256
Tweetdeck 65767
iPad 19933
Note that these tweets are for May 20 and include only the ones containing Korean syllables.

Monday, May 26, 2014

NLP (Natural Language Processing) libraries for Korean language

There are mainly two NLP libraries available for the Korean language: one is closed source with a limited-period license, and the other is fully open source.

1. Kookmin NLP library
(http://nlp.kookmin.ac.kr/)

This library has a better dictionary and a word-spacing feature that gives nice output. It has a very long development history and is considered one of the best NLP libraries for Korean. Its features include automatic word spacing, a morphological analyzer, noun extraction, and more.
However, the license is free only for non-commercial purposes.
The download page is at: http://nlp.kookmin.ac.kr/HAM/kor/download.html
But you can find the latest version on this blog: http://cafe.daum.net/nlpk


2. Hannanum project ("http://kldp.net/projects/hannanum")

The good thing about this project is that it is fully open source.
It was developed by KAIST graduates in the Java programming language.
The dictionaries and grammatical rules are open for anyone to change and improve.
It is also available in the R programming language (you can use it by installing the KoNLP package in R).
I liked the simplicity and openness of this project; however, its dictionary lacks many common Korean words (I added the word 세월호 manually as it was not there), it has no automatic word spacing, and it is missing some grammar rules.

Conclusion:


Kookmin NLP library

Pros: better automatic word spacing, richer dictionary, better recognition of nouns, verbs, adjectives, etc.
Cons: closed source, license is not free, command-line based

Hannanum project

Pros: open source, Java library, free, simple, available in R
Cons: small dictionary, no automatic word spacing, missing grammar rules (e.g., 학생들 was not recognized even though 학생 was in the dictionary)

Thursday, May 22, 2014

Three essential things to do while building Hadoop environment

Last year I set up a Hadoop environment using Cloudera Manager. (Basically I followed this video tutorial: http://www.youtube.com/watch?v=CobVqNMiqww)
I used CDH4 (Cloudera's Hadoop distribution), which included HDFS, MapReduce, Hive, ZooKeeper, HBase, Flume, and other essential components. It also included YARN (MapReduce 2), but that was not stable yet, so I used MapReduce instead.
I installed CDH4 on 10 CentOS nodes, set up Flume to collect Twitter data, and used crontab to schedule the indexing of the Twitter data in Hive.
Anyway, I want to share some of the experiences and challenges I faced.
First, here are solutions to a few problems that almost everyone hits while using Hadoop.

1. vm.swappiness warning on hadoop nodes

It is easy to get rid of this warning by running this shell command on every node:
>sysctl -w vm.swappiness=0
To make the setting persist across reboots, also add vm.swappiness = 0 to /etc/sysctl.conf. More details are on Cloudera's site.

2. Make sure to synchronize the time on all nodes (otherwise the nodes will report clock-offset errors).

In Linux, the ntpd service can synchronize the clock over the internet. Do this on all nodes.
Install ntp:
>yum install ntp
Then run these commands in order:
>chkconfig ntpd on
>ntpdate pool.ntp.org
>/etc/init.d/ntpd start

3. Problem uploading files to HDFS (hadoop fs -put localfiles destinationOnHdfs)

The error message says "INFO hdfs.DFSClient: Exception in createBlockOutputStream" while uploading a file to HDFS.

This is caused by firewall settings. Here is the solution:
>service iptables save
>service iptables stop
>chkconfig iptables off
Do this on all nodes. Now you should be able to upload files to HDFS without any problem.