Databricks: Python SDK

This post shows how to use the Databricks Python SDK.

Install the Package

pip install databricks-sdk

Update Package

pip install databricks-sdk --upgrade

Check Package Version

pip show databricks-sdk | grep -oP '(?<=Version: )\S+'
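If you would rather check the version from Python (for example inside a notebook cell), a minimal sketch using only the standard library could look like the following. The helper name installed_version is my own and is not part of the SDK.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_version(package_name):
    """Return the installed version of a package, or None if it is missing."""
    try:
        return version(package_name)
    except PackageNotFoundError:
        return None


# Prints the version string, or None when databricks-sdk is not installed.
print(installed_version("databricks-sdk"))
```

Returning None instead of raising makes it easy to decide whether an install or upgrade is needed.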

Setup WorkspaceClient

from databricks.sdk import WorkspaceClient

secret = dbutils.secrets.get(scope = "<SCOPE>", key = "<KEY>")

w = WorkspaceClient(
  host = 'https://<URL>/',
  azure_workspace_resource_id = '<RESOURCE_ID_OF_DATABRICKS>',
  azure_tenant_id = '<TENANT_ID>',
  azure_client_id = '<CLIENT_ID>',
  azure_client_secret = secret
)

Setup AccountClient

You can get the account_id from the Databricks account portal. It is shown next to your user ID in the top right-hand corner.

from databricks.sdk import AccountClient

secret = dbutils.secrets.get(scope = "<SCOPE>", key = "<KEY>")

a = AccountClient(
  host = 'https://accounts.azuredatabricks.net',
  account_id = '<ACCOUNT_ID>',
  azure_tenant_id = '<TENANT_ID>',
  azure_client_id = '<CLIENT_ID>',
  azure_client_secret = secret
)

List Workspace Groups

NOTE: You must set up the WorkspaceClient first.

w.groups.list()

List Account Groups

NOTE: You must set up the AccountClient first. You must also be an account admin.

a.groups.list()
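Both list calls return an iterator of group objects rather than a plain list, so nothing shows up until you loop over it. The sketch below collects the display names; it assumes each item exposes a display_name attribute, as the SDK's group objects do. The helper group_names and the stand-in objects are my own, used so the snippet runs without a workspace.

```python
from types import SimpleNamespace


def group_names(groups):
    """Collect the display names from an iterable of group objects, sorted."""
    return sorted(g.display_name for g in groups if g.display_name)


# Stand-in objects so the sketch runs anywhere; in a notebook you would pass
# w.groups.list() or a.groups.list() instead.
fake_groups = iter([
    SimpleNamespace(display_name="data-engineers"),
    SimpleNamespace(display_name="admins"),
    SimpleNamespace(display_name=None),
])
print(group_names(fake_groups))
```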

Create Storage Credential

NOTE: Your SPN must be an account admin to do this. You must also set up the WorkspaceClient first.

from databricks.sdk.service.catalog import AzureManagedIdentity

storage_credential_name = '<CREDENTIAL_NAME>'
comment = '<COMMENT>'
connector_id = '<DATABRICKS_ACCESS_CONNECTOR>'
az_mi = AzureManagedIdentity(access_connector_id = connector_id)

w.storage_credentials.create(
  name = storage_credential_name,
  azure_managed_identity = az_mi,
  comment = comment
)

 

PySpark DataFrame Methods

This post shows different methods of a DataFrame.

Get the first value in a column

df = some_dataframe_definition

value = df.select("SOME_COLUMN_NAME").first()[0]

Convert Dataframe to JSON

df = some_dataframe_definition

result_json = df.toJSON()

Get a Row

df = some_dataframe_definition

row = df.collect()[0]      #You can switch out 0 for whatever row you want.

Count rows of Dataframe

df = some_dataframe_definition

num_rows = df.count()

 

 

Databricks Unity Catalog SQL Commands

This post covers the basic commands you need to know when working with Unity Catalog.

Display Current Metastore

SELECT CURRENT_METASTORE();

Display Current Catalog

SELECT CURRENT_CATALOG();

Create Catalog

CREATE CATALOG IF NOT EXISTS <Catalog_Name> COMMENT 'A COMMENT';

Create Catalog With Location

CREATE CATALOG IF NOT EXISTS <Catalog_Name> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>' COMMENT 'A COMMENT';

Describe Catalog

DESCRIBE CATALOG <Catalog_Name>;

Create Schema

CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> COMMENT '<COMMENT>';

Create Schema With Location

CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>/<SCHEMA_NAME>' COMMENT '<COMMENT>';

Show All Storage Credentials

SHOW STORAGE CREDENTIALS;

Describe Credential

DESCRIBE STORAGE CREDENTIAL <CREDENTIAL_NAME>;

Create External Location

You will first need a storage credential.

You can reference down to the full table path or keep it at the container level.

CREATE EXTERNAL LOCATION IF NOT EXISTS <NAME>
URL 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/'
WITH (STORAGE CREDENTIAL <CREDENTIAL_NAME>)
COMMENT '<COMMENT>';

Create External Table

CREATE TABLE <CATALOG_NAME>.<SCHEMA_NAME>.<TABLE_NAME>
USING <FORMAT>
LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/FOLDER/PATH';

Grant Create Storage Credential on Metastore

GRANT CREATE STORAGE CREDENTIAL ON METASTORE TO `<USER>`;

Grant Permission to Create External Locations on Storage Credential

GRANT CREATE EXTERNAL LOCATION ON STORAGE CREDENTIAL <CREDENTIAL_NAME> TO `<USER>`;

Grant Permission to Create External Location on Metastore

GRANT CREATE EXTERNAL LOCATION ON METASTORE TO `<USER>`;

Grant Permission to Use Catalog

GRANT USE_CATALOG ON CATALOG <CATALOG_NAME> TO `<USER>`;

Show All Grants on Metastore

SHOW GRANTS `<USER>` ON METASTORE;

Grant Permission to Use Schema

GRANT USE_SCHEMA ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;

Grant Permission to Create Table

GRANT CREATE TABLE ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;

 

Databricks Unity Catalog REST APIs

This post shows how to work with the Databricks Unity Catalog REST APIs.

Set Catalog Isolation Mode to ISOLATED

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/catalogs/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"isolation_mode": "ISOLATED"
}'

Bind Workspace to Catalog

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"add": [{ "workspace_id": "<WORKSPACE_ID>", "binding_type": "BINDING_TYPE_READ_WRITE" }],
"remove": []
}'
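If you would rather build the bind call from Python, here is a minimal sketch using only the standard library. It constructs the same PATCH request without sending it. The function name build_bind_request and the example host, token, catalog and workspace values are made up for illustration.

```python
import json
import urllib.request


def build_bind_request(host, token, catalog_name, workspace_id,
                       binding_type="BINDING_TYPE_READ_WRITE"):
    """Build (but do not send) the PATCH request that binds a workspace."""
    url = "https://%s/api/2.1/unity-catalog/bindings/catalog/%s" % (host, catalog_name)
    payload = {
        "add": [{"workspace_id": workspace_id, "binding_type": binding_type}],
        "remove": [],
    }
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": "Bearer %s" % token,
            "Content-Type": "application/json",
        },
        method="PATCH",
    )


# Hypothetical values; replace them with your own before sending.
request = build_bind_request("example.azuredatabricks.net", "my-token", "my_catalog", 1234567890)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

Serializing a dict with json.dumps avoids the missing-comma mistakes that are easy to make when hand-writing the body inside a curl command.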

Unbind Workspace from Catalog

curl --location --request PATCH 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json' \
--data-raw '{
"add": [],
"remove": [{ "workspace_id": "<WORKSPACE_ID>", "binding_type": "BINDING_TYPE_READ_WRITE" }]
}'

List Workspaces Assigned to Catalog

curl --location --request GET 'https://<DATABRICK_URL>/api/2.1/unity-catalog/bindings/catalog/<CATALOG_NAME>' \
--header 'Authorization: Bearer <TOKEN>' \
--header 'Content-Type: application/json'

 

PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake you can refer here.

First you need to install the “delta-spark” package for whatever version you require.

pip install delta-spark==3.1.0

Setup a Spark Session.

To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.

To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

from delta.tables import DeltaTable

vacuum_hrs = 100
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

delta_table = DeltaTable.forPath(spark, path)
delta_table.vacuum(vacuum_hrs)

Compaction

Improves reads by merging small files into larger ones.

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeCompaction()

Z-Order

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 
columns = ['<COLUMN_NAME>']    #One or more columns to Z-Order by

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeZOrderBy(*columns)

Delete

from delta.tables import DeltaTable
import pyspark.sql.functions as F

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

#You can also use sql
delta_table.delete("column == 'some_VALUE'")

Modify Properties

You can refer here for more properties.

dataSkippingNumIndexedCols

You would do this if your table has more columns than Delta Lake collects statistics on. The default value is 32.

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
skip_cols = <SOME_VALUE>

spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
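Table properties are assigned with a single = inside TBLPROPERTIES. As a small sketch, the statement can be built by a helper so the same pattern can be reused for other properties. The function name set_tblproperty_sql and the value 40 are my own choices for illustration.

```python
def set_tblproperty_sql(path, key, value):
    """Build an ALTER TABLE statement for a path-based Delta table."""
    return "ALTER TABLE delta.`%s` SET TBLPROPERTIES ('%s' = '%s')" % (path, key, value)


statement = set_tblproperty_sql(
    "abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/",
    "delta.dataSkippingNumIndexedCols",
    40,
)
print(statement)
# In a notebook you would then run: spark.sql(statement)
```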

 

Databricks: Get Secret

This post shows how to get a secret from a Key Vault in Databricks.

First you need to set up dbutils.

Next, make sure your Databricks installation has a Key Vault-backed secret scope set up.

Then make sure that Databricks is allowed to communicate with your Key Vault.

You can then query your Key Vault to get the secret with the following command.

secret = dbutils.secrets.get(scope='<SCOPE>', key='<SECRET_KEY>')

 

Spark Connector Connect to SQL Server

This post shows how to use the Spark Connector to connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
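One thing to watch for: when the token request fails, the dictionary returned by MSAL has no access_token key and carries error and error_description instead, so result['access_token'] raises a bare KeyError. A minimal sketch of a guard is below. The helper name extract_access_token is my own, and the sample dictionaries are made up so the snippet runs offline.

```python
def extract_access_token(result):
    """Return the access token from an MSAL result dict or raise a clear error."""
    if result and "access_token" in result:
        return result["access_token"]
    result = result or {}
    raise RuntimeError(
        "Token request failed: %s: %s"
        % (result.get("error", "unknown_error"), result.get("error_description", ""))
    )


# Made-up dictionary shaped like a successful MSAL result.
print(extract_access_token({"access_token": "abc123", "token_type": "Bearer"}))
```

You would then pass extract_access_token(result) to the accessToken option instead of indexing the dictionary directly.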

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", secret) \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth.

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", secret) \
    .load()

 

Databricks: Notebook SQL

This post shows how to work with Databricks SQL in a notebook.

Create a Temp View of a DataFrame.

df = <SOMETHING>
df.createOrReplaceTempView("<TABLE_NAME>")

Drop a Table

%sql
drop table <SCHEMA>.<TABLE>;

Describe Table

%sql
desc table extended <SCHEMA>.<TABLE>;

Describe Detail

%sql
describe detail <SCHEMA>.<TABLE>;

Show Table Properties

%sql
SHOW TBLPROPERTIES <SCHEMA>.<TABLE>;

Describe History

%sql
describe history <SCHEMA>.<TABLE>;

Create Schema

%sql
CREATE SCHEMA IF NOT EXISTS <SCHEMA>;

Create Parquet Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING PARQUET LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Create Delta Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING DELTA LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Upsert

%sql
MERGE INTO schema.table t
USING (
  SELECT columns
  FROM table
) AS source ON (source.column = t.column)
WHEN NOT MATCHED THEN
  INSERT (
    column, column2
  )
  VALUES (
    source.column, source.column2
  )
WHEN MATCHED THEN
  UPDATE SET
    t.column = source.column;

 

Databricks: Mounts

This post shows how to create mounts on Databricks.

Notes

  • Security issue: mounts are shared across all clusters and users
  • Should always be unmounted after use
    • Due to Service Principal password rotations
    • Reliability, especially in BCDR
  • Databricks recommends using Unity Catalog instead of mounts, as mounts are legacy
  • Naming conflicts with other projects are possible
  • Do not create mounts manually. Always have your project mount at the start and unmount at the end

List Mounts

dbutils.fs.mounts()

Unmount

dbutils.fs.unmount("<MOUNT>")

Mount

client_id = "<CLIENTID>"
secret = dbutils.secrets.get(scope = "<SCOPE_NAME>", key = "<SECRET_NAME>")
tenant_id = "<TENANT_ID>"
storage_account_name = "<STORAGE_ACCOUNT_NAME>"
container_name = "<CONTAINER_NAME>"

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token"
}

path = "abfss://%s@%s.dfs.core.windows.net/" % (container_name, storage_account_name)

dbutils.fs.mount(
    source = path,
    mount_point = "/mnt/<MOUNT_NAME>",
    extra_configs = configs
)

 

Databricks: Notebook Commands

This post is all about notebook commands.

List a directory on DBFS using Shell

%sh
ls /dbfs

List a Directory on DBFS using FS

%fs
ls "<DIRECTORY>"

List Python Packages

%pip list

Install a Python Requirements.txt

%pip install --index-url <URL> -r requirements.txt

Install a Single Python Package

%pip install --index-url <URL> <PACKAGE>==<VERSION>

 

Databricks: Bearer Token CLI

This post shows how to get a bearer token using the Azure CLI and set it as an environment variable.

First install Azure CLI.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Access Token

az account get-access-token --resource 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Set Access Token

Linux

export DATABRICKS_AAD_TOKEN="<TOKEN>"

Windows

set DATABRICKS_AAD_TOKEN=<TOKEN>
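The az command prints JSON, and the token itself sits in the accessToken field. If you are scripting this in Python, a minimal sketch of pulling the token out and setting the variable for the current process could look like this. The helper name token_from_az_output and the sample output are made up; a real token is a long JWT.

```python
import json
import os


def token_from_az_output(az_json):
    """Extract the bearer token from `az account get-access-token` JSON output."""
    return json.loads(az_json)["accessToken"]


# Shortened, made-up sample of the CLI output.
sample_output = (
    '{"accessToken": "eyJ0eXAi-example", '
    '"expiresOn": "2030-01-01 00:00:00.000000", '
    '"tokenType": "Bearer"}'
)

# Only affects this Python process and the child processes it starts.
os.environ["DATABRICKS_AAD_TOKEN"] = token_from_az_output(sample_output)
print(os.environ["DATABRICKS_AAD_TOKEN"])
```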

Set Config File

Linux

export DATABRICKS_CONFIG_FILE="<LOCATION>"

Windows

set DATABRICKS_CONFIG_FILE=<LOCATION>

 

Databricks: REST API

This post shows how to communicate with Databricks using its REST APIs.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Bearer Token for Service Principal

curl -X POST https://login.microsoftonline.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d 'grant_type=client_credentials&client_id=<CLIENTID>&resource=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d&client_secret=<SECRET>'

Get Bearer Token for Service Principal Using management.core.windows.net

curl -X POST https://login.microsoftonline.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d 'grant_type=client_credentials&client_id=<CLIENTID>&resource=https://management.core.windows.net/&client_secret=<SECRET>'
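The same token request can be sketched in Python with the standard library. The snippet below only builds the request; it does not send it. The function name build_token_request and the sample tenant, client and secret values are made up. Using urlencode means a secret containing characters such as & or = is escaped correctly, which is easy to get wrong in a hand-written curl body.

```python
import urllib.parse
import urllib.request

# Resource ID for Databricks, as given at the top of this post.
DATABRICKS_RESOURCE_ID = "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"


def build_token_request(tenant_id, client_id, client_secret,
                        resource=DATABRICKS_RESOURCE_ID):
    """Build (but do not send) the client-credentials token request."""
    url = "https://login.microsoftonline.com/%s/oauth2/token" % tenant_id
    form = urllib.parse.urlencode({
        "grant_type": "client_credentials",
        "client_id": client_id,
        "resource": resource,
        "client_secret": client_secret,
    })
    return urllib.request.Request(
        url,
        data=form.encode("ascii"),
        headers={"Content-Type": "application/x-www-form-urlencoded"},
        method="POST",
    )


# Hypothetical values for illustration only.
request = build_token_request("my-tenant-id", "my-client-id", "s&cret=value")
print(request.get_method(), request.full_url)
# To send it: urllib.request.urlopen(request), then read access_token from the JSON response.
```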

Start Cluster

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/start -d '{ "cluster_id": "<CLUSTER_ID>"}'

Stop Cluster

Terminating (stopping) a cluster uses the clusters/delete endpoint. The cluster configuration is kept.

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/delete -d '{ "cluster_id": "<CLUSTER_ID>"}'

List Clusters

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/list

Job List

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/list

Job Python Run

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/run-now -d '{"job_id": <JOB_ID>, "python_params": [] }'

Job Get

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' 'https://<DATABRICKS_url>/api/2.0/jobs/runs/get?run_id=<JOB_RUN_ID>'

Create Job

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/create -d '{
  "name": "<JOB_NAME>",
  "new_cluster": {
    "name": "<CLUSTER_NAME>",
    "spark_version": "<SPARK_VERSION>",
    "node_type_id": "<NODE_TYPE>",
    "autoscale": {
      "min_workers": 1,
      "max_workers": 3
    },
    "init_scripts": [
      {
        "dbfs": {
          "destination": "dbfs:/<LOCATION>"
        }
      }
    ],
    "cluster_log_conf": {
      "dbfs": {
        "destination": "dbfs:/mnt/<LOCATION>"
      }
    },
    "spark_env_vars": {
      "<KEY>": "<VALUE>"
    }
  },
  "libraries": [
    {
      "pypi": {
        "package": "<PACKAGE>==<VERSION>"
      }
    }
  ],
  "timeout_seconds": <VALUE>,
  "max_retries": 1,
  "spark_python_task": {
    "python_file": "dbfs:/<SOURCE_LOCATION>",
    "parameters": []
  }
}'
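A payload this size is easy to break with a missing quote or brace when typed by hand. As a sketch, you could build it as a Python dictionary and let json.dumps produce the body. The function name build_job_payload is my own, the sample values are made up, and only a subset of the fields is shown; the cluster and library settings would be added to the dictionary in the same way.

```python
import json


def build_job_payload(job_name, python_file, parameters=None,
                      max_retries=1, timeout_seconds=3600):
    """Return a JSON string with a subset of the jobs/create fields."""
    payload = {
        "name": job_name,
        "timeout_seconds": timeout_seconds,
        "max_retries": max_retries,
        "spark_python_task": {
            "python_file": python_file,
            "parameters": list(parameters or []),
        },
    }
    return json.dumps(payload, indent=2)


# Hypothetical job name and file location.
print(build_job_payload("nightly-load", "dbfs:/jobs/nightly_load.py"))
```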

Job Permission Patch

curl --location -g --request PATCH -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/permissions/jobs/<JOB_ID> -d '{ "access_control_list": [{ "group_name": "<GROUP_NAME>", "permission_level": "<PERMISSION>"}]}'

Get Service Principal List

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals

Delete Service Principal From Databricks ONLY

curl --location -g --request DELETE -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals/<APPLICATION_ID>

Add Service Principal To Databricks

curl --location --request POST 'https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals' --header 'Authorization: Bearer <TOKEN>' --header 'Content-Type: application/json' --data-raw '{ "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"], "applicationId": "<CLIENTID>", "displayName": "<DISPLAYNAME>", "groups": [{"value": "<GROUP_ID>"}], "entitlements": [{ "value": "allow-cluster-create"}] }'

List Secret Scopes

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/list

Create KeyVault Secret Scope

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/create -d '{"scope": "<Keyvault_name>", "scope_backend_type": "AZURE_KEYVAULT", "backend_azure_keyvault": {"resource_id": "<RESOURCE_ID>", "dns_name": "<KEYVAULT_URL>"}, "initial_manage_principal": "users"}'

IP Access Lists

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/ip-access-lists

List Git Repos

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos

Update Git Repo

curl --location -g --request PATCH -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos/<REPO_ID> -d '{ "branch": "<BRANCH_NAME>" }'

Databricks: Set Spark Configs

This post shows how to set the Spark configs in Databricks or Synapse notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

secret = 'value' #I highly suggest you get the password from the keyvault
storage_account = ''
application_id = ''
tenant_id = ''

spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')

spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')

spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)

spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)

spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
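The five settings follow one naming pattern, so as a sketch they can be generated by a small helper and applied in a loop. The function name adls_oauth_configs and the sample values are my own. The snippet prints only the keys so the secret never ends up in notebook output.

```python
def adls_oauth_configs(storage_account, application_id, secret, tenant_id):
    """Return the OAuth settings for one ADLS storage account as a dict."""
    suffix = "%s.dfs.core.windows.net" % storage_account
    return {
        "fs.azure.account.auth.type.%s" % suffix: "OAuth",
        "fs.azure.account.oauth.provider.type.%s" % suffix:
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id.%s" % suffix: application_id,
        "fs.azure.account.oauth2.client.secret.%s" % suffix: secret,
        "fs.azure.account.oauth2.client.endpoint.%s" % suffix:
            "https://login.microsoftonline.com/%s/oauth2/token" % tenant_id,
    }


# Hypothetical values for illustration only.
configs = adls_oauth_configs("mystorageaccount", "my-app-id", "my-secret", "my-tenant-id")
for key, value in configs.items():
    # In a notebook with a Spark session you would call: spark.conf.set(key, value)
    print(key)
```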

If you are running in Databricks, you could add them to the cluster's Spark config instead, although I recommend doing it in a notebook.

spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
spark.hadoop.fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
spark.hadoop.fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
spark.hadoop.fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net {{secrets/<SCOPE>/<SECRET_NAME>}}
spark.hadoop.fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

 

 

PySpark: Read From ADLS to DataFrame

This how-to shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Read a CSV from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'csv'

#"schema" must be a StructType (or DDL string) that you define first.
#You don't need "header" if it is not CSV.

dataframe = spark.read.format(format) \
  .option('header', True) \
  .schema(schema) \
  .load(path)

Read Parquet from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'parquet'

dataframe = spark.read.format(format) \
    .load(path)

Read Delta from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'delta'

dataframe = spark.read.format(format) \
    .load(path)

 

PySpark: Save a DataFrame To ADLS

This how-to shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following:

Note that you don't need all of the options below; they are just an example.

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
mode = 'overwrite'
format = 'delta'
partitions = []

df.write.mode(mode).format(format).option('mergeSchema', False).partitionBy(*partitions).save(path)