Thursday, March 19, 2020

Spark combineByKey[Pair] Transformation


combineByKey[Pair]
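A very efficient transformation that combines the values for each key of an RDD of two-component (key, value) tuples by applying three user-supplied functions one after another: createCombiner, mergeValue and mergeCombiners. The combined type C can differ from the value type V.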

Listing Variants

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)]
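To see why C is a separate type parameter, here is a minimal sketch in plain Scala (no Spark) that models the three-function contract on in-memory "partitions" and computes a per-key average with C = (sum, count). The helper combineByKeyLocal and the sample data are hypothetical and only mirror the semantics; they are not part of Spark's API.

```scala
// Hypothetical local model of combineByKey: each inner Seq stands in for one partition.
// Plain Scala only; it mirrors the createCombiner / mergeValue / mergeCombiners contract.
def combineByKeyLocal[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Map side: build one combiner per key inside each partition.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))   // key already seen in this partition
        case None    => acc.updated(k, createCombiner(v))  // first value of the key here
      }
    }
  }
  // Reduce side: merge the per-partition combiners that share a key.
  perPartition.foldLeft(Map.empty[K, C]) { (acc, m) =>
    m.foldLeft(acc) { case (a, (k, c)) =>
      a.updated(k, a.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
    }
  }
}

// Hypothetical data: Int values V, combined into C = (sum, count).
val scores = Seq(
  Seq(("a", 1), ("b", 4), ("a", 3)), // "partition" 0
  Seq(("b", 6), ("a", 5))            // "partition" 1
)
val sumCount = combineByKeyLocal[String, Int, (Int, Int)](
  scores,
  v => (v, 1),                              // createCombiner
  (c, v) => (c._1 + v, c._2 + 1),           // mergeValue
  (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)) // mergeCombiners
val averages = sumCount.map { case (k, (s, n)) => k -> s.toDouble / n }
```

Here sumCount is Map(a -> (9,3), b -> (10,2)), so the averages are 3.0 for "a" and 5.0 for "b".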


Example


scala> val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

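Now we use the zip method to combine these two RDDs into an RDD of (key, value) tuples. zip pairs elements by position, so both RDDs must have the same number of partitions and the same number of elements in each partition.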

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[9] at zip at <console>:27

scala> c.collect
res10: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
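The positional pairing done by zip can be reproduced with plain Scala lists. This sketch assumes that parallelize splits these 9 elements into 3 partitions of 3 consecutive elements each, which grouped(3) imitates; the names keys, animals and partitions are illustrative only.

```scala
// Plain Scala lists standing in for RDDs b and a.
val keys    = List(1, 1, 2, 2, 2, 1, 2, 2, 2)
val animals = List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee")

// zip pairs the i-th key with the i-th animal.
val pairs = keys.zip(animals)

// Assumed partition layout: 3 consecutive elements per partition.
val partitions = pairs.grouped(3).toList
```

With this layout, partition 0 holds (1,dog), (1,cat), (2,gnu), partition 1 holds (2,salmon), (2,rabbit), (1,turkey), and partition 2 holds (2,wolf), (2,bear), (2,bee).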


scala> val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[10] at combineByKey at <console>:25

Explanation: 

List(_) => createCombiner implementation (from the combineByKey signature) => Called for the first value of a key in a partition, it wraps that value in a one-element list, which then collects the values of that key.
Example:
                 (1,dog) => List(dog)
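In isolation, the placeholder expression List(_) is just a function from a value to a one-element list. A small plain-Scala check, with the function given an explicit type for illustration:

```scala
// List(_) expands to v => List(v) when the expected type is String => List[String].
val createCombiner: String => List[String] = List(_)

// First value of key 1 in partition 0.
val firstCombiner = createCombiner("dog")
```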

=================================================================

(x:List[String], y:String) => y :: x   => mergeValue implementation (from the combineByKey signature) => Takes the list built by createCombiner and prepends each further value of the same key found in the same partition.

:: prepends an element to a list. Example: 1 :: List(2,3) = List(1,2,3)

Example:
   createCombiner value for key 1 = List(dog)
   Next value of the same key (1) = cat
   Applying the mergeValue logic above gives: List(cat,dog)
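Within one partition, createCombiner and mergeValue together behave like a left fold over a key's values. This plain-Scala sketch replays that fold for key 1 in partition 0 and for key 2 in partition 2; the value lists are taken from the partition layout of the example, and the fold itself is an illustration rather than Spark's internal code.

```scala
val createCombiner: String => List[String] = List(_)
val mergeValue: (List[String], String) => List[String] = (x, y) => y :: x

// Key 1 in partition 0 sees dog, then cat.
val key1Values = List("dog", "cat")
val key1Combiner = key1Values.tail.foldLeft(createCombiner(key1Values.head))(mergeValue)

// Key 2 in partition 2 sees wolf, bear, bee.
val key2Values = List("wolf", "bear", "bee")
val key2Combiner = key2Values.tail.foldLeft(createCombiner(key2Values.head))(mergeValue)
```

Because :: prepends, each combiner ends up in reverse arrival order: List(cat, dog) and List(bee, bear, wolf).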


===================================================================
(x:List[String], y:List[String]) => x ::: y => mergeCombiners implementation (from the combineByKey signature) => Combines the lists built for the same key in different partitions.

Example:
  Say dog, cat => List(cat,dog) come from Partition 1,
  and turkey => List(turkey) comes from Partition 2. Applying mergeCombiners:
  List(cat,dog) ::: List(turkey) = List(cat,dog,turkey)

::: concatenates two lists (it prepends the left list to the right one). Example: List(1,2) ::: List(3,4) = List(1,2,3,4)
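The merge step can be checked in plain Scala as well. This sketch assumes the per-partition combiners arrive in partition order; in Spark the order in which they reach the reducer is not guaranteed, so the element order may vary.

```scala
val mergeCombiners: (List[String], List[String]) => List[String] = (x, y) => x ::: y

// Key 1: combiners from partitions 0 and 1.
val key1 = mergeCombiners(List("cat", "dog"), List("turkey"))

// Key 2: combiners from all three partitions, merged pairwise from the left.
val key2 = List(List("gnu"), List("rabbit", "salmon"), List("bee", "bear", "wolf"))
  .reduce(mergeCombiners)
```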


scala> d.collect
res11: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))


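Note: createCombiner and mergeValue run inside each partition (on the map side, before the shuffle, with the default mapSideCombine = true).
          mergeCombiners runs on the reduce side, after the shuffle, to merge the combiners of each key coming from different partitions. Since the order in which partition results reach the reducer is not guaranteed, the order of elements inside each list can differ between runs.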
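The two phases can be replayed end to end in plain Scala to reproduce d.collect. This is a sketch, not Spark's implementation: it assumes the 3/3/3 partition layout of the example and merges combiners in partition order 0, 1, 2.

```scala
// The three functions from the example.
val createCombiner: String => List[String] = List(_)
val mergeValue: (List[String], String) => List[String] = (x, y) => y :: x
val mergeCombiners: (List[String], List[String]) => List[String] = (x, y) => x ::: y

// Assumed partition layout of the zipped RDD c.
val partitions = List(
  List((1, "dog"), (1, "cat"), (2, "gnu")),
  List((2, "salmon"), (2, "rabbit"), (1, "turkey")),
  List((2, "wolf"), (2, "bear"), (2, "bee")))

// Map side: one combiner per key within each partition.
val mapSide: List[Map[Int, List[String]]] = partitions.map { part =>
  part.foldLeft(Map.empty[Int, List[String]]) { case (acc, (k, v)) =>
    acc.get(k) match {
      case Some(c) => acc.updated(k, mergeValue(c, v))
      case None    => acc.updated(k, createCombiner(v))
    }
  }
}

// Reduce side: group combiners by key (partition order is kept) and merge them.
val reduceSide: Map[Int, List[String]] =
  mapSide.flatMap(_.toList).groupBy(_._1).map { case (k, kcs) =>
    k -> kcs.map(_._2).reduce(mergeCombiners)
  }
```

Under these assumptions reduceSide matches the d.collect output above: key 1 maps to List(cat, dog, turkey) and key 2 to List(gnu, rabbit, salmon, bee, bear, wolf).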




