Saturday 14 May 2016

Integrating Neo4j with Spark Streaming

Neo4j + Spark Streaming


I am happy to share my first experience of integrating Neo4j with Spark Streaming. The code is not particularly efficient, but it should help newcomers build their own graph data set from Twitter tweets.

For more details check out my Spark exploration @ https://github.com/Mageswaran1989/aja/
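The gist of the approach: pull tweets with Spark Streaming and push nodes and relationships into Neo4j from each partition. Below is a minimal sketch (not the code from the repo), assuming the spark-streaming-twitter connector and the Neo4j Java bolt driver are on the classpath and that the Twitter OAuth keys are set as twitter4j system properties; the graph model and Cypher query are purely illustrative:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter.TwitterUtils
import org.neo4j.driver.v1.{AuthTokens, GraphDatabase, Values}

object TweetsToNeo4j {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("TweetsToNeo4j").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Uses the twitter4j.oauth.* system properties for authentication
    val tweets = TwitterUtils.createStream(ssc, None)

    tweets.foreachRDD { rdd =>
      rdd.foreachPartition { statuses =>
        // One driver/session per partition, not per tweet
        val driver = GraphDatabase.driver("bolt://localhost:7687",
          AuthTokens.basic("neo4j", "password"))
        val session = driver.session()
        statuses.foreach { status =>
          session.run(
            "MERGE (u:User {name: {user}}) " +
            "MERGE (t:Tweet {id: {id}}) SET t.text = {text} " +
            "MERGE (u)-[:TWEETED]->(t)",
            Values.parameters(
              "user", status.getUser.getScreenName,
              "id", Long.box(status.getId),
              "text", status.getText))
        }
        session.close()
        driver.close()
      }
    }

    ssc.start()
    ssc.awaitTermination()
  }
}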




Thursday 26 November 2015

NorthWindDataSet exploration in SparkSQL DataFrames

Example usage of DataFrames for exploring the NorthWind dataset!

For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/


package org.aja.tej.examples.sparksql

import org.aja.tej.utils.TejUtils
import org.apache.spark.sql.SQLContext

/**
 * Created by mageswaran on 18/10/15.
 *
 * Dataset: Url: https://northwinddatabase.codeplex.com/releases/view/71634
 */

/*
Let's take one record from NW-Employees-NoHdr.csv and use it as a reference for creating the case class
1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2

Note: the compiler generates "extractors" for case classes, which is very useful in match...case{} blocks
*/

case class Employee(EmployeeId: Int, LastName : String, FirstName : String, Title : String,
                    BirthDate : String, HireDate : String,
                    City : String, State : String, Zip : String, Country : String,
                    ReportsTo : String)
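
//(Added illustration, not from the original post) The generated extractor lets us pattern-match
//on Employee fields directly, e.g.:
//  employee match {
//    case Employee(_, last, first, _, _, _, "Seattle", _, _, _, _) =>
//      println(s"$first $last works in Seattle")
//    case _ => // everyone else
//  }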
/*
Let's take one record from NW-Orders-NoHdr.csv and use it as a reference for creating the case class
10248,VINET,5,7/2/96,France
 */
case class Orders(OrderID: String, CustomerID: String, EmployeeID: String,
                  OrderDate: String, ShipCountry: String)

/*
Let's take one record from NW-Order-Details-NoHdr.csv and use it as a reference for creating the case class
10248,11,14,12,0
 */
case class OrderDetails(OrderID : String, ProductID : String, UnitPrice : Float,
                        Qty : Int, Discount : Float)

object NorthWindDataset {

  def main(args: Array[String]) {

    val sc = TejUtils.getSparkContext(this.getClass.getSimpleName)

    //Enable this to see the back-end processing stages.
    //Don't be put off by the logs: you will see code being generated for the SQL query!
    //That's how SQL queries are executed across the Spark nodes:
    //the Catalyst optimizer generates code for the given SQL query.

    //sc.setLogLevel("ALL")
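    //(Added note, not in the original post) Another way to peek at Catalyst's work is to call
    //explain on a DataFrame once one exists, e.g. result.explain(true), which prints the
    //parsed, analyzed, optimized and physical plans for the query.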

    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val nwEmployeeRecord = sc.textFile("data/northwind/NW-Employees-NoHdr.csv")
    println("NW-Employees-NoHdr.csv file has " + nwEmployeeRecord.count() + " lines")
    //NW-Employees-NoHdr.csv file has 9 lines

    //Single file, so there is no need for flatMap()
    val employees = nwEmployeeRecord.map(_.split(","))
    .map(e => Employee(e(0).trim.toInt, e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10)))
    .toDF //Converts the RDD to a DataFrame; toDF comes from the implicits import above
    employees.registerTempTable("Employees")
    println("Number of employees : " + employees.count()) //Number of employees : 9
    println("\n******************************************************************\n")

    //Run the SQL query to select all the employees from the table
    var result = sqlContext.sql("SELECT * FROM Employees")
    result.foreach(println)
    println("\n******************************************************************\n")
//    [1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2]
//    [6,Suyama,Michael,Sales Representative,6/30/63,10/15/93,London,,EC2 7JR,UK,5]
//    [7,King,Robert,Sales Representative,5/27/60,12/31/93,London,,RG1 9SP,UK,5]
//    [2,Davolio,Nancy,"Vice President, Sales",2/17/52,8/12/92,Tacoma,WA,98401,USA]
//    [8,Callahan,Laura,Inside Sales Coordinator,1/7/58,3/3/94,Seattle,WA,98105,USA,2]
//    [3,Leverling,Janet,Sales Representative,8/28/63,3/30/92,Kirkland,WA,98033,USA,2]
//    [9,Buchanan,Steven,Sales Representative,1/25/66,11/13/94,London,,WG2 7LT,UK,5]
//    [4,Peacock,Margaret,Sales Representative,9/17/37,5/1/93,Redmond,WA,98052,USA,2]
//    [5,Dodsworth,Anne,Sales Manager,3/2/55,10/15/93,London,,SW1 8JR,UK,2]


    //Select employees from 'WA' only
    result = sqlContext.sql(" SELECT * FROM Employees WHERE State = 'WA'")
    result.foreach(println)
    println("\n******************************************************************\n")
//     [1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2]
//    [8,Callahan,Laura,Inside Sales Coordinator,1/7/58,3/3/94,Seattle,WA,98105,USA,2]
//    [3,Leverling,Janet,Sales Representative,8/28/63,3/30/92,Kirkland,WA,98033,USA,2]
//    [4,Peacock,Margaret,Sales Representative,9/17/37,5/1/93,Redmond,WA,98052,USA,2]

    val orders = sc.textFile("data/northwind/NW-Orders-NoHdr.csv")
                   .map(_.split(","))
                   .map(o => Orders(o(0), o(1), o(2), o(3), o(4)))
                   .toDF
    orders.registerTempTable("Orders")
    println("Number of orders : " + orders.count) //Number of orders : 830
    println("\n******************************************************************\n")

    val orderDetails = sc.textFile("data/northwind/NW-Order-Details-NoHdr.csv")
                         .map(_.split(","))
                         .map(e => OrderDetails( e(0), e(1), e(2).trim.toFloat,e(3).trim.toInt, e(4).trim.toFloat ))
                         .toDF
    orderDetails.registerTempTable("OrderDetails")
    println("Number of OrderDetails : " + orderDetails.count)
    sqlContext.sql("SELECT * FROM OrderDetails").take(10).foreach(println)
    println("\n******************************************************************\n")
//    Number of OrderDetails : 2155
//      [10248,11,14.0,12,0.0]
//    [10248,42,9.8,10,0.0]
//    [10248,72,34.8,5,0.0]
//    [10249,14,18.6,9,0.0]
//    [10249,51,42.4,40,0.0]
//    [10250,41,7.7,10,0.0]
//    [10250,51,42.4,35,0.15]
//    [10250,65,16.8,15,0.15]
//    [10251,22,16.8,6,0.05]
//    [10251,57,15.6,15,0.05]

    result = sqlContext.sql("SELECT OrderDetails.OrderID,ShipCountry,UnitPrice,Qty,Discount FROM Orders " +
      "INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID")
    //result.take(10).foreach(println)
    result.take(10).foreach(e=>println("%s | %15s | %5.2f | %d | %5.2f |".format(e(0),e(1),e(2),e(3),e(4))))
    println("\n******************************************************************\n")
//      10315 |              UK | 11.20 | 14 |  0.00 |
//      10315 |              UK | 12.00 | 30 |  0.00 |
//      10766 |         Germany | 19.00 | 40 |  0.00 |
//      10766 |         Germany | 30.00 | 35 |  0.00 |
//      10766 |         Germany | 12.50 | 40 |  0.00 |
//      10810 |          Canada |  6.00 | 7 |  0.00 |
//      10810 |          Canada | 14.00 | 5 |  0.00 |
//      10810 |          Canada | 15.00 | 5 |  0.00 |
//      10928 |           Spain |  9.50 | 5 |  0.00 |
//      10928 |           Spain | 18.00 | 5 |  0.00 |

    result = sqlContext.sql("SELECT ShipCountry, Sum(OrderDetails.UnitPrice * Qty * Discount) AS ProductSales FROM " +
      "Orders INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID GROUP BY ShipCountry")
    result.take(10).foreach(println)
    // Alternative formattings that should also work:
    // println(result.take(30).mkString(" | "))
    // result.take(30).foreach(e => println(e.mkString(" | ")))
    result.take(30).foreach(e=>println("%15s | %9.2f |".format(e(0),e(1))))

//       Mexico  |    491.37 |
//      Denmark  |   2121.23 |
//      Portugal |    996.29 |
//      Poland   |      0.00 |
//      Norway   |      0.00 |
//      Canada   |   5137.81 |
//      UK       |   1645.20 |
//      Belgium  |   1310.13 |
//      Brazil   |   8029.76 |
//      Ireland  |   7337.49 |
//      USA      |  17982.37 |
//      Argentina |      0.00 |
//      Sweden   |   5028.56 |
//      France   |   4140.44 |
//      Germany  |  14356.00 |
//      Spain    |   1448.69 |
//      Finland  |    968.40 |
//      Venezuela |   4004.26 |
//      Austria  |  11492.79 |
//      Switzerland |   1226.84 |
//      04876-786' |     12.94 |
//      Italy    |    935.00 |


  }
}

!-End-!

Wednesday 25 November 2015

L-BFGS in Spark

package org.apache.spark.mllib.optimization

LBFGS

Study materials:


Mathematics:

Aja Notes

Jacobian Matrix -> Hessian Matrix -> BFGS -> L-BFGS
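
For reference, the connection between these (a quick summary, not from the original post): BFGS maintains an approximation B_k of the Hessian that is updated from successive gradient differences, and L-BFGS keeps only the last m update pairs (s_k, y_k) instead of the full matrix:

\[
B_{k+1} = B_k + \frac{y_k y_k^\top}{y_k^\top s_k} - \frac{B_k s_k s_k^\top B_k}{s_k^\top B_k s_k},
\qquad s_k = x_{k+1} - x_k, \quad y_k = \nabla f(x_{k+1}) - \nabla f(x_k)
\]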


The LBFGS class uses CachedDiffFunction.

CachedDiffFunction vs DiffFunction
  • DiffFunctions per se don't do any caching, whereas CachedDiffFunction is a simple wrapper around any DiffFunction that caches the last input and its result.
  • If the CachedDiffFunction gets called with the same input again, it returns the result of the previous evaluation of the inner DiffFunction instead of re-evaluating it.
  • The various Minimizer implementations in particular can call a DiffFunction multiple times with the same input, so CachedDiffFunction helps a lot when evaluating your goal function is expensive; see the sketch below.
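
A minimal sketch of this pattern, using breeze directly (the optimization library that MLlib's LBFGS builds on); the quadratic objective and all names here are only an illustration, not code from Spark:

import breeze.linalg.DenseVector
import breeze.optimize.{CachedDiffFunction, DiffFunction, LBFGS}

object CachedDiffFunctionSketch extends App {

  // A toy objective: f(x) = ||x - 3||^2 with gradient 2 * (x - 3)
  val f = new DiffFunction[DenseVector[Double]] {
    def calculate(x: DenseVector[Double]): (Double, DenseVector[Double]) = {
      val diff = x - DenseVector.fill(x.length)(3.0)
      (diff dot diff, diff * 2.0)
    }
  }

  // Wrap it so repeated evaluations at the same point reuse the cached (value, gradient) pair
  val cachedF = new CachedDiffFunction(f)

  val lbfgs = new LBFGS[DenseVector[Double]](maxIter = 100, m = 10)
  val optimum = lbfgs.minimize(cachedF, DenseVector.zeros[Double](3))

  println(optimum) // should be close to DenseVector(3.0, 3.0, 3.0)
}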

Tuesday 24 November 2015

Spark HashPartitioner Example

For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/

package org.aja.tej.examples.spark.rdd

import org.aja.tej.utils.TejUtils
import org.apache.spark.{HashPartitioner, SparkContext}

/**
 * Created by mageswaran on 21/11/15.
 */
/*
An RDD is distributed, which means it is split into some number of partitions, and each of
these partitions may live on a different machine. A HashPartitioner with argument
numPartitions chooses the partition for a (key, value) pair in the following way:
1. It creates exactly numPartitions partitions.
2. It places (key, value) in the partition numbered Hash(key) % numPartitions.

 */
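//(Added note, not from the original post) The placement rule can be checked directly with
//HashPartitioner.getPartition, which returns a non-negative key.hashCode % numPartitions:
//  val p = new HashPartitioner(2)
//  (1 to 3).map(k => (k, p.getPartition(k)))   // Vector((1,1), (2,0), (3,1))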
object HashPartitionerExample extends App {

  def useCases(sc: SparkContext): Unit = {

    println(this.getClass.getSimpleName) //HashPartitionerExample$

    val rddData = sc.parallelize(for {
      x <- 1 to 3
      y <- 1 to 2
    } yield (x, None), 8) //(key, value) with 8 partitions

    println("rddData partitons: " + rddData.partitions.length) //rdd partitons: 8
    println("rddData contents: " + rddData.collect.foreach(print)) // (1,None)(1,None)(2,None)(2,None)(3,None)(3,None)
    println("rddData partitioner: " + rddData.partitioner) //rddData partitioner: None


    //mapPartitions U -> T of iterator
    val numElementsPerPartition = rddData.mapPartitions(iter => Iterator(iter.length))

    println("numElementsPerPartition : " + numElementsPerPartition.collect().foreach(print)) //0 1 1 1 0 1 1 1
    //so 2 of the 8 partitions are empty

    println("----------------------------------------------------------")

    val rddDataWith1Partition = rddData.partitionBy(new HashPartitioner(1))
    println("rddDataWith1Partition partitons: " + rddDataWith1Partition.partitions.length) //rddDataWith1Partition partitons: 1
    println("rddDataWith1Partition contents: " + rddDataWith1Partition.collect.foreach(print))
    //(1,None)(1,None)(2,None)(2,None)(3,None)(3,None)rddDataWith1Partition conten
    //Since we have only one partition it contains all elements

    println("numElementsPerPartition : " +
      rddDataWith1Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)) //6

    println("----------------------------------------------------------")

    val rddDataWith2Partition = rddData.partitionBy(new HashPartitioner(2))
    println("rddDataWith2Partition partitons: " + rddDataWith2Partition.partitions.length) //rddDataWith1Partition partitons: 1
    println("rddDataWith2Partition contents: " + rddDataWith2Partition.collect.foreach(print))
    //(2,None)(2,None)(1,None)(1,None)(3,None)(3,None)
    //i.e. partitionBy -> ShuffledRDD -> the partition for the current element is selected by (k.hashCode % numPartitions)

    println("numElementsPerPartition : " +
      rddDataWith2Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)) //2 4
    //Since the rdd is partitioned by key, the data won't be distributed uniformly anymore.
    //Let's see why:
    //we have three keys but only two distinct values of hashCode mod numPartitions (2), so there is nothing unexpected here
    println("(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) : " + (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)))
    //Vector((1,1,1), (2,2,0), (3,3,1))

    //Let's confirm this in our rdd
    println("Keys in each partition" + rddDataWith2Partition.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect.foreach(println))
    //Set(2)
    //Set(1,3)

    println("----------------------------------------------------------")

    val rddDataWith10Partition = rddData.partitionBy(new HashPartitioner(10))
    println("rddDataWith10Partition partitons: " + rddDataWith10Partition.partitions.length) //rddDataWith1Partition partitons: 10
    println("rddDataWith2Partition contents: " + rddDataWith10Partition.collect.foreach(print))
    //(1,None)(1,None)(2,None)(2,None)(3,None)(3,None)
    //i.e. partitionBy -> ShuffledRDD -> the partition for the current element is selected by (k.hashCode % numPartitions)

    println("numElementsPerPartition : " +
      rddDataWith10Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println))
    //0   2     2    2    0    0    0    0    0    0

    println("(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 10)) : " + (1 to 10).map((k: Int) => (k, k.hashCode, k.hashCode % 10)))
    // Vector((1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5), (6,6,6), (7,7,7), (8,8,8), (9,9,9), (10,10,0))

    //Let's confirm this in our rdd
    println("Keys in each partition" + rddDataWith10Partition.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect.foreach(println))
    //Set()
    //Set(1)
    //Set(2)
    //Set(3)
    //Set()
    //Set()
    //Set()
    //Set()
    //Set()
    //Set()
  }

  useCases(TejUtils.getSparkContext(this.getClass.getSimpleName))
}


Scala XML Preview

For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/

The following is an extract of a Scala worksheet from the Aja git repo!

import scala.xml._

val someXMLString = """
<Aja>
<Topics> Scala Spark NeuralNetwork </Topics>
<Examples>
<example>Tej</example>
<example>Tantra</example>
<example>Dhira</example>
</Examples>
</Aja>
"""
//| someXMLString: String =
//| <Aja>
//| <Topics> Scala Spark NeuralNetwork </Topics>
//| <Examples>
//| <example>Tej</example>
//| <example>Tantra</example>
//| <example>Dhira</example>
//| </Examples>
//| </Aja>

val someXML = XML.loadString(someXMLString)
//| someXML: scala.xml.Elem = <Aja>
//| <Topics> Scala Spark NeuralNetwork </Topics>
//| <Examples>
//| <example>Tej</example>
//| <example>Tantra</example>
//| <example>Dhira</example>
//| </Examples>
//| </Aja>

someXML.getClass
//| res0: Class[?0] = class scala.xml.Elem

val someXML1 =
<Aja>
<Topics> Scala Spark NeuralNetwork </Topics>
<Examples>
<example>Tej</example>
<example>Tantra</example>
<example>Dhira</example>
</Examples>
</Aja>
//| someXML1: scala.xml.Elem = <Aja>
//| <Topics> Scala Spark NeuralNetwork </Topics>
//| <Examples>
//| <example>Tej</example>
//| <example>Tantra</example>
//| <example>Dhira</example>
//| </Examples>
//| </Aja>

someXML1.getClass
//| res1: Class[?0] = class scala.xml.Elem

println("//////////////////////////////////////////////////////")
//| //////////////////////////////////////////////////////
//| res2: Unit = ()

someXML \ "Topics"
//| res3: scala.xml.NodeSeq = <Topics> Scala Spark NeuralNetwork </Topics>

(someXML \ "Topics").text
//| res4: String =  Scala Spark NeuralNetwork 

someXML \ "Examples"
//| res5: scala.xml.NodeSeq = <Examples>
//|   <example>Tej</example>
//|   <example>Tantra</example>
//|   <example>Dhira</example>
//|   </Examples>

someXML \ "example" //no child elements
//| res6: scala.xml.NodeSeq = 

someXML \\ "example"
//| res7: scala.xml.NodeSeq = 


Spark GroupBy Explained

For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/

Spark Artificial Neural Network Example

For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/