here's some sample code that demonstrates a merge operation on a Delta table using PySpark:
from pyspark.sql import SparkSession
# create a SparkSession object
spark = SparkSession.builder.appName("DeltaMergeDemo").getOrCreate()
# create a sample Delta table
data = [("Alice", "Sales", 3000),
("Bob", "Engineering", 4000),
("Charlie", "Marketing", 5000)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
df.write.format("delta").save("/path/to/delta-table")
# create a new DataFrame with updated salary information
updates = [("Alice", 3500),
("Bob", 4500),
("David", 6000)]
updates_df = spark.createDataFrame(updates, ["name", "salary"])
# perform a merge operation on the Delta table
from pyspark.sql.functions import when
delta_table = spark.read.format("delta").load("/path/to/delta-table")
delta_table_merged = delta_table \
.merge(updates_df, "name") \
.whenMatchedUpdate(set={"salary": when(delta_table.salary < updates_df.salary, updates_df.salary).otherwise(delta_table.salary)}) \
.whenNotMatchedInsert(values={"name": updates_df.name, "dept": "Sales", "salary": updates_df.salary}) \
.execute()
# display the results
delta_table_merged.show()
This code first creates a sample Delta table with some employee information. It then creates a new DataFrame with updated salary information, and performs a merge operation on the Delta table using the merge
method.
The merge
method takes the DataFrame to be merged (updates_df
) and the name of the column to match on (name
), and returns a DeltaMergeBuilder
object. This object allows you to specify how to handle updates and inserts using the whenMatchedUpdate
and whenNotMatchedInsert
methods, respectively.
In this example, we're updating the salary
column using the whenMatchedUpdate
method, which updates the salary
column only when the new value is greater than the existing value. We're also using the whenNotMatchedInsert
method to insert new records into the Delta table when there is no match based on the name
column.
Finally, the execute
method is called to perform the merge operation on the Delta table. The results are then displayed using the show
method.
No comments:
Post a Comment