Friday, March 27, 2020

Extending Spark API --- compute(split: Partition, context: TaskContext)

Motivation

Let’s say we have sales data from an online store. The data is in CSV format. It contains transactionId, customerId, itemId and itemValue. This model is represented as SalesRecord.
class SalesRecord(val transactionId: String,
                  val customerId: String,
                  val itemId: String,
                  val itemValue: Double) extends Comparable[SalesRecord] with Serializable {
  // compare by item value (one reasonable ordering) so the class actually satisfies Comparable
  override def compareTo(other: SalesRecord): Int = itemValue.compareTo(other.itemValue)
}
So whenever we get sales data, we convert the raw data to RDD[SalesRecord].
val sc = new SparkContext(args(0), "extendingspark")
val dataRDD = sc.textFile(args(1))
val salesRecordRDD = dataRDD.map(row => {
  val colValues = row.split(",")
  new SalesRecord(colValues(0), colValues(1),
    colValues(2), colValues(3).toDouble)
})
Let’s say we want to find out the total amount of sales. In Spark we can write:
salesRecordRDD.map(_.itemValue).sum
Though it’s concise, it’s not super readable. It would be nice to have:
salesRecordRDD.totalSales
In the above code, totalSales feels like a built-in Spark operator. Of course, Spark doesn’t know anything about our data or our data model. So how can we add our own custom operators on an RDD?

Adding custom operators to RDD

The following are the steps to add custom operators to an RDD.

Step 1: Define a utility class to hold the custom operators

The following code defines a utility class, CustomFunctions, which holds all the custom operators. We take a specific RDD, i.e. RDD[SalesRecord], so that these operators are only available on the sales record RDD.
import org.apache.spark.rdd.RDD

class CustomFunctions(rdd: RDD[SalesRecord]) {
  // total value of all sales in the RDD
  def totalSales = rdd.map(_.itemValue).sum
}

Step 2: Implicit conversion to add operators on the RDD

The following code defines an implicit function, addCustomFunctions, which adds all the custom functions defined in CustomFunctions to RDD[SalesRecord].
object CustomFunctions {
  implicit def addCustomFunctions(rdd: RDD[SalesRecord]) = new CustomFunctions(rdd)
}
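As a side note, on Scala 2.10 and later, Steps 1 and 2 can be collapsed into a single implicit class. Here is a minimal sketch (the object and class names are just illustrative):
import org.apache.spark.rdd.RDD

object SalesRDDSyntax {
  // illustrative name; bundles the wrapper class and the implicit conversion in one definition
  implicit class RichSalesRDD(rdd: RDD[SalesRecord]) {
    def totalSales: Double = rdd.map(_.itemValue).sum
  }
}
With import SalesRDDSyntax._ in scope, salesRecordRDD.totalSales works exactly as in Step 3 below.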

Step 3: Use the custom functions via the implicit import

The following code has access to the custom operator totalSales through the CustomFunctions._ import.
import CustomFunctions._
println(salesRecordRDD.totalSales)
With the above steps, you have defined a domain-specific operator on an RDD.

Creating custom RDD

In the earlier example, we implemented an action which results in a single value. But what about situations where we want to represent a lazily evaluated computation? For example, let’s say we want to give a discount on each sale in the RDD. These discounts are lazy in nature, so we need an RDD which can represent that laziness. In the following steps we are going to create an RDD called DiscountRDD which holds the discount calculation.

Step 1: Create DiscountRDD by extending RDD

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class DiscountRDD(prev: RDD[SalesRecord], discountPercentage: Double)
  extends RDD[SalesRecord](prev) {

  // override compute to apply the discount to each record of a partition
  override def compute(split: Partition, context: TaskContext): Iterator[SalesRecord] = {
    firstParent[SalesRecord].iterator(split, context).map { salesRecord =>
      val discount = salesRecord.itemValue * discountPercentage
      new SalesRecord(salesRecord.transactionId,
        salesRecord.customerId, salesRecord.itemId, discount)
    }
  }

  // reuse the parent RDD's partitions since we don't repartition
  override protected def getPartitions: Array[Partition] =
    firstParent[SalesRecord].partitions
}
In the above code, we created an RDD called DiscountRDD. It is an RDD derived by applying a discount to the sales RDD. When we extend RDD, we have to override two methods:
  • compute

This method computes the values for each partition of the RDD. In our code, we take the input sales records and output them with the discount specified by discountPercentage applied.
  • getPartitions

The getPartitions method allows the developer to specify the partitions for the new RDD. As we don’t change the partitioning in our example, we can just reuse the partitions of the parent RDD.

Step 2: Add a custom operator named discount

Using the same trick discussed earlier, we can add a custom operator called discount, which creates a DiscountRDD. The method goes into the CustomFunctions class from Step 1:
def discount(discountPercentage: Double) = new DiscountRDD(rdd, discountPercentage)
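For clarity, with this method added, the CustomFunctions class from Step 1 looks roughly like this (only discount is new):
import org.apache.spark.rdd.RDD

class CustomFunctions(rdd: RDD[SalesRecord]) {
  // total value of all sales in the RDD
  def totalSales = rdd.map(_.itemValue).sum

  // lazily derive a discounted RDD from the sales RDD
  def discount(discountPercentage: Double) = new DiscountRDD(rdd, discountPercentage)
}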

Step 3: Use discount via the implicit import

import CustomFunctions._
val discountRDD = salesRecordRDD.discount(0.1)
println(discountRDD.collect().toList)
So now you know how you can extend the Spark API for your own domain-specific use cases.

Wednesday, March 25, 2020

How to Change Hostname on Ubuntu 18.04

Display the Current Hostname

To view the current hostname, enter the following command:
hostnamectl


In this example, the hostnamectl output shows that the current hostname is set to ubuntu1804.localdomain.

Change the Hostname

The following steps outline how to change the hostname in Ubuntu 18.04.

1. Change the hostname using hostnamectl.

In Ubuntu 18.04 we can change the system hostname and related settings using the command hostnamectl.
For example, to change the system static hostname to nagaraju, you would use the following command:
sudo hostnamectl set-hostname nagaraju
The hostnamectl command does not produce output. On success, 0 is returned, a non-zero failure code otherwise.

2. Edit the /etc/hosts file.

Open the /etc/hosts file and change the old hostname to the new one.
/etc/hosts
127.0.0.1   localhost
127.0.0.1   nagaraju

# The following lines are desirable for IPv6 capable hosts
::1     localhost ip6-localhost ip6-loopback
ff02::1 ip6-allnodes
ff02::2 ip6-allrouters

Hadoop Multi Cluster Setup

To make the VirtualBox VM screen bigger, run:

sudo apt-get remove libcheese-gtk23
sudo apt-get install xserver-xorg-core
sudo apt-get install virtualbox-guest-x11
  1. Click Devices -> Insert Guest Additions CD image from the menu of the VM window
  2. Restart the VM

Spark Submit with Jar file Creation

build.sbt

name := "SparkNowTest"
version := "0.1"
scalaVersion := "2.12.9"
// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.4.2" % "provided"
import sbtassembly.MergeStrategy
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "hadoop", "yarn", "factories", "package-info.class")         => MergeStrategy.discard  case PathList("org", "apache", "hadoop", "yarn", "provider", "package-info.class")         => MergeStrategy.discard  case PathList("org", "apache", "hadoop", "util", "provider", "package-info.class")         => MergeStrategy.discard  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class")         => MergeStrategy.first  case PathList("org.slf4j", "impl", xs @ _*) => MergeStrategy.first  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}
plugins.sbt

addSbtPlugin("com.eed3si9n" %% "sbt-assembly" % "0.14.5")

Application Run:

spark-submit --class "com.nagaraju.ReadJSON" --master local[*] target/scala-2.12/SparkNowTest-assembly-0.1.jar
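For reference, the command above assumes a main class com.nagaraju.ReadJSON is packaged in the jar. A minimal sketch of such a class might look like the following (the JSON path is just a placeholder):
package com.nagaraju

import org.apache.spark.sql.SparkSession

object ReadJSON {
  def main(args: Array[String]): Unit = {
    // spark-sql is marked "provided", so the session comes from the spark-submit runtime
    val spark = SparkSession.builder().appName("ReadJSON").getOrCreate()

    // placeholder input path; point this at a real JSON file
    val df = spark.read.json("people.json")
    df.show()

    spark.stop()
  }
}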

Tuesday, March 24, 2020

Login as Root User

nagaraju@nagaraju-ambarisvr:~$ sudo -s
[sudo] password for nagaraju:
root@nagaraju-ambarisvr:~#

Monday, March 23, 2020

Pulse Secure Installation On Ubuntu 18 or 19

Ubuntu 19.04 is not a supported platform for Pulse Secure. You may get it to work by following the instructions below, without installing the dependencies through the package manager. Note: use with caution and at your own risk.
  • As root user, run:
    dpkg -i pulse-9.0.R3.x86_64.deb
    cd /usr/local/pulse/
    
  • Allow installation on 19.04 using sed (or manually edit line 279 of PulseClient_x86_64.sh):
    sed -i "s/UBUNTU_VER\ \=\ 18\ \]/& \|\|\ [\ \$UBUNTU_VER\ \=\ 19 \]/" PulseClient_x86_64.sh
    
  • This step may not be needed; at worst it will fail on packages that are not found:
    ./PulseClient_x86_64.sh install_dependency_packages
    
  • Let’s make the debs and extra folders:
    mkdir /usr/local/pulse/extra
    mkdir /usr/local/pulse/debs   
    
  • Download the packages below from archive.ubuntu.com to /usr/local/pulse/debs:
    cd /usr/local/pulse/debs
    wget http://archive.ubuntu.com/ubuntu/pool/main/i/icu/libicu60_60.2-3ubuntu3_amd64.deb
    wget http://archive.ubuntu.com/ubuntu/pool/universe/w/webkitgtk/libjavascriptcoregtk-1.0-0_2.4.11-3ubuntu3_amd64.deb
    wget http://archive.ubuntu.com/ubuntu/pool/universe/w/webkitgtk/libwebkitgtk-1.0-0_2.4.11-3ubuntu3_amd64.deb
    
  • Extract the *.deb files into /usr/local/pulse/extra:
    cd /usr/local/pulse/extra
    dpkg -x /usr/local/pulse/debs/libicu60_60.2-3ubuntu3_amd64.deb .
    dpkg -x /usr/local/pulse/debs/libjavascriptcoregtk-1.0-0_2.4.11-3ubuntu3_amd64.deb .
    dpkg -x /usr/local/pulse/debs/libwebkitgtk-1.0-0_2.4.11-3ubuntu3_amd64.deb .
    
  • Switch to a normal user and export LD_LIBRARY_PATH on the command line:
    export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:/usr/local/pulse/extra/usr/lib/x86_64-linux-gnu/
    
  • As a normal user, run the Pulse Secure GUI from the command line:
    /usr/local/pulse/pulseUi
    
Now you should be able to see the Pulse UI and continue with it. Note: for troubleshooting, you can use the ldd command to check which libraries are needed, e.g. ldd /usr/local/pulse/pulseUi

Sunday, March 22, 2020

Desktop Icons are not showing in Windows 10

Disable the second monitor

If you’ve used a dual monitor setup before, it’s possible that Desktop icons are set to appear on the second monitor. So, make sure to disable the second monitor if you’re not using it, and the icons may appear.

Make sure the Show Desktop icons feature is enabled

Although it may sound banal, it’s quite possible you overlooked this. So, just to be sure, check if your Desktop icons are set to appear once again. Here’s how to do that:
  1. Right-click on the empty area on your desktop.
  2. Choose View and you should see the Show Desktop icons option.
  3. Try checking and unchecking the Show Desktop icons option a few times, but remember to leave this option checked.

Thursday, March 19, 2020

Spark combineByKey[Pair] Transformation


combineByKey[Pair]
A very efficient implementation that combines the values of an RDD consisting of two-component tuples by applying multiple aggregators one after another.

Listing Variants

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]
def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]


Example


scala> val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
a: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[7] at parallelize at <console>:24

scala> val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

Now we are going to use the zip method to combine those two RDDs and get a tuple RDD.

scala> val c = b.zip(a)
c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[9] at zip at <console>:27

scala> c.collect
res10: Array[(Int, String)] = Array((1,dog), (1,cat), (2,gnu), (2,salmon), (2,rabbit), (1,turkey), (2,wolf), (2,bear), (2,bee))


scala> val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
d: org.apache.spark.rdd.RDD[(Int, List[String])] = ShuffledRDD[10] at combineByKey at <console>:25

Explanation: 

List(_)  => createCombiner implementation (from the combineByKey method signature) => This builds the initial list for the first value seen for a particular key.
Example: 
                 (1,dog)  => List(dog) as the response.

=================================================================

(x:List[String], y:String) => y :: x   => mergeValue implementation (from the combineByKey method signature) => This takes the createCombiner response and the next value for the same key, and prepends that value.

:: prepends a value to a List. Example: 1 :: List(2,3) = List(1,2,3)

Example: 
   createCombiner value for key 1 = List(dog)
   Next value for the same key (1) = cat
   So by applying the above mergeValue logic, we get the response List(cat,dog)


===================================================================
(x:List[String], y:List[String]) => x ::: y => mergeCombiners implementation (from the combineByKey method signature) => This combines the mergeValue results from the different partitions.

Example: 
  Say dog, cat = List(cat,dog) come from Partition 1
  and turkey = List(turkey) comes from Partition 2. Then, by applying mergeCombiners,
  List(cat,dog) ::: List(turkey) = List(cat, dog, turkey)

::: concatenates one List onto another. Example: List(1,2) ::: List(3,4) = List(1,2,3,4)


scala> d.collect
res11: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))


Note: createCombiner and mergeValue act within each partition;
          mergeCombiners acts across partitions (the reduce side).
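
As a further illustration of the same pattern, here is a small self-contained sketch (with made-up (key, score) data) that uses combineByKey to compute a per-key average by carrying a (sum, count) combiner:

import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeyAverage {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("combineByKeyAverage").setMaster("local[*]"))

    // hypothetical (key, score) pairs
    val scores = sc.parallelize(Seq(("a", 10.0), ("a", 20.0), ("b", 5.0), ("b", 15.0), ("b", 25.0)))

    val sumCount = scores.combineByKey(
      (v: Double) => (v, 1),                                             // createCombiner: first value of a key in a partition
      (acc: (Double, Int), v: Double) => (acc._1 + v, acc._2 + 1),       // mergeValue: fold further values within a partition
      (a: (Double, Int), b: (Double, Int)) => (a._1 + b._1, a._2 + b._2) // mergeCombiners: merge results across partitions
    )

    val averages = sumCount.mapValues { case (sum, count) => sum / count }
    averages.collect().foreach(println) // e.g. (a,15.0), (b,15.0)

    sc.stop()
  }
}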





What's the difference between :: and ::: in Scala

In general:
  • :: - adds an element at the beginning of a list and returns a list with the added element
  • ::: - concatenates two lists and returns the concatenated list
For example:
1 :: List(2, 3)             will return     List(1, 2, 3)
List(1, 2) ::: List(3, 4)   will return     List(1, 2, 3, 4)
In the combineByKey example above, using :: in mergeCombiners would produce a list inside a list (a nested list), which is why ::: is used there instead.
