Friday, August 14, 2015

Spark Streaming WordCount Example


Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

For more, visit: Spark Streaming



We will write a basic word count program that reads data from port 9999, which we will feed using nc (the netcat utility).

We are going to run Spark in local mode by starting spark-shell.

Let's get going.

Start the spark-shell

./bin/spark-shell

Import the StreamingContext

import org.apache.spark.streaming._

Create a new StreamingContext using the default available SparkContext (sc)

val ssc = new StreamingContext(sc,Seconds(5))

// The batch interval is set to 5 seconds

Create a DStream that will connect to localhost on port 9999

val lines = ssc.socketTextStream("localhost", 9999)

flatMap is a combination of map and flatten: each line is split on whitespace and the resulting words are flattened into a single DStream of words.

val words = lines.flatMap(_.split(" "))

Now let us pair each word with a count of 1

val pairs = words.map(word => (word,1))

Use reduceByKey to find the final count of each word. Also see the reduceByKey vs groupByKey post below to find out when to use each.

val wordCounts = pairs.reduceByKey(_+_)

Next we will print each word and its count.

wordCounts.print()


Notice that this would not print anything until we start the StreamingContext.

In a different terminal, start nc listening on port 9999 (using the netcat utility here)

nc -lk 9999



Add a few lines on the terminal. In this example I have used the contents from this link.




Now let's start the StreamingContext.

ssc.start()

ssc.awaitTermination()

// Note: once we start the StreamingContext, awaitTermination blocks the shell so we cannot type anything further; we can stop it forcefully by pressing Ctrl+C (in cluster mode this is not a problem)
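
Putting it all together, the whole spark-shell session looks roughly like this (a minimal sketch assembling the steps above; it assumes sc is the SparkContext the shell provides and that nc is feeding port 9999):

import org.apache.spark.streaming._

val ssc = new StreamingContext(sc, Seconds(5))        // 5-second batch interval, reusing the shell's SparkContext
val lines = ssc.socketTextStream("localhost", 9999)   // DStream of raw text lines from netcat
val words = lines.flatMap(_.split(" "))               // split each line into words
val pairs = words.map(word => (word, 1))              // pair every word with a count of 1
val wordCounts = pairs.reduceByKey(_ + _)             // sum the counts per word within each batch
wordCounts.print()                                    // print the first few results of every batch

ssc.start()            // start receiving and processing data
ssc.awaitTermination() // block until the streaming job is stopped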

Output:

Next: Coming soon: Spark Streaming with Kafka.

Stay Tuned !!! ;)

Git and GitHub Tutorial

GitHub is a Web-based Git repository hosting service, which offers all of the distributed revision control and source code management (SCM) functionality of Git as well as adding its own features. 

Unlike Git, which is strictly a command-line tool, GitHub provides a Web-based graphical interface and desktop as well as mobile integration. It also provides access control and several collaboration features such as bug tracking, feature requests, task management, and wikis for every project.

Source: GitHub Wiki



Terms and Definitions.

SNAPSHOT / COMMIT
A commit records a snapshot of the project that you can refer back to later.

CACHE (INDEX / STAGING AREA)
The place where changed files are staged and tracked before committing.

GIT REPOSITORY
Located in a local folder; any folder/directory, once initialized, becomes a Git repository.

Create a public/private key pair

ssh-keygen -t rsa -b 4096 -C "your_email@example.com"

To check that our public key works after adding it to the GitHub SSH keys:

ssh -T git@github.com


Configure Git

git config --global user.name "Name"
git config --global user.email "Email@domain"

To check the config

git config --list 
Now let's create a directory

mkdir gitproject  // directory for our project
cd gitproject
vim readme.txt
vim test.txt

git status  // tells which branch you are on, which files have changed, which files are tracked, and hints on what to do next

Now let's add the files to the staging area

git add .  // shortcut to add all the files in present directory

git rm --cached <filename> // to remove file from staging area

git commit -m "message for commit"

git log // check the commit log

git diff  // check the changes to modified files that have not yet been added to the staging area

git diff --cached //check the diff after adding to staged area

git log --oneline //only commit messages

git commit -a -m "message"  // stages all tracked, modified files and commits them in one step

git status -s  // short status 

Create a new repository on GitHub and copy the link
Eg(https://github.com/viprai91/SampleTutorial.git)

To push an existing repository to github 

git remote add origin <URL of the remote repo>

(Eg : git remote add origin https://github.com/viprai91/SampleTutorial.git)

git push origin master 

// origin is the remote name we added in the step above, master is the branch we are pushing; once the remote is added, use this same command to push further changes

Branching - Allows us to create a separate working copy of the code.

Merging - Allows us to merge branches back together.

Cloning - Lets other developers get a copy of the code.

Branching - 
git branch    // lists all the branches available for the repository

To create a new Branch

git checkout -b "branch_name"  // creates the branch and switches to it as the working branch

or

git branch "name"  // creates branch with the name
eg: git branch r2_index

To switch branches (note: the current branch is marked with a * before its name)

git checkout r2_index
 // Switched to branch 'r2_index'

// to clone the same repo onto another developer's computer, use clone
git clone sshlink

Merge
// to merge changes from the r2_index branch into master, first switch to master and then merge
git checkout master
git merge r2_index

Pull
Pull syncs local repository with remote repo

git pull origin master

Note: Always pull before push, so that we have the latest version of the code which might have been edited by someone else.

Monday, August 10, 2015

reduceByKey vs groupByKey

reduceByKey and groupByKey are both transformations on RDDs in Apache Spark.

In this post we will try to answer a few questions, such as:

Which one should we use?

Which is more efficient?

When should we use reduceByKey, and when groupByKey?

To understand this we need to understand the underlying architecture of both the transformations.

The most important part of the execution stage is the Shuffle Phase.

So let us compare this for both reduceByKey and groupByKey

ReduceByKey Shuffle Step :

Data is first combined locally, so that within each partition there is at most one value for each key (a map-side combine).
Only then does the shuffle happen, and these small partial results are sent over the network to a particular executor for the final action such as reduce.





GroupByKey Shuffle Step :


It does not merge the values for each key; the shuffle step happens directly,
so a lot of data gets sent to each partition, almost as much as the initial data.

The merging of values for each key happens only after the shuffle step.
A lot of data therefore needs to be held on the final worker node (the reducer), which can result in out-of-memory issues.




Therefore we understand that:

reduceByKey is more efficient when we run on a large data set.

groupByKey can cause out-of-memory problems on the final workers, and its heavy spilling can even exhaust disk space.


Why do we have groupByKey then?

Not every problem can be solved by reduceByKey.

Sometimes we do not want to aggregate over the same keys at all; we just need all the values for each key grouped together.

Other variants such as reduceByKey, aggregateByKey, foldByKey and combineByKey do some combining before the shuffle phase,
so whenever an aggregation is what we need, these should be preferred over groupByKey.
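
As a minimal sketch of the difference, here is a word-count style example written both ways; it assumes an existing SparkContext sc, and the sample words are purely illustrative:

// Illustrative RDD of (word, 1) pairs; the real data could come from any source.
val pairs = sc.parallelize(Seq(("spark", 1), ("scala", 1), ("spark", 1), ("spark", 1)))

// reduceByKey: values are combined within each partition first (map-side combine),
// so only small partial sums travel over the network during the shuffle.
val countsReduce = pairs.reduceByKey(_ + _)

// groupByKey: every (word, 1) pair is shuffled as-is, and the summing happens
// only after all values for a key have arrived on a single partition.
val countsGroup = pairs.groupByKey().mapValues(_.sum)

Both produce the same word counts, but reduceByKey moves far less data across the network.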





Saturday, August 8, 2015

Spark Optimizations and Hacks.(Part 2)

Continued from : Spark Optimizations and Hacks.(Part 1)

1. Use the SPARK_LOCAL_DIRS property in spark-env.sh
This directory is used to store the excess data which cannot fit in memory (shuffle data and spills), so at execution time it can be served directly to the executors that need it.
Reading it from local disk is still faster than reading again from HDFS/Cassandra etc.
It is specified in the spark-env.sh file.
Use SSDs for SPARK_LOCAL_DIRS, as they are faster than RAID HDDs.

2. Prefer cluster mode rather than client mode with YARN
Client mode: the driver runs where the shell is launched (with a shell).
Cluster mode: the driver runs inside the cluster (without a shell).

For the same reason we avoid parallelizing large collections, the client-side driver might not be able to handle the end result and might crash.
Also, if we start the execution from a shell and the shell gets interrupted, there is no way we can get the result.

3. Use all the available executors for processing so that we can take advantage of data locality on each node.
For example, if we have 50 nodes and we start only 10 executors, then the data from the other 40 nodes needs to be shipped to these 10 executors, which can be very costly and time consuming.
4. Pipelining: when we can execute more than one RDD transformation in a single stage (the same task performs them in sequence on each partition).
E.g.: map -> filter -> join
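
As a minimal sketch (assuming an existing SparkContext sc), a map followed by a filter is pipelined into one stage because neither needs a shuffle:

val numbers = sc.parallelize(1 to 1000)

// map and filter are both narrow transformations, so Spark pipelines them into a single
// stage: the same task maps and then filters each partition, element by element.
val evenSquares = numbers.map(n => n * n).filter(_ % 2 == 0)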
5. Preserve partitioning
In operations where we make changes only to the values and not the keys, for example making every word (here the value) plural,
the keys are still hash partitioned exactly as before, since we did not change them. To tell Spark that the existing hash partitioning is still valid, we use:
preservesPartitioning = true
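
A minimal sketch of the idea, assuming an existing SparkContext sc and using mapPartitions (whose preservesPartitioning flag is part of the RDD API); the (id, word) records are illustrative:

import org.apache.spark.HashPartitioner

// (id, word) records, hash partitioned by id.
val records = sc.parallelize(Seq((1, "cat"), (2, "dog"), (3, "bird")))
  .partitionBy(new HashPartitioner(4))

// Only the values are pluralised and the keys are untouched, so we can tell Spark
// that the existing hash partitioning is still valid and avoid a later re-shuffle.
val pluralised = records.mapPartitions(
  _.map { case (id, word) => (id, word + "s") },
  preservesPartitioning = true)

// mapValues does the same job and preserves the partitioner automatically.
val pluralised2 = records.mapValues(_ + "s")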

6. Use Broadcast Variables and Accumulators

• Broadcast variables allow the program to efficiently send a large, read-only value to all the worker nodes for use in one or more Spark operations,
like sending a large, read-only lookup table to all the nodes.

• Accumulators allow us to aggregate values from the worker nodes back to the driver program. They can be used to count the number of errors seen in an RDD of lines spread across hundreds of nodes. Only the driver can access the value of an accumulator; tasks cannot. For tasks, accumulators are write-only.
One way to avoid a shuffle when performing a join operation is to use broadcast variables.
When one of the datasets is small enough to fit in the memory of a single executor, it can be loaded into a hash table on the driver, broadcast to every executor, and then a map transformation can reference the hash table to do the lookups.
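
A minimal sketch of both features, assuming an existing SparkContext sc; the lookup table and user records are illustrative, and the accumulator uses the pre-2.0 sc.accumulator API that matches this post's Spark version:

// Broadcast: ship a small, read-only lookup table to every executor once,
// then do the "join" as a plain map-side lookup with no shuffle.
val countryNames = Map("IN" -> "India", "US" -> "United States")
val bcNames = sc.broadcast(countryNames)

val users = sc.parallelize(Seq(("alice", "IN"), ("bob", "US"), ("carol", "FR")))
val withNames = users.map { case (user, code) =>
  (user, bcNames.value.getOrElse(code, "unknown"))
}

// Accumulator: workers only add to it, the driver reads the total.
val unknownCodes = sc.accumulator(0)
users.foreach { case (_, code) =>
  if (!bcNames.value.contains(code)) unknownCodes += 1
}
println(unknownCodes.value)   // only the driver can read the value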



References:
1. http://spark.apache.org/
2. Learning Spark - http://shop.oreilly.com/product/0636920028512.do
3. YouTube video by Databricks engineer Sameer Farooqui (BTW this is the most awesome video about Spark available online right now) - https://www.youtube.com/watch?v=7ooZ4S7Ay6Y

Spark Optimizations and Hacks.(Part 1)

Apache Spark is one of the most active Big Data ecosystem projects right now, so I thought I would share the knowledge I have gained over the past 6 months of working with Apache Spark.

More on Apache Spark can be found on: http://spark.apache.org/
In this blog post I would like to mention a few optimization techniques that we can use to make full use of the available resources such as memory, cores, etc.
1. Load data from an external dataset rather than parallelizing
As most users are aware, there are two ways to create an RDD:
a) parallelize a collection
b) load from an external dataset
The first option is not the preferred one for real data, because the whole collection has to live in the driver program first, and if the data is huge it might crash the driver JVM.
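
A minimal sketch of the two options, assuming the shell's SparkContext sc; the HDFS path is illustrative:

// a) Parallelize a collection: the data must already exist on the driver,
//    so this only suits small datasets and tests.
val small = sc.parallelize(Seq(1, 2, 3, 4, 5))

// b) Load from an external dataset: the executors read the partitions directly,
//    so nothing has to fit on the driver.
val big = sc.textFile("hdfs:///data/logs/2015/08/*.log")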
2. A filter transformation gives the child RDD the same number of partitions as the parent RDD
So after a filter transformation we can use coalesce(n) to bring the number of partitions down to n.
Note: here n is the desired number of partitions.
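
A minimal sketch, assuming an existing SparkContext sc and an illustrative HDFS path:

val logs = sc.textFile("hdfs:///data/logs/2015/08/*.log")

// filter keeps the parent's partition count even if most of the data is dropped...
val errors = logs.filter(_.contains("ERROR"))

// ...so shrink the now mostly empty partitions down to n = 10 without a shuffle.
val compacted = errors.coalesce(10)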
3. The collect action ships all the data to the driver program's JVM
Do not call collect on something like 1 TB of data; all of it would arrive at the driver and crash it. Instead, either save the result to HDFS, or sample the bigger RDD down to something smaller just to inspect the results while testing.
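
A minimal sketch of the safer alternatives, assuming an existing SparkContext sc; the paths and the sample fraction are illustrative:

val big = sc.textFile("hdfs:///data/huge-dataset")

// Don't: big.collect() would pull every record into the driver JVM.

// Either write the full result back to distributed storage...
big.saveAsTextFile("hdfs:///data/huge-dataset-out")

// ...or inspect just a small piece of it while testing.
val firstTwenty = big.take(20)
val sampled = big.sample(withReplacement = false, fraction = 0.001).collect()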

4. Use persist or cache
When we are sure that the data is clean and might be required again and again.
E.g.: after a map or filter, use cache to keep the data in memory.
This reduces the recomputation time for the RDD.
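
A minimal sketch, assuming an existing SparkContext sc and an illustrative input path:

val lines = sc.textFile("hdfs:///data/logs")
val errors = lines.filter(_.contains("ERROR")).cache()   // keep the filtered RDD in memory after the first use

// Both actions reuse the cached RDD instead of re-reading and re-filtering the input.
val total = errors.count()
val firstTen = errors.take(10)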

5. Narrow transformation vs wide transformation
Narrow -> each partition of the parent RDD is used by at most one child partition, so partitions can be processed in parallel without depending on other partitions.
Wide -> multiple child partitions may depend on one parent partition, which requires a shuffle.

Always prefer a narrow transformation if possible.
  
Most transformations work element-wise: map/filter, for example, is applied to each element of each partition.

But there are a few transformations that work on a per-partition basis.

E.g.: a use case is opening a connection to a remote database once per partition, iterating over each element, and then closing the connection, instead of opening a connection for each element (see the sketch below).
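
A minimal sketch of the per-partition pattern using mapPartitions; the DbConnection class and its lookupScore method are hypothetical stand-ins for a real database client, and sc is an existing SparkContext:

// Hypothetical connection type standing in for a real database client.
case class DbConnection() {
  def lookupScore(id: String): Int = id.length   // placeholder logic
  def close(): Unit = ()
}

val ids = sc.parallelize(Seq("a1", "b2", "c3"))

// mapPartitions: one connection per partition instead of one per element.
val scores = ids.mapPartitions { iter =>
  val conn = DbConnection()                                       // opened once per partition
  val result = iter.map(id => (id, conn.lookupScore(id))).toList  // force evaluation before closing
  conn.close()                                                    // closed once per partition
  result.iterator
}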
6. Oversubscribe
Always oversubscribe: for example, if we have 5 cores on an executor, oversubscribe by at least 1.5x or 2x, so here we could run around 10 task slots.
Syntax:
spark-shell --master local[10]  // starts with 10 worker slots

spark-shell --master local[*]   // as many slots as there are logical cores on the machine

More in Spark Optimizations and Hacks.(Part 2)