Thursday, March 5, 2020

Pipe in Spark

The pipe operator in Spark allows developers to process RDD data using external applications. Sometimes in data analysis, we need to use an external library that is not written in Java/Scala, e.g. Fortran math libraries. In that case, Spark's pipe operator allows us to send the RDD data to the external application.
In this post, we will first look at how to use the pipe operator. Once we understand the usage, we will see how to implement the pipe operation in plain Scala programs. That implementation is adapted from Spark's own implementation.
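Before looking at the Spark API, here is a minimal sketch of the underlying idea in plain Scala, using scala.sys.process: write the input lines to an external command's stdin and read back the lines it prints to stdout. This is only an illustration of the concept (the pipeLines helper is hypothetical), not Spark's actual implementation.

import scala.sys.process._
import java.io.ByteArrayInputStream

// Hypothetical helper illustrating the idea behind pipe: feed each input line
// to the external command's stdin and collect the lines it writes to stdout.
def pipeLines(lines: Seq[String], command: String): List[String] = {
  val input = new ByteArrayInputStream((lines.mkString("\n") + "\n").getBytes("UTF-8"))
  (command #< input).lineStream.toList // read the command's stdout line by line
}

// Example (using the echo.sh script created in Step 2 below):
// pipeLines(List("hi", "hello"), "/home/hadoop/echo.sh")
// => List("Running shell script", "hi", "hello")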

Pipe in Spark


The following steps show how to use the pipe operator. To start with, we will create an RDD from an in-memory list.

Step 1 : Create an RDD
val data = List("hi","hello","how","are","you")
val dataRDD = sc.makeRDD(data) // sc is SparkContext
Step 2 : Create a shell script
Once we have the RDD, we will pipe it to a shell script. Let's create a file called echo.sh and put the following content in it.
#!/bin/sh
echo "Running shell script"
while read LINE; do
   echo ${LINE}    
done

This is a simple shell script that reads input from stdin and writes it back to stdout. You can perform any other shell operation in this script.
Step 3 : Pipe RDD data to the shell script
Once we have the shell script, we can pipe the RDD through it. Make sure you change the scriptPath variable to match the path of your file; when running on a cluster, the script must be executable and available on the worker nodes.
val scriptPath = "/home/hadoop/echo.sh"
val pipeRDD = dataRDD.pipe(scriptPath)
pipeRDD.collect() 

Now you should be able to see the lines printed on the console, along with the echo messages from the shell script. Note that "Running shell script" appears once per partition, since the script is launched for each partition of the RDD. In place of a shell script, you can use any other executable.
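For example, the snippet below (a sketch assuming a Linux environment where tr is available on the worker machines) pipes the same RDD through the standard tr command to upper-case every element, and also shows the overload of pipe that passes environment variables to the launched process (the GREETING variable is just an illustration).

val upperRDD = dataRDD.pipe("tr a-z A-Z") // launch tr for each partition
upperRDD.collect() // expected: Array(HI, HELLO, HOW, ARE, YOU)

// pipe also accepts a map of environment variables for the external process
val envPipeRDD = dataRDD.pipe(scriptPath, Map("GREETING" -> "hello"))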
