Saturday, April 11, 2020

How to use broadcast variables in Spark


Broadcast Variables

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks. They can be used, for example, to give every node a copy of a large input dataset in an efficient manner. Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
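As a quick illustration, here is a minimal sketch of creating a broadcast variable and reading it inside a task (this assumes a spark-shell session where the SparkContext sc already exists, and the lookup table is made-up data for illustration):

import org.apache.spark.broadcast.Broadcast

// A small lookup table built on the driver (dummy data for illustration)
val lookup: Map[Int, String] = Map(1 -> "first", 2 -> "second")

// Ship it to each executor once, instead of once per task
val bcLookup: Broadcast[Map[Int, String]] = sc.broadcast(lookup)

// Tasks read the cached copy through .value
val labelled = sc.parallelize(Seq(1, 2, 2)).map(id => bcLookup.value(id)).collect()

// Release the executor-side copies when the variable is no longer needed
bcLookup.unpersist()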

Spark actions are executed through a set of stages, separated by distributed “shuffle” operations. Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

Broadcast variables are created from a variable v by calling SparkContext.broadcast(v). The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method. The code below shows this:



import java.nio.charset.CodingErrorAction

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkContext

import scala.io.{Codec, Source}

object AverageMovieRatings {

    // Parse a ratings.csv row (userId,movieId,rating,timestamp)
    // into (movieId, (rating, 1)) so ratings can be summed and counted
    def mapToTuple(line: String): (Int, (Float, Int)) = {
        val fields = line.split(',')
        (fields(1).toInt, (fields(2).toFloat, 1))
    }

    def loadMovieNames(): Map[Int, String] = {

        // Handle character encoding issues
        implicit val codec = Codec("UTF-8")
        codec.onMalformedInput(CodingErrorAction.REPLACE)
        codec.onUnmappableCharacter(CodingErrorAction.REPLACE)

        var movieNames: Map[Int, String] = Map()

        // Read lines from movies.csv into Iterator. Drop the first (header) row.
        val lines = Source.fromFile("/tmp/ml-latest-small/movies.csv").getLines().drop(1)
        for (line <- lines) {
            val fields = line.split(',')
            movieNames += (fields(0).toInt -> fields(1))
        }
        movieNames
    }

    def main(args: Array[String]): Unit = {
        Logger.getLogger("org").setLevel(Level.ERROR)

        val sc = new SparkContext("local[*]", "AverageMovieRatings")

        // Load the movie ID -> title lookup on the driver and broadcast it,
        // so each executor gets a single cached read-only copy
        val names = sc.broadcast(loadMovieNames())

        // Read a text file
        var data = sc.textFile("/tmp/ml-latest-small/ratings.csv")

        // Extract the first row which is the header
        val header = data.first()

        // Filter out the header from the dataset
        data = data.filter(row => row != header)

        // Sum ratings and counts per movie, compute the average rating,
        // sort descending, and look up the title from the broadcast
        // variable via .value
        val result = data.map(mapToTuple)
          .reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
          .map(x => (x._1, x._2._1 / x._2._2))
          .sortBy(_._2, ascending = false)
          .map(x => (names.value(x._1), x._2))
          .collect()

        result.foreach(println)
    }
}
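Note that without the broadcast, the names map would be captured in the closure of the final map step and serialized with every task. With sc.broadcast it is shipped to each executor once and cached there, which matters when the lookup table is large. Broadcast variables are read-only from the executors' point of view, so the driver-side map should not be modified after it has been broadcast.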
