
Tuesday, June 9, 2015

Simple Data Analysis Using Apache Spark

Apache Spark is a framework for distributed computing. A typical Spark program runs in parallel on many nodes in a cluster. For a detailed and excellent introduction to Spark, please have a look at the Apache Spark website (https://spark.apache.org/documentation.html).

The purpose of this tutorial is to walk through a simple Spark example: setting up the development environment and doing some simple analysis on a sample data file composed of userId, age, gender, profession, and zip code (you can download the source and the data file from Github https://github.com/rjilani/SimpleSparkAnalysis).

For the sake of this tutorial I will be using the IntelliJ Community IDE with the Scala plugin; you can download the IntelliJ IDE and the plugin from the IntelliJ website. You can also use your favorite editor, or Scala IDE for Eclipse, if you want to. A preliminary understanding of Scala as well as Spark is expected. The version of Scala used for this tutorial is 2.11.4, with Apache Spark 1.3.1.


Once you have installed the IntelliJ IDE and the Scala plugin, go ahead and start a new project using the File->New->Project wizard, and then choose Scala and SBT from the New Project window wizard. At this stage, if this is your first time creating a project, you may have to choose a Java project SDK and a Scala and SBT version.






Once the project is created, please copy and paste the following lines into your SBT file:

name := "SparkSimpleTest"
version := "1.0"
scalaVersion := "2.11.4"
libraryDependencies ++= Seq(
    "org.apache.spark" %% "spark-core" % "1.3.1",
    "org.apache.spark" %% "spark-sql" % "1.3.1",
    "org.apache.spark" %% "spark-streaming" % "1.3.1")

Note: you don’t need the Spark SQL and Spark Streaming libraries to finish this tutorial, but add them anyway in case you have to use Spark SQL and streaming in future examples. Once you save the SBT file, IntelliJ will ask you to refresh; once you hit refresh, it will download all the required dependencies. Once all the dependencies are downloaded, you are ready for the fun stuff.



Go ahead and add a new Scala class of type Object (without going deep into Scala semantics, this means your class will be executable, with a main method inside it).


For the sake of brevity I will also omit the boilerplate code in this tutorial (you can download the full source file from Github https://github.com/rjilani/SimpleSparkAnalysis). Now let’s jump into the code, but before proceeding further, let’s cut the verbosity by turning off Spark logging using these two lines at the beginning of the code:


Logger.getLogger("org").setLevel(Level.OFF)

Logger.getLogger("akka").setLevel(Level.OFF)

Now add the following two lines:

val conf = new SparkConf().setAppName("Simple Application").setMaster("local")

val sc = new SparkContext(conf)
 
The lines above are boilerplate code for creating a Spark context by passing configuration information to it. In the Spark programming model every application runs in a Spark context; you can think of the Spark context as the entry point for interacting with the Spark execution engine. The configuration object above tells Spark where to execute the Spark jobs (in this case, the local machine). For a detailed explanation of the configuration options, please refer to the Spark documentation on the Spark website.

Spark is written in Scala and exploits the functional programming paradigm, so writing map- and reduce-style jobs becomes very natural and intuitive. Spark exposes many transformation functions, and detailed explanations of these functions can be found on the Spark website (https://spark.apache.org/docs/latest/programming-guide.html#transformations).


Now let’s get back to the fun stuff (real coding):

val data = sc.textFile("./Data/u.user") //Location of the data file
  .map(line => line.split(","))
  .map(userRecord => (userRecord(0),
    userRecord(1), userRecord(2), userRecord(3), userRecord(4)))
 


The code above reads a comma-delimited text file composed of user records and chains two transformations using the map function. The first map function takes a closure and splits each line of the data file using a “,” delimiter. The second map function takes each line record and creates an RDD (resilient distributed dataset) of Scala tuples. Think of a Scala tuple as an immutable list that can hold objects of different types.
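Spark aside, the tuple behavior described above can be seen with a quick plain-Scala sketch (the values here are made up for illustration):

```scala
// A Scala tuple can hold elements of different types and is immutable
val user = (1, 24, "M", "technician", "85711")

// Fields are accessed positionally as _1 through _5
println(user._4) // prints the profession field

// "Updating" a field produces a new tuple; the original is unchanged
val older = user.copy(_2 = 25)
println(older)
```

Note how copy returns a fresh tuple rather than mutating the original, which is exactly the immutability Spark relies on when distributing records across a cluster.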

Every Spark RDD object exposes a collect method that returns an array of objects, so if you want to understand what is going on, you can iterate over the whole RDD as an array of tuples using the code below:

data.collect().foreach(println)

//Data file is transformed in Array of tuples at this point
(1,24,M,technician,85711)
(2,53,F,other,94043)
(3,23,M,writer,32067)
………

The whole fun of using Spark is doing analysis on Big Data (no buzz intended). So let’s ask some questions and do the real analysis.

1.       How many unique professions do we have in the data file?

//Number of unique professions in the data file

val uniqueProfessions = data.map {case (id, age, gender, profession,zipcode) => profession}.distinct().count()

Look at the code above: that is all it takes to find the unique professions in the whole data set. Isn’t it amazing? Here is the explanation of the code:

The map function is again an example of a transformation; the parameter passed to it is an anonymous function written as a case pattern match (not to be confused with Scala case classes) that extracts the profession attribute from each record in the RDD. We then call the distinct and count functions on the resulting RDD.
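The distinct-then-count step works the same way on ordinary Scala collections, so it can be sketched without a Spark cluster at all (the records below are made up, not taken from the real data file):

```scala
// Hypothetical (id, age, gender, profession, zipcode) records
val records = List(
  (1, 24, "M", "technician", "85711"),
  (2, 53, "F", "other", "94043"),
  (3, 23, "M", "writer", "32067"),
  (4, 31, "M", "technician", "43537")
)

// Extract the profession field via a case pattern match,
// deduplicate, then count (size plays the role of Spark's count())
val uniqueProfessions = records
  .map { case (_, _, _, profession, _) => profession }
  .distinct
  .size

println(uniqueProfessions) // technician, other, writer -> 3
```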

2.       How many different users belong to each unique profession?

//Group users by profession and sort them by descending order

val usersByProfession = data.map{ case (id, age, gender, profession,zipcode) => (profession, 1) }
.reduceByKey(_ + _).sortBy(-_._2)

The map function is again an example of a transformation; the parameter passed to it is an anonymous function written as a case pattern match that returns a tuple of the profession and the integer 1. The “reduceByKey” function then merges the tuples by unique key, summing all the values associated with each key.

The “sortBy” function sorts the RDD by passing a closure that takes a tuple as input and sorts the RDD on the basis of the second element of the tuple (in our case, the count for each profession). The “-” sign in front of the tuple accessor tells “sortBy” to sort the values in descending order.
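The reduceByKey and sortBy steps can also be sketched with plain Scala collections; here groupBy plus a sum stands in for reduceByKey(_ + _), and the pairs are made-up sample data:

```scala
// Hypothetical (profession, 1) pairs, as produced by the map step
val pairs = List(("student", 1), ("other", 1), ("student", 1),
                 ("educator", 1), ("student", 1), ("other", 1))

// groupBy + sum merges the values for each key, like reduceByKey(_ + _)
val counts = pairs.groupBy(_._1)
  .map { case (profession, ps) => (profession, ps.map(_._2).sum) }

// sortBy(-_._2) orders by the second tuple element (the count), descending
val sorted = counts.toList.sortBy(-_._2)
sorted.foreach(println)
// (student,3)
// (other,2)
// (educator,1)
```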

You can print the list of professions and their counts using the line below:

usersByProfession.collect().foreach(println)

(student,196)
(other,105)
(educator,95)
(administrator,79)
...........

3.       How many users belong to each unique zip code in the sample file?

//Group users by zip code and sort them by descending order

val usersByZipCode = data.map{ case (id, age, gender, profession,zipcode) => (zipcode, 1) }
.reduceByKey(_ + _).sortBy(-_._2)

(55414,9)
(55105,6)
(55337,5)
(20009,5)
(10003,5)
(55454,4)
........................

4.       How many users are male and female?

 //Group users by Gender and sort them by descending order

val usersByGender = data.map{ case (id, age, gender, profession,zipcode) => (gender, 1) }
.reduceByKey(_ + _).sortBy(-_._2)


(M,670)
(F,273)



Items 3 and 4 use the same pattern as item 2; the only difference is that the map function returns a tuple of zip code or gender, which is then reduced by the “reduceByKey” function.

I hope the above tutorial is easy to digest. If not, please hang in there, brush up on your Scala skills, and then review the code again; I hope that with time and practice you will find the code much easier to understand.

Sunday, February 26, 2012

Closure in Groovy

The purpose of this article is to go over some of the main features offered by Groovy and explain closures, which are one of the biggest hurdles in understanding Groovy for non-functional programmers. I expect that readers of this article have a rudimentary understanding of Groovy syntax; if not, I would recommend checking the official Groovy website (http://groovy.codehaus.org/). Unlike Java, which is a statically typed OO/imperative language, Groovy is a dynamically typed hybrid programming language that supports both the imperative and the functional paradigm.

One of the strongest cases for Groovy (for Java programmers) versus Python and Ruby is that Groovy’s syntax is a superset of Java’s (almost, with some quirks), and Groovy’s integration with Java is seamless. Note: once you become more familiar with Groovy’s semantics, e.g. dynamic and duck typing, functional paradigms, etc., and go more than skin deep, you will realize that Java and Groovy are quite different languages that serve different needs. Understanding the semantic differences between the two paradigms can be quite overwhelming, and the best way to grasp these concepts is to get your hands dirty with coding. Before we dive into code, let’s talk about some of the main concepts of Groovy. Below is a list of features that summarize Groovy for Java developers:

  • Just like Java, Groovy compiles to Java byte code
  • Unlike Java, Groovy can also be used as a scripting language
  • Groovy enhances the Java API with the Groovy JDK (http://groovy.codehaus.org/groovy-jdk/); besides this, Groovy provides its own API (http://groovy.codehaus.org/gapi/) that has tons of handy features
  • The most important feature that differentiates Groovy from Java is that besides being imperative, Groovy also supports the functional paradigm. This means that in the Groovy universe functions are first-class citizens and can be treated as regular data types. Hence Groovy supports the notion of closures, which are at the heart of any functional language
  • Groovy has a very powerful MOP (Meta Object Protocol) facility

Enough theory; let’s jump into closures with one thing in mind: think of a closure as a regular function on steroids:

 def a = {param -> println param} // a closure in groovy  

In the code above we define a variable called “a” and assign it to a closure. In Groovy, closures are defined just like any other function, with curly braces. Anything before the “->” is the parameter list and anything after the “->” is the body. Note: the body can be an expression as well as statements. The biggest hurdle in understanding closures is how to use them. Note: Groovy supports closures natively, and closures can be used as a regular data type in the Groovy universe. This means you can pass and return closures, just like any other data type, to and from a method. The code below shows a typical use of the closure defined above:

 5.times(a) //Calling the above closure with the built-in times method in Groovy  

Output:
 0
 1
 2
 3
 4

In the code above, “times” is a built-in Groovy method exposed by the integer object that accepts a closure and calls it 5 times. The trick to understanding closures lies in how the calling method, in this case “times”, invokes the closure. In the case above, the “times” method executes 5 times, which means it calls the closure {param -> println param} 5 times, each time passing a parameter from 0 to 4. We will explain the calling part further later, when we code a custom method that accepts a closure and calls the closure defined above. Since Groovy is all about cutting verbosity, it has a special shorthand notation for closures having a single parameter. So instead of the code below:

 def a = {param -> println param}  

we could have defined the closure above like this:

 def a = {println it}   

Note: the first parameter is implicit and can be accessed as “it” in the method body.

In functional languages, closures can often serve as lambdas, a.k.a. anonymous functions, and Groovy provides a very succinct syntax to define and pass a closure at the same time. So instead of first defining the closure and then passing it to times, we could have just written something like this:

 5.times {println it} //defining and passing the closure to the times method at the same time  

In order to understand how a closure is used and called from inside the calling method, let’s write our own custom function called “loop” that mimics the “times” method above, and see how we can pass a closure to our custom method below:

 def loop(clsr) {  //a Groovy function that accepts a closure as a parameter   
    for (int i = 0; i < 5; i++)   
       clsr.call(i)  //calling the closure "clsr" with parameter i in a for loop  
 }  

Note: in the method above, “clsr” is never defined as being of type Closure, but since Groovy is a dynamic language, at run time Groovy binds the formal parameter to the supplied actual parameter, which in our case is of type Closure.

Here are the ways we can call the “loop” method above, which takes a closure as an input parameter:

 def a = {param -> println param}   
 loop(a)  
 def b = {println it}   
 loop(b)   
 loop {println it} // calling with an anonymous closure  

In the rest of the article I will show the heavy use of closures in the GDK. The areas of the GDK covered will be collections, files, XML, and SQL.

Collections:


The code below shows the use of closures with a List and a HashMap:

 def list = [1,2,3,4,5,2] // Defines a list via Groovy literal syntax  
 list.find {value -> value == 2} // Call list “find” method with a closure   

or, using the shorthand notation:

 list.find{it == 2}  

returns the first matching element, 2

 list.findAll{it % 2 == 0} // Call the “findAll” method of the list  

returns a list of all even numbers [2,4,2]

 list.findAll{it % 2 == 1} // that is how you return a list of odd numbers [1,3,5]  

So in the code above you see the sheer power of closures: the findAll() method is coded as an abstraction, and the value returned by the method depends on the caller passing the appropriate closure. Doesn’t it look like the Command or Strategy pattern, without all the interfaces and contracts you have to adhere to?

 def hashmap = [a:1,b:2,c:3] //Defines a HashMap   
 hashmap.each {k, v -> println k; println v}    

In the code above, the each() method of the hash map accepts a closure with 2 parameters, in this case the key and the value (k, v), and produces the output: a 1 b 2 c 3.

File I/O:

The code below shows a typical use of closures with the Groovy File object:

 def file = new File (/C:\test.txt/)   
 file.eachLine{line,no -> println "${no}: ${line}" }   

The code above creates a file object and then passes a closure with 2 parameters (line and no) to the eachLine() method. The eachLine() method of the file calls the supplied closure back with each line and its line number.

Another file example:

 def dir = new File (/C:\Users\xxxx\Documents/)  
 dir.eachFileRecurse() {println it}   

The code above creates a file object and then passes a closure to the eachFileRecurse() method. eachFileRecurse() calls the closure back with every file under the directory, recursively.

XML:

The code below shows how the Groovy XmlParser and Node objects utilize closures to print each node:

 def xml = """  
 <Department type="Java Practice">  
  <employee>Rashid Jilani</employee>  
  <employee>Albert Yeghikian</employee>  
  <employee>David Wetzel</employee>  
 </Department>  
 """  
 def emp = new XmlParser().parseText(xml) //Returns a Node  
 println "type = ${emp.attribute("type")}"  
 emp.employee.each{ println it.text() }  

In the code above, emp is a Node that holds the list of employee nodes; the each() method accepts a closure that prints the text of every node under employee.

SQL:

The code below shows a typical use of closures in the Groovy SQL library:

 import groovy.sql.Sql  
 def sql = Sql.newInstance("jdbc:mysql://localhost:3306/sakila", "root",  
       "password", "com.mysql.jdbc.Driver")  
 new File(/C:\Users\rjilani\Documents\actor.txt/).withWriter { file ->   
    sql.eachRow("select * from actor") {  
       println it.first_name + ' ' + it.last_name  
       file.writeLine(it.first_name + ' ' + it.last_name)  
    }  
 }  

In the code above, the withWriter() method accepts a closure and calls it back with a writer that can be used to write to the file. Then the eachRow() method of the sql object accepts a query string and a closure, and calls the closure back with each row of the returned result set.

I hope you now have a better idea of what a closure is and how to define and call one. In my next blog I will try to cover the MOP features of Groovy.