PySpark: Delta Lake


This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake, see here.

First, install the “delta-spark” package at whichever version you require. The delta-spark version must be compatible with your Spark version (for example, 3.1.0 targets Spark 3.5).

    pip install delta-spark==3.1.0

Set up a Spark session.
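A minimal sketch of a Delta-enabled session, assuming delta-spark was installed with pip as above and that Spark can download the matching Delta jars, which configure_spark_with_delta_pip adds to spark.jars.packages. The two config keys are the ones the Delta Lake quickstart uses; the helper name build_delta_session and the app name are hypothetical. Reading abfss:// paths additionally needs the Azure storage connector and credentials, which are not shown here.

```python
# Spark settings that register Delta Lake's SQL extension and catalog.
DELTA_SPARK_CONF = {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog",
}


def build_delta_session(app_name="delta-demo"):
    """Create (or reuse) a SparkSession configured for Delta Lake.

    Hypothetical helper. Imports are local so this module still loads
    where PySpark is not installed.
    """
    from pyspark.sql import SparkSession
    from delta import configure_spark_with_delta_pip

    builder = SparkSession.builder.appName(app_name)
    for key, value in DELTA_SPARK_CONF.items():
        builder = builder.config(key, value)
    # Adds the delta-spark Maven package matching the installed pip version.
    return configure_spark_with_delta_pip(builder).getOrCreate()


# Usage:
# spark = build_delta_session()
```

The snippets in the rest of this post assume a session like this is available as spark.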

To read Delta tables, refer to PySpark: Read From ADLS to DataFrame.

To write Delta tables, refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

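Vacuum deletes data files that are no longer referenced by the table and are older than the retention threshold, given in hours. The default threshold is 7 days (168 hours); Delta rejects a shorter one unless the retention duration check is disabled, and vacuumed files can no longer be used for time travel or by readers of older versions.

    from delta.tables import DeltaTable

    vacuum_hrs = 100
    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    # Retention below 168 hours is rejected unless this safety check is turned off.
    spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.vacuum(vacuum_hrs)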
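Before deleting anything, it can help to preview which files a vacuum would remove. Delta's SQL VACUUM command accepts RETAIN n HOURS and DRY RUN clauses; this sketch only builds that statement for a path-based table (the helper name vacuum_sql is hypothetical), so it can be checked without a cluster and then passed to spark.sql.

```python
def vacuum_sql(path, retain_hours=None, dry_run=True):
    """Build a Delta Lake VACUUM statement for a path-based table.

    Hypothetical helper. With dry_run=True, Delta only lists the files
    that would be deleted instead of removing them.
    """
    if "`" in path:
        raise ValueError("path must not contain backticks")
    sql = f"VACUUM delta.`{path}`"
    if retain_hours is not None:
        if retain_hours < 0:
            raise ValueError("retain_hours must be non-negative")
        sql += f" RETAIN {retain_hours} HOURS"
    if dry_run:
        sql += " DRY RUN"
    return sql


# Usage, assuming a Delta-enabled `spark` session:
# spark.sql(vacuum_sql(path, 168)).show(truncate=False)
```

Defaulting to a dry run means the destructive call has to be asked for explicitly with dry_run=False.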

Compaction

Improves read performance by merging many small files into fewer, larger ones.

    from delta.tables import DeltaTable

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.optimize().executeCompaction()
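On a partitioned table you may only want to compact some partitions. The builder returned by optimize() has a where() method that takes a partition predicate, and executeCompaction() returns a DataFrame of metrics. A sketch, assuming a Delta-enabled spark session and a hypothetical partition column named date; both helper names are hypothetical, and the predicate must reference partition columns only.

```python
def partition_filter(column, values):
    """Build a predicate such as "date IN ('2024-01-01')".

    Hypothetical helper. Values containing quotes or backslashes are
    rejected rather than escaped.
    """
    if not values:
        raise ValueError("values must not be empty")
    for v in values:
        if "'" in str(v) or "\\" in str(v):
            raise ValueError(f"unsupported character in value: {v!r}")
    quoted = ", ".join(f"'{v}'" for v in values)
    return f"{column} IN ({quoted})"


def compact_partitions(spark, path, predicate):
    """Run compaction only on partitions matching `predicate`."""
    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, path)
    # Returns a DataFrame with the OPTIMIZE execution metrics.
    return delta_table.optimize().where(predicate).executeCompaction()


# Usage, assuming `date` is a partition column of the table at `path`:
# compact_partitions(spark, path, partition_filter("date", ["2024-01-01"])).show()
```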

Z-Order

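Z-ordering co-locates rows with similar values in the given columns within the same files, so Delta can skip more files when you filter on those columns.

    from delta.tables import DeltaTable

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
    # Columns to co-locate; typically high-cardinality columns used in filters.
    columns = ['<COL_1>', '<COL_2>']

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.optimize().executeZOrderBy(*columns)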
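The same operation can be written in SQL as OPTIMIZE ... ZORDER BY (...), optionally restricted with a WHERE clause on partition columns. The hypothetical optimize_sql helper below just assembles that statement for a path-based table; pass the result to spark.sql on a Delta-enabled session. Without zorder_by it produces a plain compaction.

```python
def optimize_sql(path, zorder_by=(), where=None):
    """Build a Delta Lake OPTIMIZE statement for a path-based table.

    Hypothetical helper. `where` must reference partition columns only;
    Delta expects WHERE before ZORDER BY.
    """
    sql = f"OPTIMIZE delta.`{path}`"
    if where:
        sql += f" WHERE {where}"
    if zorder_by:
        # Backtick-quote each column so names with spaces still parse.
        cols = ", ".join(f"`{c}`" for c in zorder_by)
        sql += f" ZORDER BY ({cols})"
    return sql


# Usage, assuming a Delta-enabled `spark` session:
# spark.sql(optimize_sql(path, ["customer_id", "event_ts"])).show()
```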

Delete

Deletes the rows that match a condition. The removed data stays reachable through time travel until the old files are vacuumed.

    from delta.tables import DeltaTable
    import pyspark.sql.functions as F

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

    # You can also pass the condition as a SQL expression string
    delta_table.delete("<MY_COL> = '<SOME_VAL>'")
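Because a delete rewrites data files, it can be worth counting the matching rows first. A sketch, assuming a Delta-enabled spark session; delete_rows is a hypothetical helper, and the condition can be either a SQL string or a Column, since both DataFrame.filter and DeltaTable.delete accept either.

```python
def delete_rows(spark, path, condition, dry_run=True):
    """Delete rows matching `condition` from a path-based Delta table.

    Hypothetical helper. With dry_run=True, only the number of matching
    rows is returned and nothing is deleted.
    """
    from delta.tables import DeltaTable

    delta_table = DeltaTable.forPath(spark, path)
    # Count and delete are separate operations; concurrent writers could
    # change the matching rows in between.
    matches = delta_table.toDF().filter(condition).count()
    if not dry_run and matches > 0:
        delta_table.delete(condition)
    return matches


# Usage:
# print(delete_rows(spark, path, "<MY_COL> = '<SOME_VAL>'"))
# delete_rows(spark, path, "<MY_COL> = '<SOME_VAL>'", dry_run=False)
```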

Modify Properties

For the full list of table properties, see here.

dataSkippingNumIndexedCols

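Delta Lake collects file-level statistics (used for data skipping) on only the first 32 columns of a table by default. Raise this value if columns you filter on sit beyond that position; a value of -1 collects statistics on all columns.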

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
    skip_cols = '<SOME_VALUE>'  # number of leading columns to collect statistics on

    # TBLPROPERTIES uses a single '=' between key and value
    spark.sql(f"ALTER TABLE delta.`{path}` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '{skip_cols}')")
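To avoid hand-formatting the SQL and to confirm the change took effect, a sketch with two hypothetical helpers: set_tblproperties_sql builds the ALTER TABLE statement from a dict, and show_tblproperties_sql builds the SHOW TBLPROPERTIES statement that reads the values back. It assumes a Delta-enabled spark session and renders every key and value as a single-quoted literal, as in the statement above.

```python
def set_tblproperties_sql(path, props):
    """Build ALTER TABLE ... SET TBLPROPERTIES for a path-based Delta table.

    Hypothetical helper; keys and values containing single quotes are rejected.
    """
    if not props:
        raise ValueError("props must not be empty")
    for key, value in props.items():
        if "'" in str(key) or "'" in str(value):
            raise ValueError("keys and values must not contain single quotes")
    pairs = ", ".join(f"'{k}' = '{v}'" for k, v in props.items())
    return f"ALTER TABLE delta.`{path}` SET TBLPROPERTIES ({pairs})"


def show_tblproperties_sql(path):
    """Build the statement that lists a table's current properties."""
    return f"SHOW TBLPROPERTIES delta.`{path}`"


# Usage, assuming a Delta-enabled `spark` session:
# spark.sql(set_tblproperties_sql(path, {"delta.dataSkippingNumIndexedCols": 40}))
# spark.sql(show_tblproperties_sql(path)).show(truncate=False)
```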