Friday, August 14, 2015

Spark Streaming WordCount Example


Spark Streaming : Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

For more, visit: Spark Streaming



We will write a basic word count program that reads text from a socket on port 9999, fed by nc (the netcat utility).

We are going to run Spark in local mode by starting the spark-shell.

Let's get going.

Start the spark-shell

./bin/spark-shell

Import the StreamingContext:

import org.apache.spark.streaming._

Create a new Spark Streaming context using the default available SparkContext (sc):

val ssc = new StreamingContext(sc,Seconds(5))

// The batch interval is set to 5 seconds

Create a DStream that will connect to localhost on port 9999:

val lines = ssc.socketTextStream("localhost", 9999)

flatMap is a combination of map and flatten: each line is split on whitespace, and the resulting words are flattened into a single DStream.

val words = lines.flatMap(_.split(" "))
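To see the idea outside Spark, here is a rough sketch on a plain Scala collection (the sample lines are just made-up input):

// Plain Scala (not Spark) illustration of map vs flatMap
val sampleLines = List("hello spark streaming", "hello world")

val mapped = sampleLines.map(_.split(" "))
// List(Array(hello, spark, streaming), Array(hello, world)) -- one array per line

val flattened = sampleLines.flatMap(_.split(" "))
// List(hello, spark, streaming, hello, world) -- split and flattened into one list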

Now let us pair each word with a count of 1:

val pairs = words.map(word => (word,1))

Use reduceByKey to compute the final count for each word. Also see the post below on when to use reduceByKey vs groupByKey.

val wordCounts = pairs.reduceByKey(_+_)

Next we will print each word and its count.

wordCounts.print()


Notice that this would not print anything until we start the StreamingContext.

On a different terminal, start nc listening on port 9999 (using the netcat utility):

nc -lk 9999



Add a few lines in the terminal. In this example I have used the contents from this link.




Now let's start the streaming context.

ssc.start()

ssc.awaitTermination()

// Note: as soon as we start the StreamingContext, the shell blocks on awaitTermination and we can no longer type into it; we can forcefully stop it by pressing Ctrl+C. (In cluster mode this is not a problem.)

Output:

Next : Coming Soon : Spark Streaming with Kafka.

Stay Tuned !!! ;)

Git and GitHub Tutorial

GitHub is a Web-based Git repository hosting service, which offers all of the distributed revision control and source code management (SCM) functionality of Git as well as adding its own features. 

Unlike Git, which is strictly a command-line tool, GitHub provides a Web-based graphical interface and desktop as well as mobile integration. It also provides access control and several collaboration features such as bug tracking, feature requests, task management, and wikis for every project.

Source : Github Wiki



Terms and Definitions.

SNAPSHOT / COMMIT
A commit records a snapshot of your files that you can refer back to later.

CACHE (INDEX / STAGING AREA)
The place where changed files are staged and tracked before they are committed.

Git Repository
Any folder/directory, once initialized with git init, becomes a local Git repository.

Create a public/private key pair

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

After adding the public key to your GitHub SSH keys, check that it works.


Configure Git

git config --global user.name "Name"
git config --global user.email "Email@domain"

To check the config

git config --list 
Now let's create a directory:

mkdir gitproject  // project directory
cd gitproject
git init          // initialize the directory as a Git repository (this is what creates the master branch)
vim readme.txt
vim test.txt

git status  // tells you which branch you are on, which files have changed, which files are tracked, and hints at what to do next

Now let's add the files to the staging area:

git add .  // shortcut to add all the files in present directory

git rm --cached <filename> // to remove file from staging area

git commit -m "message for commit"

git log // check the commit log

git diff  // show unstaged changes (the diff of modified files before adding them to the staging area)

git diff --cached  // show the diff of files already added to the staging area

git log --oneline //only commit messages

git commit -a -m "message"  // stage all modified tracked files and commit in one step

git status -s  // short status 

Create a new repository on GitHub and copy the link
Eg(https://github.com/viprai91/SampleTutorial.git)

To push an existing repository to github 

git remote add origin <URL-of-the-remote-repo>

(Eg : git remote add origin https://github.com/viprai91/SampleTutorial.git)

git push origin master

// origin is the remote name we added in the step above and master is the branch we are pushing; once the remote is added, use the same command to push further changes

Branching - allows you to create a separate working copy of the code

Merging - allows you to merge branches back together

Cloning - lets other developers get a copy of the code

Branching - 
git branch    // lists all the branches available in the repository

To create a new Branch

git checkout -b "branch_name"  // creates branch and sets new branch as working branch

or

git branch "name"  // creates branch with the name
eg: git branch r2_index

To switch branches (note: the current branch is marked with a * before its name):

git checkout r2_index
 // Switched to branch 'r2_index'

// to get a copy of the same repo on another developer's machine, use clone
git clone <ssh-or-https-link>

Merge
// to merge the changes from r2_index into master, switch to master first
git checkout master
git merge r2_index

Pull
Pull syncs the local repository with the remote repository.

git pull origin master

Note: Always pull before you push, so that you have the latest version of the code, which might have been edited by someone else.

Monday, August 10, 2015

reduceByKey vs groupByKey

reduceByKey and groupByKey are both transformations on RDDs in Apache Spark.

In this post we will try to answer a few questions, such as:

Which one should you use?

Which is more efficient?

When should you use reduceByKey and when groupByKey?

To understand this, we need to look at the underlying behaviour of both transformations.

The most important part of the execution stage is the Shuffle Phase.

So let us compare this for both reduceByKey and groupByKey

ReduceByKey shuffle step:

The data is first combined locally, so that on each partition there is at most one value per key.
Only then does the shuffle happen, and these partial results are sent over the network to the executors that perform the final reduce.





GroupByKey shuffle step:

The values for each key are not merged before the shuffle; the shuffle happens directly,
and a lot of data, almost as much as the initial data set, gets sent to each partition.

The merging of values for each key happens only after the shuffle step.
A lot of data therefore has to be held on the final worker node (the reducer), which can result in out-of-memory issues.




Therefore we can conclude that:

reduceByKey is more efficient when running on a large data set.

groupByKey can cause out-of-memory problems on large data sets.


Why do we have groupByKey then?

Not every problem can be solved by reduceByKey.

We do not always want to aggregate the values for a key; sometimes we genuinely need all of them grouped together.

That said, the variants reduceByKey, aggregateByKey, foldByKey and combineByKey all do some combining before the shuffle phase,
so whenever an aggregation is what you need, they should be preferred over groupByKey.
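As a minimal sketch of the difference, here is word count written both ways, assuming an existing SparkContext sc:

// Word count written both ways, assuming an existing SparkContext sc
val words = sc.parallelize(Seq("spark", "hive", "spark", "hadoop", "spark"))
val pairs = words.map(word => (word, 1))

// reduceByKey combines the (word, 1) pairs on each partition before the shuffle
val countsViaReduce = pairs.reduceByKey(_ + _)

// groupByKey ships every single (word, 1) pair across the network first,
// and only then sums the grouped values
val countsViaGroup = pairs.groupByKey().map { case (word, ones) => (word, ones.sum) }

countsViaReduce.collect()   // e.g. Array((spark,3), (hive,1), (hadoop,1))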





Saturday, August 8, 2015

Spark Optimizations and Hacks.(Part 2)

Continued from : Spark Optimizations and Hacks.(Part 1)

1.       Use the SPARK_LOCAL_DIRS property in spark-env.sh
This directory stores the excess data that cannot fit in memory at execution time, so it can be served directly to the executors that need it.
Reading it is still faster than reading directly from HDFS/Cassandra, etc.
It is specified in the spark-env.sh file.
Use SSDs for SPARK_LOCAL_DIRS, as they are faster than RAID HDDs.
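SPARK_LOCAL_DIRS itself is just a line exported in conf/spark-env.sh (a comma-separated list of SSD-backed paths). As a rough, assumed alternative shown for completeness, the equivalent spark.local.dir property can be set when building a SparkContext yourself; the paths below are placeholders:

// Hypothetical sketch: pointing Spark's local scratch space at SSD-backed directories.
// Normally you would simply export SPARK_LOCAL_DIRS=/ssd1/spark,/ssd2/spark in spark-env.sh.
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("local-dirs-sketch")
  .setMaster("local[*]")                                // for local testing only
  .set("spark.local.dir", "/ssd1/spark,/ssd2/spark")    // placeholder SSD paths
val sc = new SparkContext(conf)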

2.       Prefer cluster mode rather than client mode with YARN
Client mode: the driver runs with the shell.
Cluster mode: the driver runs on the cluster, without the shell.

For the same reason we avoid parallelize, the driver might not be able to handle the end result and might crash.
Also, if we start the execution from a shell and the shell gets interrupted, there is no way to get the result.

3.       Use all the available executors for processing so that we can take advantage of data locality on each node.
For example, if we have 50 nodes and start only 10 executors, then the data from the other 40 nodes needs to be shipped to these 10 executors, which can be very costly and time consuming.
4.       Pipelining: more than one transformation can be executed within a single stage (the same thread applies them one after another, in sequence).
Eg : map – filter – join
5.       Preserve partitioning
This applies to operations where we change only the values and not the keys, for example making a plural of every word (here the value).

The keys are hash partitioned and we did not change them, so to keep the existing hash partitioning on the keys we pass:
preservesPartitioning = true
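A small sketch of the idea, assuming an existing SparkContext sc; mapValues keeps the partitioner automatically, while mapPartitions needs the explicit flag:

// Assuming an existing SparkContext sc
import org.apache.spark.HashPartitioner

val pairs = sc.parallelize(Seq((1, "cat"), (2, "dog"), (3, "bird")))
  .partitionBy(new HashPartitioner(4))        // keys are now hash partitioned

// mapValues changes only the value (pluralising each word),
// so the existing partitioner on the keys is preserved automatically
val plurals = pairs.mapValues(word => word + "s")

// mapPartitions could in principle change the keys, so we explicitly
// promise Spark that we did not, in order to keep the partitioner
val pluralsToo = pairs.mapPartitions(
  iter => iter.map { case (k, word) => (k, word + "s") },
  preservesPartitioning = true)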

6.       Use broadcast variables and accumulators

• Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations,
like sending a large, read-only lookup table to all the nodes.

• Accumulators allow us to aggregate values from the worker nodes back to the driver program. They can be used, for example, to count the number of errors seen in an RDD of lines spread across hundreds of nodes. Only the driver can access the value of an accumulator; tasks cannot. For tasks, accumulators are write-only.
One way to avoid a shuffle when performing a join is to use broadcast variables:
when one of the datasets is small enough to fit in the memory of a single executor, it can be loaded into a hash table on the driver and broadcast to every executor, and then a map transformation can reference the hash table to do the lookups.
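A minimal sketch of both, assuming an existing SparkContext sc (the accumulator call uses the Spark 1.x API current when this post was written):

// Broadcast: ship a small, read-only lookup table to every executor once
val countryNames = Map("IN" -> "India", "US" -> "United States")
val bcNames = sc.broadcast(countryNames)

val users = sc.parallelize(Seq(("alice", "IN"), ("bob", "US"), ("carol", "IN")))
// Map-side lookup against the broadcast table -- no shuffle needed for this "join"
val withCountry = users.map { case (name, code) =>
  (name, bcNames.value.getOrElse(code, "Unknown"))
}

// Accumulator: count bad records on the workers, read the total back on the driver
val badLines = sc.accumulator(0)
val lines = sc.parallelize(Seq("ok", "", "ok", ""))
val nonEmpty = lines.filter { line =>
  if (line.isEmpty) badLines += 1
  line.nonEmpty
}
nonEmpty.count()            // an action forces the filter (and the counting) to run
println(badLines.value)     // only the driver can read the accumulated value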



References:
1.       http://spark.apache.org/
2.       Learning Spark - http://shop.oreilly.com/product/0636920028512.do

3.       YouTube video by Databricks engineer Sameer Farooqui (BTW this is the most awesome video about Spark available online right now) - https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

Spark Optimizations and Hacks.(Part 1)

Apache Spark is one of the most active Big Data ecosystem projects right now, so I thought I would share the knowledge I have gained over the past 6 months of working with Apache Spark.

More on Apache Spark can be found on: http://spark.apache.org/
Here in this blog post I would like to mention a few optimization techniques that we can use to make full use of the available resources such as memory, cores, etc.
1.       Load data from an external dataset rather than parallelizing.
As most users are aware, there are two ways to create an RDD:
a)      Parallelize a collection
b)      Load from an external dataset
The first option is not the preferred one, because the data sits in the driver program, and if the data is huge it might crash the driver JVM.
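A quick sketch of both options, assuming an existing SparkContext sc (the HDFS path is just a placeholder):

// (a) Parallelize a collection -- the data must already fit in the driver's memory
val smallRdd = sc.parallelize(Seq(1, 2, 3, 4, 5))

// (b) Load from an external dataset -- the executors read the data directly,
// so it never has to pass through the driver (placeholder path)
val bigRdd = sc.textFile("hdfs:///data/big-input.txt")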
2.       A filter transformation gives the same number of child RDD partitions as the parent RDD has.
So after a filter transformation we can use coalesce(n) to bring the number of partitions down to n.
Note: here n is the desired number of partitions.
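For example, assuming an existing SparkContext sc:

// Start with 100 partitions; after the filter there are still 100, many of them sparse
val numbers = sc.parallelize(1 to 1000000, 100)
val evens = numbers.filter(_ % 2 == 0)
println(evens.partitions.length)       // 100

// coalesce(n) shrinks the partition count without a full shuffle
val compacted = evens.coalesce(10)
println(compacted.partitions.length)   // 10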
3.       The collect action ships all the requested data to the driver program's JVM.
Do not call collect on something like 1 TB of data; all of it will arrive at the driver and crash it. Instead, either save the result to HDFS or take a small sample of the bigger RDD just to inspect the results while testing.
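For example, some safer alternatives while testing (the paths are placeholders):

val hugeRdd = sc.textFile("hdfs:///data/huge-dataset")   // placeholder path

// hugeRdd.collect() would pull the entire dataset into the driver JVM -- avoid it

hugeRdd.take(10).foreach(println)                        // peek at a handful of records

// or inspect a small sample (~0.1% of the data, without replacement)
hugeRdd.sample(withReplacement = false, fraction = 0.001).take(20).foreach(println)

// or write the full result back to HDFS instead of collecting it
hugeRdd.saveAsTextFile("hdfs:///data/output")            // placeholder path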

4.       Use persist or cache
Use these when we are sure the data is clean and will be required again and again.
Eg : after a map or filter, use cache to keep the data in memory.
This reduces the recomputation time for the RDD.
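A small sketch, assuming an existing SparkContext sc (the path is a placeholder):

import org.apache.spark.storage.StorageLevel

val errors = sc.textFile("hdfs:///data/app-logs")        // placeholder path
  .filter(_.contains("ERROR"))

errors.cache()                      // same as persist(StorageLevel.MEMORY_ONLY)
// errors.persist(StorageLevel.MEMORY_AND_DISK)   // alternative: spill to disk if memory is tight

errors.count()      // the first action computes the RDD and caches it
errors.first()      // later actions reuse the cached data instead of recomputing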

5.       Narrow transformations vs wide transformations
Narrow → each child partition depends on a single parent partition, so these can happen in parallel without data from other partitions.
Wide → multiple child partitions may depend on a single parent partition, so a shuffle is required.

Always prefer narrow transformations if possible.
  
Most transformations work element-wise: map/filter, for example, are applied to each element within each partition.

But there are a few transformations (such as mapPartitions and foreachPartition) that work on a per-partition basis.

Eg : a typical use case is opening a connection to a remote database once per partition, iterating over the partition's elements, and then closing the connection, instead of opening a connection for each element.
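A sketch of that per-partition pattern using foreachPartition, assuming an existing SparkContext sc; the JDBC URL, credentials and table are placeholders, and the JDBC driver is assumed to be on the classpath:

import java.sql.DriverManager

val records = sc.parallelize(Seq("alpha", "beta", "gamma"), numSlices = 3)

records.foreachPartition { partition =>
  // One connection per partition instead of one per element (placeholder URL/credentials)
  val conn = DriverManager.getConnection("jdbc:postgresql://dbhost/mydb", "user", "password")
  val stmt = conn.prepareStatement("INSERT INTO words (word) VALUES (?)")
  partition.foreach { word =>
    stmt.setString(1, word)
    stmt.executeUpdate()
  }
  stmt.close()
  conn.close()
}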
6.       Oversubscribe
Always oversubscribe: for example, if we have 5 cores on an executor, oversubscribe at least 1.5 or 2 times, so here we can subscribe to roughly 10 tasks.
Syntax:
spark-shell --master local[10]  // starts with 10 worker slots

spark-shell --master local[*]   // as many slots as there are logical cores on the machine

More in Spark Optimizations and Hacks.(Part 2)

Thursday, October 9, 2014

Running Qlikview with HDFS as Data Source on Kerberos enabled Cloudera Hadoop CDH 4.7


In this post we will install Kerberos on Cloudera CDH 4.7
I am using the Cloudera Quick Start VM running on CentOS 6


Step 1 : 

For installing Kerberos, please check the following post by my friend. It is quite descriptive and easy to understand, so I won't be writing that again.



Step 2 :

On the client (here Windows Server R2; any Windows OS that supports QlikView can be used):
Install QlikView, and install MIT Kerberos for Windows from the following page.



Copy the keytab file to the Kerberos bin directory, and copy the krb5.conf file to (note that the ProgramData folder is hidden):

C:\ProgramData\MIT\Kerberos5

Also, on Windows the configuration is a krb5.ini file, which needs to be modified a bit.

The .ini file should look something like this:


[libdefaults]
    default_realm = BSIL.COM
    default_tkt_enctypes = rc4-hmac des3-cbc-sha1 des-cbc-crc des-cbc-md5
    default_tgs_enctypes = rc4-hmac des3-cbc-sha1 des-cbc-crc des-cbc-md5
    permitted_enctypes = rc4-hmac des3-cbc-sha1 des-cbc-crc des-cbc-md5

[realms]
    BSIL.COM = {
        kdc = 192.168.0.253
        admin_server = 192.168.0.253
    }

[domain_realm]
    .localhost.localdomain = BSIL.COM
    localhost.localdomain = BSIL.COM



Now go to C:\Program Files\MIT\Kerberos\bin
and obtain a ticket using kinit:

C:\Program Files\MIT\Kerberos\bin>kinit.exe

It will prompt for a password; enter the password.



Now let us check using klist.exe.

We can see that a ticket has been generated for the user administrator@BSIL.COM.


Step 3 : Install the Cloudera ODBC Connector for Hive from the Cloudera downloads page.

Install the ODBC driver.

Next, let us create a DSN for the connection to the Hadoop server.

Go to Control Panel\System and Security\Administrative Tools,
click on Data Sources (ODBC), and under System DSN modify the default DSN with the following settings.



Now let us check the connectivity.

It shows "Connection Successful".



Now, let us open QlikView and try to connect to Hive.

But now, as we can see, it gives the following error:

SQL##f - SqlState: S1000, ErrorCode: 38, ErrorMsg: [Cloudera][HiveODBC] (38) The gssapi DLL can not be loaded and it is needed for authenticating using Kerberos. Please ensure MIT Kerberos is installed.

To resolve this, copy krb5_32.dll from the Kerberos bin directory to C:\Program Files\QlikView.

Restart QlikView to reload the new .dll file.



Now, as we can see, we are able to connect from QlikView as well.

Let us try to run a simple Hive query:

Select id from default.test;

So, now we are able to query Hive running on a Kerberos-enabled cluster from QlikView.


Wednesday, August 20, 2014

Big Data, Hadoop - Where do I start from?

Lately many people have shown interest in Big Data, as it is one of the hottest technologies in the market right now.

But still, a lot of them do not know exactly what it is or where to start the journey. (I frequently get messages asking me where to start.)

So, in this post I would like to highlight a few resources as well as tips on how to start your Big Data journey.

I would suggest that the best notes are always the official documentation, but that can sometimes be tricky for a beginner.

To begin with, I would suggest buying this book, which is quite good compared to the many books available now:

Hadoop In Action - by Chuck Lam

Buy Hadoop In Action - on Flipkart

I would suggest reading it at least 5 times. Yes, I mean it: every time you read it you find something new. I am still reading it!!

Next would be to set up a pseudo-distributed Hadoop cluster. Please go through the other posts on this blog regarding this.

As a next step, or simultaneously, go to the free IBM Big Data University courses.
They have lots of free resources, explained clearly. Try not to rush through the courses; understand and complete each course one by one.

Go to IBM Big Data University

They also provide a free course completion certificate if you score more than 60% in the quiz at the end of each course. If you want to see a sample, please visit my LinkedIn profile; you can find the link on the right of this page.


Once you are thorough with the basics, it is time for some real action.
Hortonworks provides the best tutorials for beginners as well as some real-world implementations.

  Hortonworks Tutorial

Also, Hortonworks provides a VM for beginners. If your interest is purely development, then this is for you.
Please go to the Hortonworks website and download the Sandbox.

Another option would be Cloudera's CDH 4.7 VM.

Next, you can read Hadoop: The Definitive Guide. This is also a good book, though I feel it is a bit advanced; but for people who have followed the initial steps, it should be a cakewalk.

Once you have completed these basic steps, you will probably already have an idea of what to do next.

Do connect with me for any doubts, a general discussion, or just to say hi.