PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake you can refer here.

First, install the “delta-spark” package in the version you require.

pip install delta-spark==3.1.0

Set up a Spark session.

To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.

To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

from delta.tables import DeltaTable

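# Retention period in hours. Delta Lake rejects values below the default of 168 hours
# unless spark.databricks.delta.retentionDurationCheck.enabled is set to false.
vacuum_hrs = 100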
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

delta_table = DeltaTable.forPath(spark, path)
delta_table.vacuum(vacuum_hrs)

Compaction

Improves read performance by merging small files into larger ones.

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeCompaction()

Z-Order

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 
columns = '<MY_COL>'  # name of the column to Z-order by

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeZOrderBy(columns)

Delete

from delta.tables import DeltaTable
import pyspark.sql.functions as F

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

# You can also pass the condition as a SQL predicate string
delta_table.delete("column == 'some_VALUE'")

Modify Properties

You can refer here for more properties.

dataSkippingNumIndexedCols

Change this if your table has more columns than Delta Lake collects statistics on. By default, statistics are collected for the first 32 columns.

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
skip_cols = <SOME_VALUE>

spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
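
As a minimal sketch, the statement can be built by a small helper so that the same pattern works for other table properties. The helper name build_set_property_sql and the value 40 are assumptions for illustration only; note that the property name and value are separated by a single "=".

```python
def build_set_property_sql(path, name, value):
    """Return an ALTER TABLE statement that sets one property on a path-based Delta table."""
    # Backticks quote the table path; the property name and value are single-quoted.
    return "ALTER TABLE delta.`{}` SET TBLPROPERTIES ('{}' = '{}')".format(path, name, value)


statement = build_set_property_sql(
    'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/',
    'delta.dataSkippingNumIndexedCols',
    40,  # hypothetical value, only for illustration
)
print(statement)

# With a live session you would then run:
# spark.sql(statement)
```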

 

Spark Connector Connect to SQL Server

This post shows how to use the Spark Connector to connect to SQL Server.

Install Spark Connector

spark-mssql-connector_2.12-1.2.0.jar

Install msal

pip install msal

Connect using Azure SPN

import msal
global_token_cache = msal.TokenCache()

secret = "<GET SECRET SECURELY>"

global_spn_app = msal.ConfidentialClientApplication(
    '<CLIENT_ID>', authority='https://login.microsoftonline.com/<TENANT_ID>',
    client_credential=secret,
    token_cache=global_token_cache,
)

result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("accessToken", result['access_token']) \
    .option("encrypt", "true") \
    .option("hostNameInCertificate", "*.database.windows.net") \
    .load()
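
MSAL reports a failed token request in the returned dictionary instead of raising, so it can help to check the result before passing it to the connector. The sketch below assumes a hypothetical helper named get_access_token, and the two sample dictionaries are made up to mimic the shape of a successful and a failed result.

```python
def get_access_token(result):
    """Return the access token from an MSAL result, or raise with the error details."""
    if 'access_token' in result:
        return result['access_token']
    # On failure MSAL fills in 'error' and 'error_description' instead.
    raise RuntimeError('Token request failed: {} - {}'.format(
        result.get('error'), result.get('error_description')))


# Hypothetical results, shaped like what acquire_token_for_client returns.
ok_result = {'access_token': '<TOKEN>', 'token_type': 'Bearer', 'expires_in': 3599}
failed_result = {'error': 'invalid_client', 'error_description': '<DETAILS>'}

token = get_access_token(ok_result)

try:
    get_access_token(failed_result)
except RuntimeError as exc:
    message = str(exc)
    print(message)
```

You would then pass get_access_token(result) to the "accessToken" option instead of indexing the result directly.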

Connect using Domain Auth

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("authentication", "ActiveDirectoryPassword") \
    .option("user", "<USER>@<DOMAIN>") \
    .option("password", secret) \
    .load()

Connect using SQL Auth

I do not recommend SQL Auth.

secret = "<GET SECRET SECURELY>"

jdbc_df = spark.read \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
    .option("query", "SELECT * FROM SOMETHING") \
    .option("user", "<USER>") \
    .option("password", secret) \
    .load()

 

 

 

Databricks: Set Spark Configs

This post shows how to set the Spark configs in Databricks or Synapse notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

secret = 'value'  # I highly suggest you get the secret from Key Vault
storage_account = ''
application_id = ''
tenant_id = ''

spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')

spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')

spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)

spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)

spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
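
If you set these for more than one storage account, the five keys can be generated from one place. This is only a sketch: the helper name adls_oauth_configs is an assumption, and the placeholder arguments stand in for real values.

```python
def adls_oauth_configs(storage_account, application_id, tenant_id, secret):
    """Build the five ABFS OAuth settings for one storage account as a dict."""
    suffix = '{}.dfs.core.windows.net'.format(storage_account)
    return {
        'fs.azure.account.auth.type.' + suffix: 'OAuth',
        'fs.azure.account.oauth.provider.type.' + suffix:
            'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider',
        'fs.azure.account.oauth2.client.id.' + suffix: application_id,
        'fs.azure.account.oauth2.client.secret.' + suffix: secret,
        'fs.azure.account.oauth2.client.endpoint.' + suffix:
            'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id),
    }


configs = adls_oauth_configs('<STORAGE_ACCOUNT>', '<CLIENT_ID>', '<TENANT_ID>', '<SECRET>')

# Print only the keys so the secret never ends up in notebook output.
for key in sorted(configs):
    print(key)

# With a live session you would then apply them:
# for key, value in configs.items():
#     spark.conf.set(key, value)
```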

If you are running in Databricks you could instead add them to the cluster's Spark config so they are applied at cluster start, although I recommend doing it in a notebook.

spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
spark.hadoop.fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
spark.hadoop.fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
spark.hadoop.fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net <SECRET>
spark.hadoop.fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

 

 

PySpark: Read From ADLS to DataFrame

This how-to shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Read a CSV from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'csv'

# You only need the "header" option for CSV.
# schema is a StructType you have defined beforehand.

dataframe = spark.read.format(format) \
  .option('header', True) \
  .schema(schema) \
  .load(path)

Read Parquet from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'parquet'

dataframe = spark.read.format(format) \
    .load(path)

Read Delta from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'delta'

dataframe = spark.read.format(format) \
    .load(path)

 

PySpark: Save a DataFrame To ADLS

This how-to shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following:

Note that you don’t need all of the options below; they are only an example.

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
mode = 'overwrite'
format = 'delta'
partitions = []

df.write.mode(mode).format(format).option('mergeSchema', False).partitionBy(*partitions).save(path)

 

 

 

 

 

 

PySpark: Create a Spark Session

This post shows how to create a Spark session.

Imports

from pyspark.sql import SparkSession

Create the Spark Session

spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation. You would add this before the “.getOrCreate()”.

You can see a list here

  • .config("spark.sql.jsonGenerator.ignoreNullFields", "false")
    • Null fields are kept, rather than dropped, when generating JSON
  • .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    • Writes INT96 timestamps as-is, without rebasing them to the legacy hybrid calendar
  • .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    • Reads INT96 timestamps as-is, without rebasing them from the legacy hybrid calendar
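
Put together, the configs are chained onto the builder before getOrCreate(). The sketch below keeps them in a dictionary and applies them in a loop; the helper name build_session is an assumption, and the final call is left commented out because it needs a working PySpark installation.

```python
def build_session(app_name, configs):
    """Create (or reuse) a SparkSession, applying each config before getOrCreate()."""
    # Imported here so the rest of the module loads even without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in configs.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()


session_configs = {
    'spark.sql.jsonGenerator.ignoreNullFields': 'false',
    'spark.sql.parquet.int96RebaseModeInWrite': 'CORRECTED',
    'spark.sql.parquet.int96RebaseModeInRead': 'CORRECTED',
}

# With PySpark installed:
# spark = build_session('pyspark_app_name', session_configs)
```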

 

PySpark: Create a DataFrame

This post shows how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Next we need the following imports

from pyspark.sql import Row
from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

schema = StructType([
    StructField('id', IntegerType()),
    # ... remaining fields
])

data = [Row(id=1)]

Create the DataFrame

df = spark.createDataFrame(data, schema=schema)

If you want to use a JSON file to build your schema do the following

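import json
from pyspark.sql import Row
from pyspark.sql.types import StructType

# The schema as a JSON string, for example the contents of a schema file
schema_str = '''
{
    "fields": [
        {
            "metadata": {},
            "name": "column_a",
            "nullable": false,
            "type": "string"
        }
    ],
    "type": "struct"
}
'''

# json.loads expects a string and returns a dict that StructType.fromJson can read
json_schema = json.loads(schema_str)
table_schema = StructType.fromJson(json_schema)

data = [Row(column_a='a')]
df = spark.createDataFrame(data, schema=table_schema)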

 

Python: Unit Testing

This post focuses on common hurdles when trying to do unit testing.

Testing Values During Run

Add the following lines anywhere you want to pause the unit test to check values.

import pdb
pdb.set_trace()

How to Patch a Function

from unittest.mock import patch

# Decorators are applied bottom-up, so the lowest patch is the first mock argument
@patch('src.path.to.file.my_function')
@patch('src.path.to.file.my_function_add')
def test_some_function(mock_my_function_add, mock_my_function):
    mock_my_function_add.return_value = <something>
    ...
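
Here is a small runnable version of the same pattern. It is a sketch that patches two standard library functions rather than your own code; describe_cwd is a hypothetical function under test.

```python
import os
from unittest.mock import patch


def describe_cwd():
    """Hypothetical function under test: the working directory and its entry count."""
    cwd = os.getcwd()
    return cwd, len(os.listdir(cwd))


@patch('os.getcwd')    # outermost decorator -> last mock argument
@patch('os.listdir')   # innermost decorator -> first mock argument
def test_describe_cwd(mock_listdir, mock_getcwd):
    mock_getcwd.return_value = '/fake/dir'
    mock_listdir.return_value = ['a.txt', 'b.txt']

    assert describe_cwd() == ('/fake/dir', 2)
    mock_listdir.assert_called_once_with('/fake/dir')


test_describe_cwd()
```

Both patches are undone automatically when the test function returns.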

How to Patch a Function With No Return Value

from unittest.mock import patch

def test_some_function():
    with patch('src.path.to.file.my_function'):
        ...

How to Patch a Function With 1 Return Value

from unittest.mock import MagicMock, patch

def test_some_function():
    with patch('src.path.to.file.my_function', MagicMock(return_value=[<MY_VALUES>])):
        ...

How to Patch a Function With Multiple Return Values

from unittest.mock import MagicMock, patch

def test_some_function():
    # side_effect returns one item from the list per call, in order
    with patch('src.path.to.file.my_function', MagicMock(side_effect=[[<MY_VALUES>], [<OTHER_VALUES>]])):
        ...
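
To see side_effect in action, here is a minimal runnable sketch that patches random.randint from the standard library; roll_twice is a hypothetical function under test. Each call to the mock returns the next value in the list.

```python
import random
from unittest.mock import MagicMock, patch


def roll_twice():
    """Hypothetical function under test: calls the patched function two times."""
    first = random.randint(1, 6)
    second = random.randint(1, 6)
    return first, second


def test_roll_twice():
    with patch('random.randint', MagicMock(side_effect=[2, 5])) as mock_randint:
        # The first call gets 2 and the second call gets 5.
        assert roll_twice() == (2, 5)
        assert mock_randint.call_count == 2


test_roll_twice()
```

If the code under test calls the mock more times than there are values in the list, the extra call raises StopIteration.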

How to Create a Test Module

from unittest import TestCase

class MyModule(TestCase):
    def setUp(self):
        some_class.my_variable = <something>
        ...  # do other setup

    def test_my_function(self):
        ...  # do function test stuff

How to Patch a Method

from unittest.mock import patch

patch_methods = [
    "pyodbc.connect"
]

for method in patch_methods:
    patch(method).start()
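
Patches started with start() stay active until they are stopped, so they should be cleaned up after each test. The sketch below is one way to do that inside a TestCase, assuming two standard library functions as stand-ins for the methods you want to patch.

```python
import os
import unittest
from unittest.mock import patch


class PatchedMethods(unittest.TestCase):
    # Stand-in targets for illustration; replace with your own, e.g. "pyodbc.connect".
    patch_methods = ['os.getcwd', 'os.cpu_count']

    def setUp(self):
        # Stop every patch that was started with start() once the test finishes.
        self.addCleanup(patch.stopall)
        self.mocks = {name: patch(name).start() for name in self.patch_methods}

    def test_patched(self):
        self.mocks['os.getcwd'].return_value = '/fake/dir'
        self.assertEqual(os.getcwd(), '/fake/dir')


suite = unittest.defaultTestLoader.loadTestsFromTestCase(PatchedMethods)
result = unittest.TextTestRunner(verbosity=0).run(suite)
```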

How to create a PySpark Session

Once this fixture is defined, any test that takes a spark argument receives the session.

import pytest
from pyspark.sql import SparkSession

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

How to Create a Spark SQL Example

import pytest
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType

@pytest.fixture(scope='module')
def spark():
    return (SparkSession.builder.appName('pyspark_test').getOrCreate())

def test_function(spark):
    # The table name in the query must match the temp view created below
    query = 'SELECT * FROM SOMETHING'
    schema = StructType([
        StructField('column_a', StringType()),
        StructField('column_b', StringType()),
        StructField('column_c', StringType()),
    ])

    data = [Row(column_a='a', column_b='b', column_c='c')]
    table = spark.createDataFrame(data, schema=schema)
    table.createOrReplaceTempView('<table_name>')
    df = spark.sql(query).toPandas()

    assert not df.empty
    assert df.shape[0] == 1  # one row
    assert df.shape[1] == 3  # three columns in the schema

    spark.catalog.dropTempView('<table_name>')

How to Mock a Database Call

First let’s assume you have an execute SQL function

def execute_sql(cursor, sql, params):
    result = cursor.execute(sql, params).fetchone()
    # connection is assumed to be a database connection defined elsewhere in the module
    connection.commit()
    return result

Next, in your unit tests you want to test that function

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    with patch('path.to.code.execute_sql', MagicMock(return_value=val)) as mock_execute:
        return_val = some_other_function_that_calls_execute_sql(....)
        assert return_val == val

If you need to close a cursor or DB connection

def test_execute_sql():
    val = <YOUR_RETURN_VALUE>
    mock_cursor = MagicMock()
    mock_cursor.configure_mock(
        **{
              "close": MagicMock()
         }
    )
    mock_connection = MagicMock()
    mock_connection.configure_mock(
        **{
            "close": MagicMock()
        }
    )

    with patch('path.to.code.cursor', MagicMock(return_value=mock_cursor)) as mock_cursor_close:
        with patch('path.to.code.connection', MagicMock(return_value=mock_connection)) as mock_connection_close:
            return_val = some_other_function_that_calls_execute_sql(....)
            assert return_val == val
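
The patches above replace whole functions. If you want to exercise the SQL function itself, you can hand it mock objects instead. This is a sketch only: it uses a hypothetical variant of execute_sql that receives the connection as an argument, and the query and row are made up.

```python
from unittest.mock import MagicMock


def execute_sql(connection, cursor, sql, params):
    """Hypothetical variant that receives the connection explicitly."""
    result = cursor.execute(sql, params).fetchone()
    connection.commit()
    return result


def test_execute_sql():
    mock_connection = MagicMock()
    mock_cursor = MagicMock()
    # cursor.execute(...) returns a mock whose fetchone() returns our row.
    mock_cursor.execute.return_value.fetchone.return_value = (1, 'a')

    row = execute_sql(mock_connection, mock_cursor, 'SELECT ?', [1])

    assert row == (1, 'a')
    mock_cursor.execute.assert_called_once_with('SELECT ?', [1])
    mock_connection.commit.assert_called_once_with()


test_execute_sql()
```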

How to Mock Open a File Example 1

from unittest.mock import mock_open, patch

@patch('builtins.open', new_callable=mock_open, read_data='my_data')
def test_file_open(mock_file):
    assert open("my/file/path/filename.extension").read() == 'my_data'
    mock_file.assert_called_with("my/file/path/filename.extension")

    val = function_to_test(....)
    assert 'my_data' == val

How to Mock Open a File Example 2

def test_file_open():
    fake_file_path = 'file/path/to/mock'
    file_content_mock = 'test'
    with patch('path.to.code.function', new=mock_open(read_data=file_content_mock)) as mock_file:
        with patch('os.utime') as mock_utime:
            actual = function_to_test(fake_file_path)
            mock_file.assert_called_once_with(fake_file_path)
            assert actual is not None
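
For reference, here is a complete runnable sketch of the same idea. The function read_text is a hypothetical function under test; the patch targets builtins.open so no real file is touched.

```python
from unittest.mock import mock_open, patch


def read_text(path):
    """Hypothetical function under test: returns the full contents of a file."""
    with open(path) as handle:
        return handle.read()


def test_read_text():
    with patch('builtins.open', mock_open(read_data='my_data')) as mock_file:
        assert read_text('my/file/path/filename.extension') == 'my_data'
        mock_file.assert_called_once_with('my/file/path/filename.extension')


test_read_text()
```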

Compare DataFrames

def as_dicts(df):
    df = [row.asDict() for row in df.collect()]
    return sorted(df, key=lambda row: str(row))

assert as_dicts(df1) == as_dicts(df2)

Spark Installation on Hadoop

In this tutorial I will show you how to use Kerberos/SSL with Spark integrated with YARN. I will use self-signed certs for this example. Before you begin, ensure you have installed Kerberos Server and Hadoop.

This assumes your hostname is “hadoop”

Create Kerberos Principals

cd /etc/security/keytabs/

sudo kadmin.local

#You can list principals
listprincs

#Create the following principals
addprinc -randkey spark/hadoop@REALM.CA

#Create the keytab files.
#You will need these for Hadoop to be able to login
xst -k spark.service.keytab spark/hadoop@REALM.CA

Set Keytab Permissions/Ownership

sudo chown root:hadoopuser /etc/security/keytabs/*
sudo chmod 750 /etc/security/keytabs/*

Download

Go to Apache Spark Download and get the link for Spark.

wget http://apache.forsale.plus/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
tar -xvf spark-2.4.4-bin-hadoop2.7.tgz
mv spark-2.4.4-bin-hadoop2.7 /usr/local/spark/

Update .bashrc

sudo nano ~/.bashrc

#Ensure we have the following in the Hadoop section
export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

#Add the following

#SPARK VARIABLES START
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin
export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
#SPARK VARIABLES STOP

source ~/.bashrc

Setup Configuration

cd /usr/local/spark/conf
mv spark-defaults.conf.template spark-defaults.conf
nano spark-defaults.conf

#Add to the end
spark.master                            yarn
spark.yarn.historyServer.address        ${hadoopconf-yarn.resourcemanager.hostname}:18080
spark.yarn.keytab                       /etc/security/keytabs/spark.service.keytab
spark.yarn.principal                    spark/hadoop@REALM.CA
spark.yarn.access.hadoopFileSystems     hdfs://NAMENODE:54310
spark.authenticate                      true
spark.driver.bindAddress                0.0.0.0
spark.authenticate.enableSaslEncryption true
spark.eventLog.enabled                  true
spark.eventLog.dir                      hdfs://NAMENODE:54310/user/spark/applicationHistory
spark.history.fs.logDirectory           hdfs://NAMENODE:54310/user/spark/applicationHistory
spark.history.fs.update.interval        10s
spark.history.ui.port                   18080

#SSL
spark.ssl.enabled                       true
spark.ssl.keyPassword                   PASSWORD
spark.ssl.keyStore                      /etc/security/serverKeys/keystore.jks
spark.ssl.keyStorePassword              PASSWORD
spark.ssl.keyStoreType                  JKS
spark.ssl.trustStore                    /etc/security/serverKeys/truststore.jks
spark.ssl.trustStorePassword            PASSWORD
spark.ssl.trustStoreType                JKS

Kinit

kinit -kt /etc/security/keytabs/spark.service.keytab spark/hadoop@REALM.CA
klist
hdfs dfs -mkdir /user/spark/
hdfs dfs -mkdir /user/spark/applicationHistory
hdfs dfs -ls /user/spark

Start The Service

$SPARK_HOME/sbin/start-history-server.sh

Stop The Service

$SPARK_HOME/sbin/stop-history-server.sh

Spark History Server Web UI

References

I used a lot of different resources and reference material on this. Below are just a few I used.

https://spark.apache.org/docs/latest/running-on-yarn.html#configuration

https://spark.apache.org/docs/latest/security.html

https://www.linode.com/docs/databases/hadoop/install-configure-run-spark-on-top-of-hadoop-yarn-cluster/

 

 

 

 

PySpark: StandAlone Installation on Windows

This tutorial will guide you through installing PySpark StandAlone on Windows for development.

Install Python 3

You need to have python 3 installed. You can get it here.

Install Spark

Go to Apache Spark and download the latest version and package “Pre-built for Apache Hadoop 2.7 and later”. Download spark-2.4.1-bin-hadoop2.7.tgz.

Install 7 Zip

You will need 7 Zip to open spark-2.4.1-bin-hadoop2.7.tgz.

Install Java

You need to ensure you have Java 8 installed.

Extract Spark

Once you have installed 7 Zip you can extract Spark into the C:\spark\ directory. The directory structure will look like this: C:\spark\spark-2.4.1-bin-hadoop2.7\

Download WinUtils.exe

Download winutils.exe and put it in C:\spark\spark-2.4.1-bin-hadoop2.7\bin\

Environment Variables

Use the following commands to set your Spark-specific environment variables. The “-m” option sets them for all users; without it they are set for the current user only.

setx -m SPARK_HOME C:\spark\spark-2.4.1-bin-hadoop2.7
setx -m HADOOP_HOME C:\spark\spark-2.4.1-bin-hadoop2.7

You also want to add the following to the “Path” env variable “;C:\spark\spark-2.4.1-bin-hadoop2.7\bin”

Run PySpark

Open a command prompt and type the following. The --master parameter sets the master node address. local[2] tells Spark to run locally on 2 cores.

pyspark --master local[2]

Test

You can then test that it is working by running the following code.

words = sc.parallelize(
   ["scala", 
   "java", 
   "hadoop", 
   "spark", 
   "hbase",
   "spark vs hadoop", 
   "pyspark",
   "pyspark and spark"]
)
counts = words.count()
print("Number of elements in RDD -> %i" % (counts))

References

I used this as a guide. https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c