Example usage of DataFRames in exploring NorthWind dataset!
For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/
!-End-!
For latest updates check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/
package org.aja.tej.examples.sparksql import org.aja.tej.utils.TejUtils import org.apache.spark.sql.SQLContext /** * Created by mageswaran on 18/10/15. * * Dataset: Url: https://northwinddatabase.codeplex.com/releases/view/71634 */ /* Lets take one record from the NW-Employees-NoHdr.csv and refer it for creating case calss 1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2 Note: case classes will have "extractors" created by the compiler, which is very usefull in the match...case{} */ case class Employee(EmployeeId: Int, LastName : String, FirstName : String, Title : String, BirthDate : String, HireDate : String, City : String, State : String, Zip : String, Country : String, ReportsTo : String) /* Lets take one record from NW-Orders-NoHdr.csv and refer it for creating case calss 10248,VINET,5,7/2/96,France */ case class Orders(OrderID: String, CustomerID: String, EmployeeID: String, OrderData: String, ShipCountry: String) /* Lets take one record from NW-Order-Details-NoHdr.csv and refer it for creating case calss 10248,11,14,12,0 */ case class OrderDetails(OrderID : String, ProductID : String, UnitPrice : Float, Qty : Int, Discount : Float) object NorthWindDataset { def main(args: Array[String]) { val sc = TejUtils.getSparkContext(this.getClass.getSimpleName) //Enable this to know the back end processing stages //Don't be afraid of the logs, you would see some code been generated for the SQL query! //That's how SQL queries are been executed across the Spark nodes. //Arbitrary code generation for given SQL query based on Spark Catalyst optimizer. //sc.setLogLevel("ALL") val sqlContext = new SQLContext(sc) import sqlContext.implicits._ val nwEmployeeRecord = sc.textFile("data/northwind/NW-Employees-NoHdr.csv") println("NW-Employees-NoHdr.csv file has " + nwEmployeeRecord.count() + " lines") //NW-Employees-NoHdr.csv file has 9 lines //single file no need to use flatMap() val employees = nwEmployeeRecord.map(_.split(",")) .map(e => Employee(e(0).trim.toInt, e(1), e(2), e(3), e(4), e(5), e(6), e(7), e(8), e(9), e(10))) .toDF //Converts the RDD to DataFrame, which comes from last import employees.registerTempTable("Employees") println("Number of employees : " + employees.count()) //Number of employees : 9 println("\n******************************************************************\n") //Run the SQL query to select all the employees from the table var result = sqlContext.sql("SELECT * FROM Employees") result.foreach(println) println("\n******************************************************************\n") // [1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2] // [6,Suyama,Michael,Sales Representative,6/30/63,10/15/93,London,,EC2 7JR,UK,5] // [7,King,Robert,Sales Representative,5/27/60,12/31/93,London,,RG1 9SP,UK,5] // [2,Davolio,Nancy,"Vice President, Sales",2/17/52,8/12/92,Tacoma,WA,98401,USA] // [8,Callahan,Laura,Inside Sales Coordinator,1/7/58,3/3/94,Seattle,WA,98105,USA,2] // [3,Leverling,Janet,Sales Representative,8/28/63,3/30/92,Kirkland,WA,98033,USA,2] // [9,Buchanan,Steven,Sales Representative,1/25/66,11/13/94,London,,WG2 7LT,UK,5] // [4,Peacock,Margaret,Sales Representative,9/17/37,5/1/93,Redmond,WA,98052,USA,2] // [5,Dodsworth,Anne,Sales Manager,3/2/55,10/15/93,London,,SW1 8JR,UK,2] //Select employees from 'WA' only result = sqlContext.sql(" SELECT * FROM Employees WHERE State = 'WA'") result.foreach(println) println("\n******************************************************************\n") // [1,Fuller,Andrew,Sales Representative,12/6/48,4/29/92,Seattle,WA,98122,USA,2] // [8,Callahan,Laura,Inside Sales Coordinator,1/7/58,3/3/94,Seattle,WA,98105,USA,2] // [3,Leverling,Janet,Sales Representative,8/28/63,3/30/92,Kirkland,WA,98033,USA,2] // [4,Peacock,Margaret,Sales Representative,9/17/37,5/1/93,Redmond,WA,98052,USA,2] val orders = sc.textFile("data/northwind/NW-Orders-NoHdr.csv") .map(_.split(",")) .map(o => Orders(o(0), o(1), o(2), o(3), o(4))) .toDF orders.registerTempTable("Orders") println("Number of orders : " + orders.count) //Number of orders : 830 println("\n******************************************************************\n") val orderDetails = sc.textFile("data/northwind/NW-Order-Details-NoHdr.csv") .map(_.split(",")) .map(e => OrderDetails( e(0), e(1), e(2).trim.toFloat,e(3).trim.toInt, e(4).trim.toFloat )) .toDF orderDetails.registerTempTable("OrderDetails") println("Number of OrderDetails : " + orderDetails.count) sqlContext.sql("SELECT * FROM OrderDetails").take(10).foreach(println) println("\n******************************************************************\n") // Number of OrderDetails : 2155 // [10248,11,14.0,12,0.0] // [10248,42,9.8,10,0.0] // [10248,72,34.8,5,0.0] // [10249,14,18.6,9,0.0] // [10249,51,42.4,40,0.0] // [10250,41,7.7,10,0.0] // [10250,51,42.4,35,0.15] // [10250,65,16.8,15,0.15] // [10251,22,16.8,6,0.05] // [10251,57,15.6,15,0.05] result = sqlContext.sql("SELECT OrderDetails.OrderID,ShipCountry,UnitPrice,Qty,Discount FROM Orders " + "INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID") //result.take(10).foreach(println) result.take(10).foreach(e=>println("%s | %15s | %5.2f | %d | %5.2f |".format(e(0),e(1),e(2),e(3),e(4)))) println("\n******************************************************************\n") // 10315 | UK | 11.20 | 14 | 0.00 | // 10315 | UK | 12.00 | 30 | 0.00 | // 10766 | Germany | 19.00 | 40 | 0.00 | // 10766 | Germany | 30.00 | 35 | 0.00 | // 10766 | Germany | 12.50 | 40 | 0.00 | // 10810 | Canada | 6.00 | 7 | 0.00 | // 10810 | Canada | 14.00 | 5 | 0.00 | // 10810 | Canada | 15.00 | 5 | 0.00 | // 10928 | Spain | 9.50 | 5 | 0.00 | // 10928 | Spain | 18.00 | 5 | 0.00 | result = sqlContext.sql("SELECT ShipCountry, Sum(OrderDetails.UnitPrice * Qty * Discount) AS ProductSales FROM " + "Orders INNER JOIN OrderDetails ON Orders.OrderID = OrderDetails.OrderID GROUP BY ShipCountry") result.take(10).foreach(println) // Need to try this // println(result.take(30).mkString(" | ")) // result.take(30).foreach(e -> ... mkString(" | ") // probably this would work result.take(30).foreach(e=>println("%15s | %9.2f |".format(e(0),e(1)))) // Mexico | 491.37 | // Denmark | 2121.23 | // Portugal | 996.29 | // Poland | 0.00 | // Norway | 0.00 | // Canada | 5137.81 | // UK | 1645.20 | // Belgium | 1310.13 | // Brazil | 8029.76 | // Ireland | 7337.49 | // USA | 17982.37 | // Argentina | 0.00 | // Sweden | 5028.56 | // France | 4140.44 | // Germany | 14356.00 | // Spain | 1448.69 | // Finland | 968.40 | // Venezuela | 4004.26 | // Austria | 11492.79 | // Switzerland | 1226.84 | // 04876-786' | 12.94 | // Italy | 935.00 | } }
!-End-!