combineByKey[Pair]
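Combines the values of an RDD of two-component (key, value) tuples per key by applying three aggregator functions one after another: createCombiner, mergeValue and mergeCombiners. It can be very efficient because values are combined within each partition before they are shuffled.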
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]
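The combiner type C in these signatures does not have to be the same as the value type V. As an illustration that is not part of the original example, here is a plain-Scala sketch (no Spark needed) of the three functions for a per-key average, where V = Int and C = (sum, count). The input values and the helper combineLocal are hypothetical.

```scala
// V = Int, C = (sum, count): a hypothetical per-key average.
val createCombiner: Int => (Int, Int) = v => (v, 1)
val mergeValue: ((Int, Int), Int) => (Int, Int) = (c, v) => (c._1 + v, c._2 + 1)
val mergeCombiners: ((Int, Int), (Int, Int)) => (Int, Int) =
  (c1, c2) => (c1._1 + c2._1, c1._2 + c2._2)

// Hypothetical helper: combine the values of one key inside a single partition.
def combineLocal(values: List[Int]): (Int, Int) =
  values.tail.foldLeft(createCombiner(values.head))(mergeValue)

// Hypothetical values of one key, spread over two partitions.
val fromPartition1 = combineLocal(List(3, 5)) // (8, 2)
val fromPartition2 = combineLocal(List(10))   // (10, 1)

// Merge the per-partition results, then turn (sum, count) into an average.
val (sum, count) = mergeCombiners(fromPartition1, fromPartition2)
val average = sum.toDouble / count            // 6.0
println(s"sum=$sum count=$count average=$average")
```

On a real pair RDD the same three functions would be passed to combineByKey, and a mapValues step could then divide sum by count for each key.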
Example
scala> val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24
scala> val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
Now we are going to use the zip method to combine those two RDDs into an RDD of tuples.
scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[9] at zip at <console>:27
scala> c.collect
res10: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))
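The partition boundaries matter for the explanation below. As a rough local sketch (plain Scala, no Spark), the code mimics b.zip(a) with List.zip and splits the nine pairs into three slices using the position arithmetic that, as far as we know, parallelize applies to an indexed collection. The helper slicePositions is hypothetical. RDD.zip itself requires both RDDs to have the same number of partitions and the same number of elements in each partition, which holds here because a and b both have nine elements in three partitions.

```scala
// Local stand-ins for the two RDDs from the example.
val a = List("dog", "cat", "gnu", "salmon", "rabbit", "turkey", "wolf", "bear", "bee")
val b = List(1, 1, 2, 2, 2, 1, 2, 2, 2)

// List.zip pairs elements by position, like b.zip(a) does for the RDDs.
val pairs: List[(Int, String)] = b.zip(a)

// Hypothetical helper: slice i covers positions [i * n / k, (i + 1) * n / k).
// Assumed to match how parallelize slices a Seq into numSlices parts.
def slicePositions(length: Int, numSlices: Int): Seq[(Int, Int)] =
  (0 until numSlices).map { i =>
    val start = ((i.toLong * length) / numSlices).toInt
    val end = (((i + 1).toLong * length) / numSlices).toInt
    (start, end)
  }

val partitions: Seq[List[(Int, String)]] =
  slicePositions(pairs.length, 3).map { case (start, end) => pairs.slice(start, end) }

partitions.zipWithIndex.foreach { case (p, i) => println(s"partition ${i + 1}: $p") }
```

With nine pairs and three slices, each partition gets three consecutive pairs: (1,dog), (1,cat), (2,gnu) in the first; (2,salmon), (2,rabbit), (1,turkey) in the second; and (2,wolf), (2,bear), (2,bee) in the third.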
scala> val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[10] at combineByKey at <console>:25
Explanation:
List(_) => createCombiner implementation (from the combineByKey method signature) => This turns a value into a one-element list. It is called for the first value of a key seen in a partition and starts the list of values for that key.
Example:
(1,dog) gives List(dog) as the response.
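In plain Scala, the placeholder syntax List(_) is shorthand for the function x => List(x). A small sketch, outside Spark, of what createCombiner returns for the first value of each key in the first partition:

```scala
// createCombiner from the example: List(_) expands to x => List(x).
val createCombiner: String => List[String] = List(_)

// First value of key 1 and of key 2 in the first partition.
val key1Start = createCombiner("dog")
val key2Start = createCombiner("gnu")

println(key1Start) // List(dog)
println(key2Start) // List(gnu)
```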
=================================================================
(x:List[String], y:String) => y :: x => mergeValue implementation (from the combineByKey method signature) => This takes the combiner built so far for a key (at first, the createCombiner response) and the next value of the same key in the same partition, and prepends that value to the list.
:: prepends an element to a list. Example: 1 :: List(2,3) = List(1,2,3)
Example:
createCombiner value of key 1 = List(dog)
Next value of the same key (1) = cat
So by applying the above mergeValue logic, we get the response: List(cat,dog)
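The same step can be replayed locally in plain Scala. This sketch applies createCombiner and then mergeValue by hand for key 1, and then uses foldLeft to show how all values of one key inside one partition are combined. The helper combineInPartition is hypothetical and assumes a non-empty list of values.

```scala
val createCombiner: String => List[String] = List(_)
val mergeValue: (List[String], String) => List[String] = (x, y) => y :: x

// Values of key 1 in the first partition, in order: dog, then cat.
val afterDog = createCombiner("dog")       // List(dog)
val afterCat = mergeValue(afterDog, "cat") // List(cat, dog)

// Hypothetical helper: the first value goes through createCombiner,
// every following value of the same key goes through mergeValue.
def combineInPartition(values: List[String]): List[String] =
  values.tail.foldLeft(createCombiner(values.head))(mergeValue)

// Key 2 in the third partition: wolf, bear, bee.
println(combineInPartition(List("wolf", "bear", "bee"))) // List(bee, bear, wolf)
```

Because mergeValue prepends, the list for each partition ends up in reverse order of arrival.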
===================================================================
(x:List[String], y:List[String]) => x ::: y => mergeCombiners implementation (from the combineByKey method signature) => This merges the combiners that were built separately in different partitions for the same key.
Example:
Say dog and cat (giving List(cat,dog)) are from Partition 1,
and turkey (giving List(turkey)) is from Partition 2. Then by applying mergeCombiners:
List(cat,dog) ::: List(turkey) = List(cat, dog, turkey)
::: concatenates two lists (it prepends the left list to the right one). Example: List(1,2) ::: List(3,4) = List(1,2,3,4)
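A plain-Scala sketch of the merge step for both keys. The partial lists below are the per-partition results from the example; when more than two partitions contribute to a key, the partial results are merged pairwise, shown here with reduce in partition order (Spark itself does not promise that order).

```scala
val mergeCombiners: (List[String], List[String]) => List[String] = (x, y) => x ::: y

// Key 1: Partition 1 produced List(cat, dog), Partition 2 produced List(turkey).
val key1 = mergeCombiners(List("cat", "dog"), List("turkey"))
println(key1) // List(cat, dog, turkey)

// Key 2 appears in all three partitions.
val key2 = List(List("gnu"), List("rabbit", "salmon"), List("bee", "bear", "wolf"))
  .reduce(mergeCombiners)
println(key2) // List(gnu, rabbit, salmon, bee, bear, wolf)
```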
scala> d.collect
res11: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
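Note: createCombiner and mergeValue act within each partition (on the map side, before the shuffle; this is the default, mapSideCombine = true).
mergeCombiners acts on the reduce side, after the shuffle. The order in which partial results from different partitions are merged is not guaranteed, so the order of elements inside each list can differ from run to run.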
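To tie the two phases together, here is a local, Spark-free sketch of the whole flow: a map-side pass that builds one combiner per key per partition, followed by a reduce-side pass that merges those partial combiners per key. The function localCombineByKey is hypothetical and only mimics the behaviour described above; it merges partitions in order, which Spark does not guarantee. The three partitions are the ones produced by parallelize(..., 3) in the example.

```scala
// Hypothetical local model of combineByKey with map-side combining.
def localCombineByKey[K, V, C](
    partitions: Seq[Seq[(K, V)]],
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C): Map[K, C] = {
  // Map side: createCombiner for the first value of a key, mergeValue afterwards.
  val perPartition: Seq[Map[K, C]] = partitions.map { part =>
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      acc.get(k) match {
        case Some(c) => acc.updated(k, mergeValue(c, v))
        case None    => acc.updated(k, createCombiner(v))
      }
    }
  }
  // Reduce side: merge the partial combiners of each key with mergeCombiners.
  perPartition.foldLeft(Map.empty[K, C]) { (acc, partial) =>
    partial.foldLeft(acc) { case (m, (k, c)) =>
      m.updated(k, m.get(k).map(mergeCombiners(_, c)).getOrElse(c))
    }
  }
}

val partitions = Seq(
  Seq((1, "dog"), (1, "cat"), (2, "gnu")),
  Seq((2, "salmon"), (2, "rabbit"), (1, "turkey")),
  Seq((2, "wolf"), (2, "bear"), (2, "bee")))

val d = localCombineByKey[Int, String, List[String]](
  partitions,
  List(_),
  (x, y) => y :: x,
  (x, y) => x ::: y)

println(d)
```

Merging in partition order happens to reproduce the d.collect output shown above: List(cat, dog, turkey) for key 1 and List(gnu, rabbit, salmon, bee, bear, wolf) for key 2.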