Synapse: Get Secret

This post is how to get a secret from a key vault in Synapse.

If you have Data Exfiltration Protection enabled (which is recommended), you need a Managed Private Endpoint set up to your Key Vault.

You also need to ensure your Synapse Managed Identity has access to your Key Vault.

You also need an un-parameterized Linked Service created for the Key Vault.

Then you can query your Key Vault to get the secret with the following command.

from notebookutils import mssparkutils

secret = mssparkutils.credentials.getSecret('<KEY_VAULT_NAME>', '<SECRET_KEY>', '<LINKED_SERVICE_KEYVAULT_NAME>')

 

Databricks: Get Secret

This post is how to get a secret from a key vault in Databricks.

First you need to set up dbutils.

Next, make sure your Databricks workspace has a Key Vault-backed secret scope set up.

Then make sure that Databricks is allowed to communicate with your Key Vault.

Then you can query your Key Vault to get the secret with the following command.

secret = dbutils.secrets.get(scope='<SCOPE>', key='<SECRET_KEY>')

 

Spark Connector Connect to SQL Server

This post is how to use the Spark Connector to Connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

# The keyword is lowercase "authority", and the client id must be a quoted string
global_spn_app = msal.ConfidentialClientApplication(
    "<CLIENT_ID>", authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
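acquire_token_for_client returns a dictionary. On failure it carries "error" and "error_description" keys instead of "access_token", so reading result['access_token'] directly gives a confusing KeyError. A minimal sketch of a guard you could put in front of the read; extract_access_token is a hypothetical helper name, not part of msal.

```python
def extract_access_token(result):
    """Return the access token from an MSAL result dict, or raise a readable error."""
    if "access_token" in result:
        return result["access_token"]
    # MSAL reports failures through these keys rather than raising an exception.
    raise RuntimeError(
        "Token request failed: {} - {}".format(
            result.get("error"), result.get("error_description")
        )
    )
```

You would then pass extract_access_token(result) to the "accessToken" option instead of indexing the dictionary directly.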

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", secret) \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", secret) \
    .load()

 

Python: pyodbc with SQL Server

This post is in regards to connecting to SQL Server using pyodbc.

Install package

pip install pyodbc

If you are running in Databricks then the current driver will be “{ODBC Driver 17 for SQL Server}”.

If you are running in Synapse then the current driver will be “{ODBC Driver 18 for SQL Server}”.

Check Available ODBC Drivers

import pyodbc
pyodbc.drivers()

Check Which ODBC Drivers Are Installed in Databricks

%sh
cat /etc/odbcinst.ini

Install Databricks driver 17

curl https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
curl https://packages.microsoft.com/config/ubuntu/20.04/prod.list > /etc/apt/sources.list.d/mssql-release.list
apt-get update
ACCEPT_EULA=Y apt-get install -y msodbcsql17
apt-get -y install unixodbc-dev

Connect using SQL Auth

I do not recommend SQL Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

# The port goes after the server name, separated by a comma; the password comes from the secret variable
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;')

Connect Using Domain Auth

import pyodbc

secret = "<GET SECRET SECURELY>"

# The port goes after the server name, separated by a comma; the password comes from the secret variable
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Uid=<USER>;Pwd=' + secret + ';Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;Authentication=ActiveDirectoryPassword')

Connect using Azure SPN

pip install msal

import struct
import msal
import pyodbc

global_token_cache = msal.TokenCache()
secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    "<CLIENT_ID>", authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

# Connection attribute the ODBC driver uses to receive an access token
SQL_COPT_SS_ACCESS_TOKEN = 1256

# The driver expects the token as UTF-16-LE bytes prefixed with their 4-byte length
exptoken = result['access_token'].encode('utf-16-le')
token_struct = struct.pack("=i", len(exptoken)) + exptoken

# Do not pass Uid/Pwd when authenticating with an access token
connection = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};Server=tcp:<SERVER_NAME>,<PORT>;Database=<DATABASE>;Encrypt=yes;TrustServerCertificate=no;Connection Timeout=<TIMEOUT>;', attrs_before={SQL_COPT_SS_ACCESS_TOKEN: token_struct})
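The SQL Server ODBC driver expects the access token as a byte structure: a 4-byte length followed by the token encoded as UTF-16-LE. The sketch below isolates that packing step so you can check it without a database. pack_access_token is a hypothetical helper name, and it assumes a little-endian length prefix.

```python
import struct


def pack_access_token(access_token):
    """Encode a token in the layout used for connection attribute 1256."""
    # UTF-16-LE places a zero byte after each ASCII character of the token.
    encoded = access_token.encode("utf-16-le")
    # 4-byte little-endian length prefix followed by the encoded bytes.
    return struct.pack("<I", len(encoded)) + encoded


print(pack_access_token("ab"))  # b'\x04\x00\x00\x00a\x00b\x00'
```

The returned bytes are what you would pass as the value for SQL_COPT_SS_ACCESS_TOKEN in attrs_before.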

Once you have the connection you can set up the cursor.

cursor = connection.cursor()

Then execute a command

command = "<COMMAND>"
params = ()
cursor.execute(command, params)
connection.commit()

After you are finished, close the cursor and the connection

cursor.close()
connection.close()
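pyodbc follows the Python DB-API, so the cursor, execute, commit and close pattern can be tried locally before pointing it at SQL Server. The sketch below uses the standard library sqlite3 module as a stand-in for pyodbc (an assumption made only so it runs anywhere). The table and values are made up for illustration. Both drivers use ? as the parameter marker.

```python
import sqlite3

# In-memory database so the sketch needs no server; with pyodbc this would be pyodbc.connect(...).
connection = sqlite3.connect(":memory:")
cursor = connection.cursor()

cursor.execute("CREATE TABLE people (id INTEGER, name TEXT)")

# Values are passed separately from the SQL text, never concatenated into it.
command = "INSERT INTO people (id, name) VALUES (?, ?)"
params = (1, "Ada")
cursor.execute(command, params)
connection.commit()

cursor.execute("SELECT name FROM people WHERE id = ?", (1,))
rows = cursor.fetchall()

cursor.close()
connection.close()
```

Passing params as a tuple keeps secrets and user input out of the SQL string itself.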

 

Python: Arguments

This post is how to use the argparse package.

First you must import the package.

import argparse

Next you setup the argument parser.

parser = argparse.ArgumentParser()

Then you create a list of arguments. See the argparse documentation for more options than the set below.

# "type" must be a callable such as str or int, not a string
argument_list = [
    { "name": "<NAME>", "help": "<HELP_TEXT>", "type": str, "required": True}
]

Then we take your argument_list and create arguments and assign them to the parser.

for arg in argument_list:
    parser.add_argument("--{}".format(arg["name"]), help=arg["help"], type=arg["type"], required=arg["required"])

Then we parse the args from “sys.argv”. Parsing args this way means that if an argument is unknown to your program, the program won’t fail. Instead, the unrecognized arguments are returned in the “unknown” list and your application continues.

args, unknown = parser.parse_known_args()

You could also parse the args from “sys.argv” this way. However, every argument passed in sys.argv must then be known, otherwise parsing will fail.

args = parser.parse_args()

Then, as a final step, we convert the parsed args into a dictionary of keys and values called config.

config = vars(args)
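Putting the steps together, here is a small end-to-end sketch. The argument names (env, retries) and the values are made up for illustration. The argument list is passed to parse_known_args explicitly instead of coming from sys.argv, so the example is self-contained.

```python
import argparse

parser = argparse.ArgumentParser()

# Hypothetical arguments, for illustration only.
argument_list = [
    {"name": "env", "help": "Target environment", "type": str, "required": True},
    {"name": "retries", "help": "Retry count", "type": int, "required": False},
]

for arg in argument_list:
    parser.add_argument(
        "--{}".format(arg["name"]),
        help=arg["help"],
        type=arg["type"],
        required=arg["required"],
    )

# "--extra x" is not declared above, so it lands in the unknown list.
args, unknown = parser.parse_known_args(
    ["--env", "dev", "--retries", "3", "--extra", "x"]
)
config = vars(args)

print(config)   # {'env': 'dev', 'retries': 3}
print(unknown)  # ['--extra', 'x']
```

Note that "retries" arrives as an int because of its type callable, while unknown arguments stay as raw strings.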

 

Azure: Python SDK

This post is how to use the Azure Python SDK.

If you are using Databricks you can get the secret by using the following Databricks: Get Secret

If you are using Synapse you can get the secret by using the following Synapse: Get Secret

Package Installations

pip install azure-identity
pip install azure-mgmt-resource
pip install azure-mgmt-storage
pip install azure-storage-blob
pip install azure-storage-file
pip install azure-storage-file-datalake

Setup Credentials

Service Principal

from azure.common.credentials import ServicePrincipalCredentials
secret = "<GET_SECRET_SECURELY>"
credential = ServicePrincipalCredentials("<SPN_CLIENT_ID>", secret, tenant="<TENANT_ID>")

Token Credential

from azure.identity import ClientSecretCredential
secret = "<GET_SECRET_SECURELY>"
token_credential = ClientSecretCredential("<TENANT_ID>", "<SPN_CLIENT_ID>", secret)

Subscription Client

Client

from azure.mgmt.resource import SubscriptionClient
subscription_client = SubscriptionClient(credential)

Get List

subscriptions = subscription_client.subscriptions.list()
for subscription in subscriptions:
    print(subscription.display_name)

Storage Account

Client

from azure.mgmt.storage import StorageManagementClient
storage_client = StorageManagementClient(credential, "<SUBSCRIPTION_ID>")

Get List by Resource Group

storage_accounts = storage_client.storage_accounts.list_by_resource_group("<RESOURCE_GROUP_NAME>")
for sa in storage_accounts:
    print(sa.name)

List Containers in Storage Account

containers = storage_client.blob_containers.list("<RESOURCE_GROUP_NAME>", sa.name)

Containers

Client

from azure.storage.blob import ContainerClient
account_url_blob = f"https://{sa.name}.blob.core.windows.net"
container_client = ContainerClient.from_container_url(
    container_url=account_url_blob + "/" + container.name,
    credential=token_credential
)

Get Container Properties

container_client.get_container_properties()

List Blobs

for b in container_client.list_blobs():
    print(b)

Data Lake Service

Client

from azure.storage.filedatalake import DataLakeServiceClient
storage_account_url_dfs = f"https://{sa.name}.dfs.core.windows.net"
data_lake_service_client = DataLakeServiceClient(storage_account_url_dfs, token_credential)

DataLake Directory

from azure.storage.filedatalake import DataLakeDirectoryClient
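# The directory client also needs the file system (container) and directory names
data_lake_directory_client = DataLakeDirectoryClient(account_url=storage_account_url_dfs, file_system_name="<CONTAINER_NAME>", directory_name="<CONTAINER_SUB_FOLDER>", credential=token_credential)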

FileSystem

Client

file_system_client = data_lake_service_client.get_file_system_client(file_system="<CONTAINER_NAME>")

Get Directory Client

directory_client = file_system_client.get_directory_client("<CONTAINER_SUB_FOLDER>")

Get Directory Access Control

acl_props = directory_client.get_access_control()

Microsoft Graph Client

Package Installations

pip install msgraph-sdk
pip install msrestazure
pip install azure-identity

Credential

from azure.identity.aio import ClientSecretCredential

secret = "<GET_SECRET_SECURELY>"
credential = ClientSecretCredential('<TENANT_ID>', '<CLIENT_ID>', secret)

Client

from msgraph import GraphServiceClient

def create_session(credential):
  scopes = ['https://graph.microsoft.com/.default']
  graph_client = GraphServiceClient(credential, scopes)
  return graph_client

graph_client = create_session(credential)

Get Groups

#This will only get you the first 100 groups. If you have more than that, follow the next link until there is none
groups = await graph_client.groups.get()
print(len(groups.value))

while groups is not None and groups.odata_next_link is not None:
  groups = await graph_client.groups.with_url(groups.odata_next_link).get()
  print(len(groups.value))

Get Group Members

group_id = '<GROUP_ID>'
group_members = await graph_client.groups.by_group_id(group_id).members.get()

 

Databricks: Notebook SQL

This post is how to work with Databricks SQL through a Notebook.

Create a Temp View of a DataFrame.

df = <SOMETHING>
df.createOrReplaceTempView("<TABLE_NAME>")

Drop a Table

%sql
drop table <SCHEMA>.<TABLE>;

Describe Table

%sql
desc table extended <SCHEMA>.<TABLE>;

Describe Detail

%sql
describe detail <SCHEMA>.<TABLE>;

Show Table Properties

%sql
SHOW TBLPROPERTIES <SCHEMA>.<TABLE>;

Describe History

%sql
describe history <SCHEMA>.<TABLE>;

Create Schema

%sql
CREATE SCHEMA IF NOT EXISTS <SCHEMA>;

Create Parquet Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING PARQUET LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Create Delta Table

%sql
CREATE TABLE <SCHEMA>.<TABLE> USING DELTA LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Upsert

%sql
MERGE INTO schema.table t
USING (
  SELECT columns
  FROM table
) AS source ON (source.column = t.column)
WHEN NOT MATCHED THEN
  INSERT (
    column, column2
  )
  VALUES (
    source.column, source.column2
  )
WHEN MATCHED THEN
  UPDATE SET
    t.column = source.column
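If you run the upsert from Python rather than a %sql cell, the statement can be assembled from column lists. This is only a sketch under stated assumptions: build_merge_sql is a hypothetical helper, the table and column names are placeholders, and the identifiers are assumed to come from trusted code because they are formatted straight into the SQL text.

```python
def build_merge_sql(target, source, key_columns, update_columns):
    """Build a MERGE statement that updates matching rows and inserts the rest."""
    all_columns = list(key_columns) + list(update_columns)
    on_clause = " AND ".join("s.{0} = t.{0}".format(c) for c in key_columns)
    assignments = ", ".join("t.{0} = s.{0}".format(c) for c in update_columns)
    insert_columns = ", ".join(all_columns)
    insert_values = ", ".join("s.{}".format(c) for c in all_columns)
    return (
        "MERGE INTO {target} t "
        "USING {source} s ON ({on}) "
        "WHEN MATCHED THEN UPDATE SET {assignments} "
        "WHEN NOT MATCHED THEN INSERT ({columns}) VALUES ({values})"
    ).format(
        target=target,
        source=source,
        on=on_clause,
        assignments=assignments,
        columns=insert_columns,
        values=insert_values,
    )


merge_sql = build_merge_sql("sales.orders", "updates", ["id"], ["amount"])
# In a notebook you would then run: spark.sql(merge_sql)
```

Here "updates" could be a temp view created with createOrReplaceTempView.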

 

Databricks: Mounts

This post is how to mount on Databricks.

Notes

  • Security issue: mounts are shared across all clusters and users
  • Mounts should always be unmounted after use
    • Service Principal password rotations can break them
    • Reliability suffers, especially in BCDR scenarios
  • Databricks recommends using Unity Catalog instead of mounts, which are legacy
  • Mount names can conflict with other projects
  • Do not create mounts manually. Always have your project mount at the start and unmount at the end

List Mounts

dbutils.fs.mounts()

Unmount

dbutils.fs.unmount("<MOUNT>")

Mount

client_id = "<CLIENTID>"
secret = dbutils.secrets.get(scope = "<SCOPE_NAME>", key = "<SECRET_NAME>")
tenant_id = "<TENANT_ID>"
storage_account_name = "<STORAGE_ACCOUNT_NAME>"
container_name = "<CONTAINER_NAME>"

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": client_id,
  "fs.azure.account.oauth2.client.secret": secret,
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/" + tenant_id + "/oauth2/token"
}

path = "abfss://%s@%s.dfs.core.windows.net/" % (container_name, storage_account_name)

dbutils.fs.mount(
    source = path,
    mount_point = "/mnt/<MOUNT_NAME>",
    extra_configs = configs
)
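The notes above say a project should mount at the start and always unmount at the end. One way to make that hard to forget is a context manager. The sketch below is an assumption about how you might structure it, not a Databricks feature: temporary_mount is a hypothetical helper, and dbutils is passed in as a parameter so the function can be exercised outside a notebook.

```python
from contextlib import contextmanager


@contextmanager
def temporary_mount(dbutils, source, mount_point, extra_configs):
    """Mount on entry and always unmount on exit, even if the body raises."""
    dbutils.fs.mount(
        source=source, mount_point=mount_point, extra_configs=extra_configs
    )
    try:
        yield mount_point
    finally:
        dbutils.fs.unmount(mount_point)


# Usage inside a notebook, where dbutils, path and configs already exist:
# with temporary_mount(dbutils, path, "/mnt/<MOUNT_NAME>", configs) as mnt:
#     print(dbutils.fs.ls(mnt))
```

Because the unmount sits in a finally block, a failed job does not leave the mount behind for other clusters and users.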

 

Databricks: Notebook Commands

This post is all about notebook commands.

List a directory on DBFS using Shell

%sh
ls /dbfs

List a Directory on DBFS using FS

%fs
ls "<DIRECTORY>"

List Python Packages

%pip list

Install a Python Requirements.txt

%pip install --index-url <URL> -r requirements.txt

Install a Single Python Package

%pip install --index-url <URL> <PACKAGE>==<VERSION>

 

Databricks: Bearer Token CLI

This post is how to get the bearer token using the Azure CLI and set it as an environment variable.

First install Azure CLI.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Access Token

az account get-access-token --resource 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Set Access Token

Linux

export DATABRICKS_AAD_TOKEN="<TOKEN>"

Windows

set DATABRICKS_AAD_TOKEN=<TOKEN>
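If you would rather set the variable from a Python script, the CLI prints JSON with the token in its accessToken field. A minimal sketch, assuming you have already captured that output as text: token_from_az_output is a hypothetical helper, and the sample string is a shortened stand-in for real CLI output.

```python
import json
import os


def token_from_az_output(az_json):
    """Pull the bearer token out of the JSON printed by az account get-access-token."""
    return json.loads(az_json)["accessToken"]


# Shortened stand-in for the CLI output; a real token is much longer.
sample_output = '{"accessToken": "eyJ0eXAi", "tokenType": "Bearer"}'

os.environ["DATABRICKS_AAD_TOKEN"] = token_from_az_output(sample_output)
```

Keep in mind that os.environ only affects the current Python process and the child processes it starts.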

Set Config File

Linux

export DATABRICKS_CONFIG_FILE="<LOCATION>"

Windows

set DATABRICKS_CONFIG_FILE=<LOCATION>

 

Databricks: Rest API

This post is how to communicate with Databricks using REST APIs.

Databricks Resource ID = 2ff814a6-3304-4ab8-85cb-cd0e6f879c1d

Get Bearer Token for Service Principal

curl -X POST https://login.microsoftonline.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d 'grant_type=client_credentials&client_id=<CLIENTID>&resource=2ff814a6-3304-4ab8-85cb-cd0e6f879c1d&client_secret=<SECRET>'

Get Bearer Token for Service Principal Using management.core.windows.net

curl -X POST https://login.microsoftonline.com/<TENANTID>/oauth2/token -H 'Content-Type: application/x-www-form-urlencoded' -d 'grant_type=client_credentials&client_id=<CLIENTID>&resource=https://management.core.windows.net/&client_secret=<SECRET>'
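When you build the same token request from Python, it helps to let the standard library encode the form body, because a secret containing characters such as & or + would otherwise corrupt the request. The sketch below only builds the URL and body and does not send anything. build_token_request is a hypothetical helper and the tenant, client and secret values are made up.

```python
from urllib.parse import urlencode


def build_token_request(tenant_id, client_id, client_secret, resource):
    """Return the URL and form-encoded body for a client-credentials token request."""
    url = "https://login.microsoftonline.com/{}/oauth2/token".format(tenant_id)
    body = urlencode(
        {
            "grant_type": "client_credentials",
            "client_id": client_id,
            "client_secret": client_secret,
            "resource": resource,
        }
    )
    return url, body


url, body = build_token_request(
    "my-tenant", "my-client", "s3cr&t", "2ff814a6-3304-4ab8-85cb-cd0e6f879c1d"
)
print(body)
```

The & inside the made-up secret comes out as %26, so it is not mistaken for a field separator.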

Start Cluster

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/start -d '{ "cluster_id": "<CLUSTER_ID>"}'

Stop Cluster

The Clusters API stops (terminates) a cluster through the clusters/delete endpoint.

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/delete -d '{ "cluster_id": "<CLUSTER_ID>"}'

List Clusters

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/clusters/list

Job List

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/list

Job Python Run

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/run-now -d '{"job_id": <JOB_ID>, "python_params": [] }'

Job Get

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' 'https://<DATABRICKS_url>/api/2.0/jobs/runs/get?run_id=<JOB_RUN_ID>'

Create Job

Databricks Create Job

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/jobs/create -d '<PAYLOAD>'

Create Job Payload

{
	"name": "<NAME>",
	"max_concurrent_runs": 1,
	"tasks": [
		{
			"task_key": "<TASK_KEY>",
			"run_if": "ALL_SUCCESS",
			"max_retries": 1,
			"timeout_seconds": <TIMEOUT_SECONDS>,
			"notebook_task": {
				"notebook_path": "<PATH>",
				"source": "WORKSPACE",
				"base_parameters": {
					"<KEY>": "<VALUE>",
					"<KEY2>": "<VALUE2>"
				}
			},
			"libraries": [
				{
					"pypi": {
						"package": "<PACKAGE_NAME==VERSION>"
					}
				},
				{
					"jar": "<LOCATION>"
				}
			],
			"new_cluster": {
				"custom_tags": {
					"<TAG_NAME>": "<TAG_VALUE>"
				},
				"azure_attributes": {
					"first_on_demand": 1,
					"availability": "SPOT_AZURE",
					"spot_bid_max_price": 75
				},
				"instance_pool_id": "<WORKER_INSTANCE_POOL_ID>",
				"driver_instance_pool_id": "<DRIVER_INSTANCE_POOL_ID>",
				"data_security_mode": "SINGLE_USER",
				"spark_version": "<SPARK_VERSION>",
				"node_type_id": "<NODE_TYPE_ID>", 
				"runtime_engine": "STANDARD",
				"policy_id": "<POLICY_ID>",
				"autoscale": {
					"min_workers": <MIN_WORKERS>,
					"max_workers": <MAX_WORKERS>
				},
				"spark_conf": {
					"<CONFIG_KEY>": "<CONFIG_VALUE>"
				},
				"cluster_log_conf": {
					"dbfs": {
						"destination": "<LOG_DESTINATION>"
					}
				},
				"spark_env_vars": {
					"<ENV_NAME>": "<ENV_VALUE>"
				},
				"init_scripts": [
					{
						"volumes": {
							"destination": "<INIT_SCRIPT_LOCATION>"
						}
					}
				]
			}
		}
	],
	"format": "MULTI_TASK"
}
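Hand-written JSON payloads are easy to break with a stray comma, so one option is to build the payload as a Python dictionary and let json.dumps serialize it. This is a trimmed sketch rather than a complete job definition: build_job_payload is a hypothetical helper, the argument values are made up, and the cluster, library and init script settings from the payload above are left out for brevity.

```python
import json


def build_job_payload(name, task_key, notebook_path, base_parameters, timeout_seconds=3600):
    """Assemble a trimmed jobs/create payload as a dict and serialize it to JSON."""
    payload = {
        "name": name,
        "max_concurrent_runs": 1,
        "tasks": [
            {
                "task_key": task_key,
                "timeout_seconds": timeout_seconds,
                "notebook_task": {
                    "notebook_path": notebook_path,
                    "source": "WORKSPACE",
                    "base_parameters": base_parameters,
                },
            }
        ],
    }
    return json.dumps(payload, indent=2)


payload_json = build_job_payload(
    "nightly-load", "load", "/Repos/project/load", {"env": "dev"}
)
```

The resulting string can be used as the <PAYLOAD> body once the cluster settings you need have been added to the dictionary.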

Job Permission Patch

curl --location -g --request PATCH -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/permissions/jobs/<JOB_ID> -d '{ "access_control_list": [{ "group_name": "<GROUP_NAME>", "permission_level": "<PERMISSION>"}]}'

Get Service Principal List

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals

Delete Service Principal From Databricks ONLY

curl --location -g --request DELETE -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals/<APPLICATION_ID>

Add Service Principal To Databricks

curl --location --request POST 'https://<DATABRICKS_url>/api/2.0/preview/scim/v2/ServicePrincipals' --header 'Authorization: Bearer <TOKEN>' --header 'Content-Type: application/json' --data-raw '{ "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServicePrincipal"], "applicationId": "<CLIENTID>", "displayName": "<DISPLAYNAME>", "groups": [{"value": "<GROUP_ID>"}], "entitlements": [{ "value": "allow-cluster-create"}] }'

List Secret Scopes

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/list

Create KeyVault Secret Scope

curl --location -g --request POST -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/secrets/scopes/create -d '{"scope": "<Keyvault_name>", "scope_backend_type": "AZURE_KEYVAULT", "backend_azure_keyvault": {"resource_id": "<RESOURCE_ID>", "dns_name": "<KEYVAULT_URL>"}, "initial_manage_principal": "users"}'

IP Access Lists

curl -X GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/ip-access-lists

List Git Repos

curl --location -g --request GET -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos

Update Git Repo

curl --location -g --request PATCH -H 'Authorization: Bearer <TOKEN>' https://<DATABRICKS_url>/api/2.0/repos/<REPO_ID> -d '{ "branch": "<BRANCH_NAME>" }'

 

Python: lxml

This post focuses on the lxml package.

First you need to install the package

pip install lxml

Then import it

from lxml import etree

Create xml object by string

xml_str = "<root><subitem attr='test'>rec</subitem></root>"
root = etree.fromstring(xml_str)

Get text in node

text_str = root.xpath('//root/subitem/text()')[0]

Get Attribute

attr = root.xpath('//root/subitem')[0].attrib['attr']

 

Azure: Install/Configure CLI

This post will show you how to install the Azure CLI.

First you need to install the CLI.

Once it is installed you can set your config directory. This is useful for having multiple logins going at the same time.

set AZURE_CONFIG_DIR=<YOUR_DIRECTORY>

You can then login. There are different ways to do that

Way 1: This will pop up a login window where you enter your credentials

az login

Way 2: This will ask you for your password via the command line

az login -u <YOUR_LOGIN>

Way 3: This passes the password on the command line, so it may end up in your shell history

az login -u <YOUR_LOGIN> -p <YOUR_PASSWORD>

Way 4: logs in as a service principal

az login --service-principal --username <SPN_ID> --password <SPN_KEY> --tenant <TENANTID>

Show your Account

az account show

Set Account Subscription

az account set -s <SUBSCRIPTION_ID>

List Tags for a Subscription

az tag list --subscription <SUBSCRIPTION_NAME>

Install Graph

az extension add --name resource-graph

Query for Anything that Has a Tag

az graph query -q "Resources | project resourceGroup, type, tags | where tags.<TAG_NAME>=~'<VALUE>'"

Query for More than One Tag

az graph query -q "Resources | project resourceGroup, type, tags | where tags.<TAG_NAME>=~'<VALUE>' | where tags.<TAG_NAME>=='<VALUE>'"

Query Type

az graph query -q "Resources | project resourceGroup, type, tags | where type =~ 'microsoft.sql/servers/databases'"

 

Python: Create a Logger

This post is how to create a logger.

First we need to import

import sys
import logging
from datetime import datetime
from pathlib import Path
from pytz import timezone

Then we create a class for Formatter

class CustomFormatter(logging.Formatter):
    grey = "\x1b[38;20m"
    reset = "\x1b[0m"
    # Named fmt so it does not clash with the format() method below
    fmt = "%(asctime)s - %(name)s - %(levelname)s - %(message)s (%(filename)s:%(lineno)d)"
    FORMATS = {
        logging.DEBUG: '\x1b[38;5;23m' + fmt + reset,
        logging.INFO: grey + fmt + reset,
        logging.WARNING: '\x1b[38;5;56m' + fmt + reset,
        logging.ERROR: '\x1b[38;5;197m' + fmt + reset,
        logging.CRITICAL: '\x1b[38;5;1m' + fmt + reset
    }

    def format(self, record):
        log_fmt = self.FORMATS.get(record.levelno)
        formatter = logging.Formatter(log_fmt)
        return formatter.format(record)

Then we create a function to set our logger up.

def set_logger(logging_level, name, log_dir, tz_name):
    # tz_name is a pytz zone name such as 'UTC'. A parameter called "timezone" would shadow the pytz import.
    level_lookup = {
        'WARNING': logging.WARNING,
        'INFO': logging.INFO,
        'DEBUG': logging.DEBUG,
        'ERROR': logging.ERROR,
    }
    # Fall back to INFO for any level name we do not recognize
    if logging_level not in level_lookup:
        logging_level = 'INFO'

    logging.Formatter.converter = lambda *args: datetime.now(tz=timezone(tz_name)).timetuple()
    logging.basicConfig(level=level_lookup[logging_level], format="[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d")
    stream_handler = logging.StreamHandler(sys.stdout)
    stream_handler.setFormatter(CustomFormatter())
    logger = logging.getLogger(name)
    logger.addHandler(stream_handler)
    logger.setLevel(level_lookup[logging_level])

    Path(log_dir).mkdir(parents=True, exist_ok=True)

    now = datetime.now(tz=timezone(tz_name))
    now = now.strftime("%H-%M-%S")

    # Join with Path so log_dir works with or without a trailing slash
    log_file = Path(log_dir) / ('log_%s.log' % now)
    file_handler = logging.FileHandler(log_file, mode='a')
    file_handler.setFormatter(logging.Formatter("[%(levelname)s] %(asctime)s - %(message)s:%(lineno)d"))
    logger.addHandler(file_handler)

    return logger

References

https://alexandra-zaharia.github.io/posts/make-your-own-custom-color-formatter-with-python-logging/

Databricks: Set Spark Configs

This post is how to set the spark configs on Databricks or Synapse Notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

secret = 'value' #I highly suggest you get the password from the keyvault
storage_account = ''
application_id = ''
tenant_id = ''

spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')

spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')

spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)

spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)

spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
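The five settings share the same storage account suffix, so they can be generated in one place and applied in a loop. A minimal sketch: adls_oauth_configs is a hypothetical helper name, and the Spark call is left as a comment so the function can be checked without a cluster.

```python
def adls_oauth_configs(storage_account, application_id, secret, tenant_id):
    """Return the five OAuth settings for one storage account as a dict."""
    suffix = "{}.dfs.core.windows.net".format(storage_account)
    return {
        "fs.azure.account.auth.type." + suffix: "OAuth",
        "fs.azure.account.oauth.provider.type." + suffix:
            "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
        "fs.azure.account.oauth2.client.id." + suffix: application_id,
        "fs.azure.account.oauth2.client.secret." + suffix: secret,
        "fs.azure.account.oauth2.client.endpoint." + suffix:
            "https://login.microsoftonline.com/{}/oauth2/token".format(tenant_id),
    }


# In a notebook with an active Spark session:
# for key, value in adls_oauth_configs(storage_account, application_id, secret, tenant_id).items():
#     spark.conf.set(key, value)
```

Keeping the keys in one function also makes it easier to configure several storage accounts in the same notebook.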

If you are running in Databricks you could add them to the cluster's Spark config so they are applied at cluster start, although I recommend doing it in a notebook instead.

spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
spark.hadoop.fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
spark.hadoop.fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
spark.hadoop.fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net {{secrets/<SCOPE>/<SECRET_KEY>}}
spark.hadoop.fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

 

PySpark: Read From ADLS to DataFrame

This how-to shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Read a CSV from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'csv'

#you don't need "header" if it is not CSV
#"schema" is assumed to be a StructType you have already defined

dataframe = spark.read.format(format) \
  .option('header', True) \
  .schema(schema) \
  .load(path)

Read Parquet from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'parquet'

dataframe = spark.read.format(format) \
    .load(path)

Read Delta from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'delta'

dataframe = spark.read.format(format) \
    .load(path)
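All three reads use the same abfss path layout, so a small helper can build it and avoid typos in the container or account name. This is just a sketch: abfss_path is a hypothetical helper name, and the container, account and folder values are the same placeholders used above.

```python
def abfss_path(container, storage_account, folder=""):
    """Build an abfss:// URI for a folder in an ADLS Gen2 container."""
    base = "abfss://{}@{}.dfs.core.windows.net/".format(container, storage_account)
    # Strip slashes so "my_folder", "/my_folder" and "my_folder/" all give the same result.
    folder = folder.strip("/")
    return base + folder + "/" if folder else base


path = abfss_path("my_container", "my_storage_account", "my_folder")
print(path)  # abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/
```

The result can be passed straight to .load(path) in any of the reads above.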