Wednesday, February 22, 2023

Databricks Delta table merge Example

Here's some sample code that demonstrates a merge operation on a Delta table using PySpark:




from pyspark.sql import SparkSession
from delta.tables import DeltaTable

# create a SparkSession object
# (on Databricks, getOrCreate() returns the notebook's existing session, which already has Delta Lake configured)
spark = SparkSession.builder.appName("DeltaMergeDemo").getOrCreate()

# create a sample Delta table (overwrite so the example can be rerun)
data = [("Alice", "Sales", 3000),
        ("Bob", "Engineering", 4000),
        ("Charlie", "Marketing", 5000)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
df.write.format("delta").mode("overwrite").save("/path/to/delta-table")

# create a new DataFrame with updated salary information
updates = [("Alice", 3500),
           ("Bob", 4500),
           ("David", 6000)]
updates_df = spark.createDataFrame(updates, ["name", "salary"])

# load the table as a DeltaTable; a plain DataFrame from spark.read has no merge method
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")

# perform a merge operation on the Delta table
(delta_table.alias("target")
    .merge(updates_df.alias("source"), "target.name = source.name")
    # matched rows: raise the salary only when the new value is higher
    .whenMatchedUpdate(condition="source.salary > target.salary",
                       set={"salary": "source.salary"})
    # unmatched rows: updates_df has no dept column, so insert the literal 'Sales'
    .whenNotMatchedInsert(values={"name": "source.name",
                                  "dept": "'Sales'",
                                  "salary": "source.salary"})
    .execute())

# display the results (execute() does not return the merged table, so read it back)
delta_table.toDF().show()




This code first creates a sample Delta table with some employee information. It then creates a new DataFrame with updated salary information and merges it into the Delta table using the merge method.

The merge method is called on a DeltaTable object, not on a plain DataFrame. It takes the source DataFrame (updates_df) and a merge condition that matches rows on the name column, and returns a DeltaMergeBuilder object. This object lets you specify how to handle matched and unmatched rows with the whenMatchedUpdate and whenNotMatchedInsert methods, respectively.
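To make the matching step concrete, here is a minimal pure-Python model of how a merge condition on name pairs source rows with target rows. It is not the Delta Lake API: the function pair_rows and the list-of-dicts row format are hypothetical, and the duplicate check is a simplification of Delta Lake's rule that a merge fails when more than one source row matches and tries to update the same target row.

```python
from collections import defaultdict


# Hypothetical pure-Python model of the matching step of a merge (not the Delta Lake API).
# Rows are dicts; comparing row[key] stands in for target.name = source.name.
def pair_rows(target, source, key):
    """Return (matched, unmatched).

    matched maps each matched target key to its single source row;
    unmatched lists the source rows that have no target counterpart.
    """
    target_keys = {row[key] for row in target}
    candidates = defaultdict(list)
    unmatched = []
    for row in source:
        if row[key] in target_keys:
            candidates[row[key]].append(row)
        else:
            unmatched.append(row)

    matched = {}
    for k, rows in candidates.items():
        # Simplification of Delta Lake's rule: a merge fails when several source
        # rows match the same target row, since the update would be ambiguous.
        if len(rows) > 1:
            raise ValueError(f"multiple source rows match target key {k!r}")
        matched[k] = rows[0]
    return matched, unmatched


target = [{"name": "Alice", "salary": 3000}, {"name": "Bob", "salary": 4000}]
source = [{"name": "Alice", "salary": 3500}, {"name": "David", "salary": 6000}]
matched, unmatched = pair_rows(target, source, "name")
print(matched)    # {'Alice': {'name': 'Alice', 'salary': 3500}}
print(unmatched)  # [{'name': 'David', 'salary': 6000}], a candidate for whenNotMatchedInsert
```

In PySpark, a common way to avoid the duplicate-match error is to deduplicate the source on the join key before merging, for example with updates_df.dropDuplicates(["name"]), keeping in mind that dropDuplicates does not guarantee which row it keeps for each key.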

In this example, whenMatchedUpdate changes the salary of a matched row only when the new value is greater than the existing one, so Alice and Bob get raises. whenNotMatchedInsert inserts the rows from updates_df that have no match on the name column (here, David); because updates_df has no dept column, new rows are assigned the department "Sales".
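The clause logic can also be checked without a cluster. The sketch below is a hypothetical pure-Python model of this particular merge (the function merge_salaries is illustrative, not part of Delta Lake) applied to the same sample data: Alice and Bob get raises, Charlie is untouched, and David is inserted with dept "Sales".

```python
# Hypothetical pure-Python model of the example merge (not the Delta Lake API).
# Rows are dicts; "name" plays the role of the merge condition target.name = source.name.
def merge_salaries(target, source, default_dept="Sales"):
    result = [dict(row) for row in target]          # copy, so the input list is not mutated
    by_name = {row["name"]: row for row in result}  # only original target rows can match
    for src in source:
        tgt = by_name.get(src["name"])
        if tgt is not None:
            # whenMatchedUpdate with a condition: raise the salary, never lower it
            if src["salary"] > tgt["salary"]:
                tgt["salary"] = src["salary"]
        else:
            # whenNotMatchedInsert: the source has no dept column, so use a constant
            result.append({"name": src["name"], "dept": default_dept,
                           "salary": src["salary"]})
    return result


target = [
    {"name": "Alice", "dept": "Sales", "salary": 3000},
    {"name": "Bob", "dept": "Engineering", "salary": 4000},
    {"name": "Charlie", "dept": "Marketing", "salary": 5000},
]
updates = [
    {"name": "Alice", "salary": 3500},
    {"name": "Bob", "salary": 4500},
    {"name": "David", "salary": 6000},
]

for row in merge_salaries(target, updates):
    print(row)
```

Because the update only fires when the new salary is higher, running the same merge a second time leaves the table unchanged. Note also that a real Delta table does not guarantee the row order this list-based model preserves.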

Finally, the execute method runs the merge, which modifies the Delta table in place. execute does not return the merged table, so to display the result you read the table back, for example with delta_table.toDF().show().
