Wednesday, February 22, 2023

Databricks Delta table merge Example

Here is some sample code that demonstrates a merge operation on a Delta table using PySpark:




from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, when

# create a SparkSession object (on Databricks, `spark` already exists)
spark = SparkSession.builder.appName("DeltaMergeDemo").getOrCreate()

# create a sample Delta table
data = [("Alice", "Sales", 3000),
        ("Bob", "Engineering", 4000),
        ("Charlie", "Marketing", 5000)]
df = spark.createDataFrame(data, ["name", "dept", "salary"])
df.write.format("delta").save("/path/to/delta-table")

# create a new DataFrame with updated salary information
updates = [("Alice", 3500),
           ("Bob", 4500),
           ("David", 6000)]
updates_df = spark.createDataFrame(updates, ["name", "salary"])

# perform a merge operation on the Delta table
# merge() is a method of DeltaTable, not of a DataFrame, so load the table with forPath
delta_table = DeltaTable.forPath(spark, "/path/to/delta-table")

# alias both sides so the condition and expressions can tell the columns apart
delta_table.alias("t") \
    .merge(updates_df.alias("u"), "t.name = u.name") \
    .whenMatchedUpdate(set={"salary": when(col("t.salary") < col("u.salary"), col("u.salary")).otherwise(col("t.salary"))}) \
    .whenNotMatchedInsert(values={"name": col("u.name"), "dept": lit("Sales"), "salary": col("u.salary")}) \
    .execute()

# display the results: execute() modifies the table in place, so read it back
delta_table.toDF().show()




This code first creates a sample Delta table with some employee information. It then creates a new DataFrame with updated salary information, and performs a merge operation on the Delta table using the merge method.

The merge method is called on a DeltaTable and takes the source DataFrame (updates_df) and a match condition (here, equality on the name column). It returns a DeltaMergeBuilder object, which lets you specify how to handle updates and inserts using the whenMatchedUpdate and whenNotMatchedInsert methods, respectively.

In this example, whenMatchedUpdate changes the salary column only when the new value is greater than the existing value. whenNotMatchedInsert inserts a new record when no row in the Delta table matches on the name column; because the updates carry no dept column, inserted records get the department "Sales".

Finally, the execute method is called to perform the merge operation on the Delta table. The merge changes the table in place, so the merged table is then read back as a DataFrame and displayed using the show method.
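
To make the merge rules concrete without a Spark cluster, here is a plain-Python sketch of the same logic applied to lists of dictionaries. It only models the semantics described above and is not how Delta Lake executes a merge; the function name merge_salaries and the default_dept parameter are made up for illustration.

```python
def merge_salaries(target, updates, default_dept="Sales"):
    """Model the example's merge rules on plain Python data.

    target:  list of {"name", "dept", "salary"} dicts (rows of the Delta table)
    updates: list of {"name", "salary"} dicts (rows of updates_df)
    """
    # Index the target rows by the match key; copy so the input is not mutated.
    merged = {row["name"]: dict(row) for row in target}
    for upd in updates:
        existing = merged.get(upd["name"])
        if existing is not None:
            # whenMatchedUpdate: keep the larger of the old and the new salary.
            if existing["salary"] < upd["salary"]:
                existing["salary"] = upd["salary"]
        else:
            # whenNotMatchedInsert: new row with the hard-coded department.
            merged[upd["name"]] = {
                "name": upd["name"],
                "dept": default_dept,
                "salary": upd["salary"],
            }
    return list(merged.values())


target = [
    {"name": "Alice", "dept": "Sales", "salary": 3000},
    {"name": "Bob", "dept": "Engineering", "salary": 4000},
    {"name": "Charlie", "dept": "Marketing", "salary": 5000},
]
updates = [
    {"name": "Alice", "salary": 3500},
    {"name": "Bob", "salary": 4500},
    {"name": "David", "salary": 6000},
]

for row in merge_salaries(target, updates):
    print(row)
```

With the sample data, Alice and Bob are raised to 3500 and 4500, Charlie is unchanged, and David is inserted with the department Sales.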

Characteristics of a Show-off Manager


  1. They know little about the topic but behave as if they know everything.
  2. They do not listen to what the employee is saying.
  3. They do not know how to behave with employees.
  4. They do not appreciate good work.
  5. They do not take suggestions.
  6. They always find problems in the work done by the employee.
  7. It is very hard to work with them.
  8. They do not do any homework before a meeting.
  9. They do not improve their knowledge.


Wednesday, February 1, 2023

Importing Data into Amazon S3 Using Sqoop

 https://docs.cloudera.com/documentation/enterprise/6/6.3/topics/admin_sqoop_s3_import.html


Import Data into S3 Bucket in Incremental Mode

The --temporary-rootdir option must be set to point to a location in the S3 bucket to import data into an S3 bucket in incremental mode.

Append Mode

When importing data into a target directory in an Amazon S3 bucket in incremental append mode, the location of the temporary root directory must be in the same bucket as the directory. For example: s3a://example-bucket/temporary-rootdir or s3a://example-bucket/target-directory/temporary-rootdir.

Example command: Import data into a target directory in an Amazon S3 bucket in incremental append mode.

sqoop import --connect $CONN --username $USER --password $PWD --table $TABLE_NAME --target-dir s3a://example-bucket/target-directory --incremental append --check-column $CHECK_COLUMN --last-value $LAST_VALUE --temporary-rootdir s3a://example-bucket/temporary-rootdir 
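
If the command is generated from a script, the same-bucket rule can be checked before Sqoop is started. The following is a minimal sketch, assuming the command is assembled in Python; build_append_import is a hypothetical helper, not part of Sqoop, and it only uses the options shown in the command above.

```python
from urllib.parse import urlparse


def build_append_import(connect, username, password, table, target_dir,
                        check_column, last_value, temporary_rootdir):
    """Return the argument list for an incremental append import into S3."""
    target = urlparse(target_dir)
    temp = urlparse(temporary_rootdir)
    # Append mode: the temporary root directory must be in the same bucket.
    if target.scheme != "s3a" or temp.scheme != "s3a":
        raise ValueError("both locations must be s3a:// URLs")
    if target.netloc != temp.netloc:
        raise ValueError("temporary root directory must be in the same bucket "
                         "as the target directory")
    return [
        "sqoop", "import",
        "--connect", connect,
        "--username", username,
        "--password", password,
        "--table", table,
        "--target-dir", target_dir,
        "--incremental", "append",
        "--check-column", check_column,
        "--last-value", str(last_value),
        "--temporary-rootdir", temporary_rootdir,
    ]


# Placeholder connection details, for illustration only.
argv = build_append_import(
    "jdbc:mysql://db.example.com/sales", "user", "secret", "orders",
    "s3a://example-bucket/target-directory", "id", 100,
    "s3a://example-bucket/temporary-rootdir",
)
print(" ".join(argv))
```

The resulting list can be passed to subprocess.run as is; a temporary root directory in a different bucket raises ValueError before anything is executed.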

Data from an RDBMS can also be imported into S3 in incremental append mode in Sequence or Avro file format.

Parquet import into S3 in incremental append mode is also supported if the Parquet Hadoop API based implementation is used, meaning that the --parquet-configurator-implementation option is set to hadoop. For more information about the Parquet Hadoop API based implementation, see Importing Data into Parquet Format Using Sqoop.

Example command: Import data into a target directory in an Amazon S3 bucket in incremental append mode as Parquet file.

sqoop import --connect $CONN --username $USER --password $PWD --table $TABLE_NAME --target-dir s3a://example-bucket/target-directory --incremental append --check-column $CHECK_COLUMN --last-value $LAST_VALUE --temporary-rootdir s3a://example-bucket/temporary-rootdir --as-parquetfile --parquet-configurator-implementation hadoop 

Lastmodified Mode

When importing data into a target directory in an Amazon S3 bucket in incremental lastmodified mode, the location of the temporary root directory must be in the same bucket and in the same parent directory as the target directory. For example: s3a://example-bucket/temporary-rootdir when the target directory is s3a://example-bucket/target-directory.

Example command: Import data into a target directory in an Amazon S3 bucket in incremental lastmodified mode.

sqoop import --connect $CONN --username $USER --password $PWD --table $TABLE_NAME --target-dir s3a://example-bucket/target-directory --incremental lastmodified --check-column $CHECK_COLUMN --merge-key $MERGE_KEY --last-value $LAST_VALUE --temporary-rootdir s3a://example-bucket/temporary-rootdir 
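
The stricter location rule for lastmodified mode can be sketched as a small check. This assumes that "same directory" means the temporary root directory and the target directory share the same parent, which is what the example paths suggest; is_valid_lastmodified_tempdir is a hypothetical helper name.

```python
import posixpath
from urllib.parse import urlparse


def is_valid_lastmodified_tempdir(target_dir, temporary_rootdir):
    """True if both locations are in the same bucket and the same parent directory."""
    target = urlparse(target_dir)
    temp = urlparse(temporary_rootdir)
    same_bucket = target.netloc == temp.netloc
    # Compare the parent directories, ignoring any trailing slash.
    same_parent = (posixpath.dirname(target.path.rstrip("/"))
                   == posixpath.dirname(temp.path.rstrip("/")))
    return same_bucket and same_parent


target_dir = "s3a://example-bucket/target-directory"
print(is_valid_lastmodified_tempdir(target_dir, "s3a://example-bucket/temporary-rootdir"))
print(is_valid_lastmodified_tempdir(target_dir, "s3a://example-bucket/target-directory/temporary-rootdir"))
```

The first call accepts the sibling directory; the second rejects a temporary root directory nested inside the target directory, which would be acceptable in append mode.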

Parquet import into S3 in incremental lastmodified mode is supported if the Parquet Hadoop API based implementation is used, meaning that the --parquet-configurator-implementation option is set to hadoop. For more information about the Parquet Hadoop API based implementation, see Importing Data into Parquet Format Using Sqoop.

Example command: Import data into a target directory in an Amazon S3 bucket in incremental lastmodified mode as Parquet file.

sqoop import --connect $CONN --username $USER --password $PWD --table $TABLE_NAME --target-dir s3a://example-bucket/target-directory --incremental lastmodified --check-column $CHECK_COLUMN --merge-key $MERGE_KEY --last-value $LAST_VALUE --temporary-rootdir s3a://example-bucket/temporary-rootdir  --as-parquetfile --parquet-configurator-implementation hadoop

Free-form Query Imports: sqoop import with a query in double quotes

 

7.2.3. Free-form Query Imports

Sqoop can also import the result set of an arbitrary SQL query. Instead of using the --table, --columns and --where arguments, you can specify a SQL statement with the --query argument.

When importing a free-form query, you must specify a destination directory with --target-dir.

If you want to import the results of a query in parallel, then each map task will need to execute a copy of the query, with results partitioned by bounding conditions inferred by Sqoop. Your query must include the token $CONDITIONS which each Sqoop process will replace with a unique condition expression. You must also select a splitting column with --split-by.

For example:

$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id = b.id) WHERE $CONDITIONS' \
  --split-by a.id --target-dir /user/foo/joinresults
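
To illustrate what the $CONDITIONS token is for, the sketch below replaces it with one range condition per map task. This is only a conceptual model under simple assumptions (an integer split column with known minimum and maximum values); Sqoop's own splitting logic is not reproduced here, and split_queries is a hypothetical function.

```python
def split_queries(query, split_by, lo, hi, num_mappers):
    """Return one query per map task, each covering a slice of [lo, hi]."""
    if "$CONDITIONS" not in query:
        raise ValueError("query must contain the $CONDITIONS token")
    # Evenly spaced lower bounds, plus the maximum as the final upper bound.
    bounds = [lo + (hi - lo) * i // num_mappers for i in range(num_mappers)] + [hi]
    queries = []
    for i in range(num_mappers):
        start, end = bounds[i], bounds[i + 1]
        # Only the last slice is closed, so the maximum value is included once.
        upper = "<=" if i == num_mappers - 1 else "<"
        condition = f"{split_by} >= {start} AND {split_by} {upper} {end}"
        queries.append(query.replace("$CONDITIONS", condition))
    return queries


query = "SELECT a.*, b.* FROM a JOIN b ON (a.id = b.id) WHERE $CONDITIONS"
for q in split_queries(query, "a.id", 1, 100, 4):
    print(q)
```

Each generated query returns a disjoint part of the result set, which is why the token must appear in the WHERE clause and why a splitting column is required for parallel imports.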

Alternately, the query can be executed once and imported serially, by specifying a single map task with -m 1:

$ sqoop import \
  --query 'SELECT a.*, b.* FROM a JOIN b on (a.id = b.id) WHERE $CONDITIONS' \
  -m 1 --target-dir /user/foo/joinresults

Note: If you are issuing the query wrapped in double quotes ("), you will have to use \$CONDITIONS instead of just $CONDITIONS to prevent your shell from treating it as a shell variable. For example, a double-quoted query may look like: "SELECT * FROM x WHERE a='foo' AND \$CONDITIONS"
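
When the command is launched from Python instead of being typed into a shell, the escaping question can be avoided by passing the arguments as a list. The sketch below assumes such a wrapper script; the target directory is reused from the earlier example, and shlex.join (Python 3.8 or later) is used only to show how the arguments would have to be quoted on a shell command line.

```python
import shlex

# The query contains single quotes and a literal $CONDITIONS token.
query = "SELECT * FROM x WHERE a='foo' AND $CONDITIONS"
argv = ["sqoop", "import", "--query", query,
        "-m", "1", "--target-dir", "/user/foo/joinresults"]

# shlex.join quotes each argument so that a POSIX shell passes it through unchanged.
command_line = shlex.join(argv)
print(command_line)

# Parsing the quoted command line gives back the original arguments,
# with $CONDITIONS still intact.
round_trip = shlex.split(command_line)
print(round_trip[3])
```

Passing argv directly to subprocess.run(argv) involves no shell at all, so $CONDITIONS reaches Sqoop exactly as written.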

Note: The facility of using free-form query in the current version of Sqoop is limited to simple queries where there are no ambiguous projections and no OR conditions in the WHERE clause. Use of complex queries such as queries that have sub-queries or joins leading to ambiguous projections can lead to unexpected results.

Python Docstring—How to Document Your Code



def add(a, b):
    """
    Sum up two integers
    Arguments:
    a: an integer
    b: an integer
    Return:
    The sum of the two integer arguments
    """
    return a + b

help(add)

"""
add(a, b)
Sum up two integers
Arguments:
a: an integer
b: an integer
Return:
The sum of the two integer arguments


Process finished with exit code 0
"""
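
Besides help(), the docstring can be read programmatically. The following is a small sketch using the same add function, with the docstring laid out in the common summary-line style; the layout is a stylistic choice for this example, not a requirement.

```python
import inspect


def add(a, b):
    """Sum up two integers.

    Arguments:
        a: an integer
        b: an integer
    Return:
        The sum of the two integer arguments
    """
    return a + b


# The raw docstring is stored on the function object, indentation included.
raw = add.__doc__
# inspect.getdoc removes the common leading indentation, as help() does.
clean = inspect.getdoc(add)

print(clean)
print(clean.splitlines()[0])
```

The first line of the cleaned docstring is the one-line summary, which is why it is worth keeping short and self-contained.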
