This post is how to use pyspark to work with Delta Tables.
For more information on Delta Lake you can refer here.
First you need to install the “delta-spark” package for whatever version you require.
pip install delta-spark==3.1.0
Setup a Spark Session.
To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.
To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.
Vacuum Delta Table
from delta.tables import DeltaTable vacuum_hrs = 100 path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' delta_table = DeltaTable.forPath(spark, path) delta_table.vacuum(vacuum_hrs)
Compaction
Impoves reads by merging small files into larger ones.
from delta.tables import DeltaTable path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' delta_table = DeltaTable.forPath(spark, path) delta_table.optimize().executeCompaction()
Z-Order
from delta.tables import DeltaTable path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' columns = '' delta_table = DeltaTable.forPath(spark, path) delta_table.optimize().executeZOrderBy(columns)
Delete
from delta.tables import DeltaTable import pyspark.sql.functions as F path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' delta_table = DeltaTable.forPath(spark, path) delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>') #You can also use sql delta_table.delete("column == 'some_VALUE'")
Modify Properties
You can refer here for more properties.
dataSkippingNumIndexedCols
You would do this if you have over the max columns that the delta lake can collect statistics on. Default value is 32.
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' skip_cols = <SOME_VALUE> spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' == '%s')" % (path, skip_cols))
You must be logged in to post a comment.