Tuesday, 24 November 2015

Spark HashPartitioner Example

For the latest updates, check out my Apache Spark exploration @ https://github.com/Mageswaran1989/aja/

package org.aja.tej.examples.spark.rdd

import org.aja.tej.utils.TejUtils
import org.apache.spark.{HashPartitioner, SparkContext}

/**
 * Created by mageswaran on 21/11/15.
 */
/*
An RDD is distributed, meaning it is split into some number of partitions, each of
which potentially lives on a different machine. A HashPartitioner constructed with
numPartitions places a (key, value) pair as follows:
1. It creates exactly numPartitions partitions.
2. It places (key, value) in the partition numbered Hash(key) % numPartitions.
 */
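
// A minimal sketch (an assumption for illustration, not Spark's exact source) of how
// a hash partitioner maps a key to a partition index. Spark additionally keeps the
// index non-negative for keys whose hashCode is negative, roughly like this:
object HashPartitionSketch {
  def getPartition(key: Any, numPartitions: Int): Int = {
    val rawMod = key.hashCode % numPartitions
    rawMod + (if (rawMod < 0) numPartitions else 0) // shift negative results into [0, numPartitions)
  }
}
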
object HashPartitionerExample extends App {

  def useCases(sc: SparkContext): Unit = {

    println(this.getClass.getSimpleName) //HashPartitionerExample$

    val rddData = sc.parallelize(for {
      x <- 1 to 3
      y <- 1 to 2
    } yield (x, None), 8) //(key, value) with 8 partitions

    println("rddData partitons: " + rddData.partitions.length) //rdd partitons: 8
    println("rddData contents: " + rddData.collect.foreach(print)) // (1,None)(1,None)(2,None)(2,None)(3,None)(3,None)
    println("rddData partitioner: " + rddData.partitioner) //rddData partitioner: None


    //mapPartitions: transform each partition's Iterator[T] into an Iterator[U]; here we emit the element count
    val numElementsPerPartition = rddData.mapPartitions(iter => Iterator(iter.length))

    println("numElementsPerPartition : " + numElementsPerPartition.collect().foreach(print)) //0 1 1 1 0 1 1 1
    //so 2 of 8 partition is empty
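
    // To see exactly which elements landed in which partition, glom() (a standard
    // RDD method) turns each partition into an array; a small sketch:
    rddData.glom().collect().zipWithIndex.foreach { case (elems, idx) =>
      println("partition " + idx + ": " + elems.mkString(","))
    }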

    println("----------------------------------------------------------")

    val rddDataWith1Partition = rddData.partitionBy(new HashPartitioner(1))
    println("rddDataWith1Partition partitons: " + rddDataWith1Partition.partitions.length) //rddDataWith1Partition partitons: 1
    println("rddDataWith1Partition contents: " + rddDataWith1Partition.collect.foreach(print))
    //(1,None)(1,None)(2,None)(2,None)(3,None)(3,None)rddDataWith1Partition conten
    //Since we have only one partition it contains all elements

    println("numElementsPerPartition : " +
      rddDataWith1Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)) //6
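
    // After partitionBy, the RDD remembers its partitioner, which later stages
    // (e.g. joins or reduceByKey on the same keys) can exploit to avoid a shuffle.
    // (HashPartitioner does not override toString, so the printed form below is
    // only indicative.)
    println("rddDataWith1Partition partitioner: " + rddDataWith1Partition.partitioner)
    //Some(org.apache.spark.HashPartitioner@...)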

    println("----------------------------------------------------------")

    val rddDataWith2Partition = rddData.partitionBy(new HashPartitioner(2))
    println("rddDataWith2Partition partitons: " + rddDataWith2Partition.partitions.length) //rddDataWith1Partition partitons: 1
    println("rddDataWith2Partition contents: " + rddDataWith2Partition.collect.foreach(print))
    //(2,None)(2,None)(1,None)(1,None)(3,None)(3,None)
    ////i.e partitionBy -> ShuffleRDD -> partition for current element is selected by (k.hashcode % numPartition)

    println("numElementsPerPartition : " +
      rddDataWith2Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println)) //2 4
    //Since rdd is partitioned by key data won't be distributed uniformly anymore:
    //Lets see why?
    //Because with have three keys and only two different values of hashCode mod numPartitions(2) there is nothing unexpected
    println("(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)) : " + (1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2)))
    //Vector((1,1,1), (2,2,0), (3,3,1))

    //Let's confirm this in our rdd
    println("Keys in each partition: " + rddDataWith2Partition.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect.mkString(" "))
    //Set(2) Set(1, 3)

    println("----------------------------------------------------------")

    val rddDataWith10Partition = rddData.partitionBy(new HashPartitioner(10))
    println("rddDataWith10Partition partitons: " + rddDataWith10Partition.partitions.length) //rddDataWith1Partition partitons: 10
    println("rddDataWith2Partition contents: " + rddDataWith10Partition.collect.foreach(print))
    //(1,None)(1,None)(2,None)(2,None)(3,None)(3,None)
    ////i.e partitionBy -> ShuffleRDD -> partition for current element is selected by (k.hashcode % numPartition)

    println("numElementsPerPartition : " +
      rddDataWith10Partition.mapPartitions(iter => Iterator(iter.length)).collect().foreach(println))
    //0   2     2    2    0    0    0    0    0    0

    println("(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 10)) : " + (1 to 10).map((k: Int) => (k, k.hashCode, k.hashCode % 10)))
    // Vector((1,1,1), (2,2,2), (3,3,3), (4,4,4), (5,5,5), (6,6,6), (7,7,7), (8,8,8), (9,9,9), (10,10,0))

    //Let's confirm this in our rdd
    println("Keys in each partition: " + rddDataWith10Partition.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect.mkString(" "))
    //Set() Set(1) Set(2) Set(3) Set() Set() Set() Set() Set() Set()
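
    // Note that distinct keys can still collide: with 10 partitions, the
    // (hypothetical, not in the data above) keys 3 and 13 map to the same
    // partition index, since 13.hashCode % 10 == 3:
    println(Seq(3, 13).map(k => (k, k.hashCode % 10))) //List((3,3), (13,3))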
  }

  useCases(TejUtils.getSparkContext(this.getClass.getSimpleName))
}

