PySpark DataFrame Methods

This post shows several commonly used methods of a PySpark DataFrame.

Get the first value in a column

  df = some_dataframe_definition

  value = df.select("SOME_COLUMN_NAME").first()[0]

Convert Dataframe to JSON

  df = some_dataframe_definition

  result_json = df.toJSON()
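
Note that toJSON() returns an RDD of JSON strings (one per row) rather than a single JSON document. A minimal usage sketch; the variable names are just placeholders:

  # toJSON() returns an RDD of JSON strings, one per row
  json_rows = df.toJSON().collect()
  first_row_json = json_rows[0]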

Get a Row

  df = some_dataframe_definition

  # collect() pulls all rows to the driver; switch out 0 for whichever row you want
  row = df.collect()[0]

Count rows of Dataframe

  df = some_dataframe_definition

  num_rows = df.count()

PySpark: Delta Lake

This post covers how to use PySpark to work with Delta tables.

For more information, refer to the Delta Lake documentation.

First you need to install the “delta-spark” package for whatever version you require.

  pip install delta-spark==3.1.0

Set up a Spark Session (a minimal Delta-enabled session is sketched below).
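
If you are not on Databricks or Synapse (where Delta is already configured), a minimal sketch of a Delta-enabled session using the delta-spark helper looks like the following. This assumes the delta-spark package installed above; the app name is just a placeholder.

  from pyspark.sql import SparkSession
  from delta import configure_spark_with_delta_pip

  builder = (
      SparkSession.builder.appName('delta_app')
      .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension')
      .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')
  )

  # configure_spark_with_delta_pip adds the delta-spark jars to the session
  spark = configure_spark_with_delta_pip(builder).getOrCreate()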

To read Delta tables, refer to PySpark: Read From ADLS to DataFrame.

To write Delta tables, refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

  from delta.tables import DeltaTable

  vacuum_hrs = 100
  path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

  delta_table = DeltaTable.forPath(spark, path)

  # Note: the default retention check requires at least 168 hours; vacuuming with a
  # shorter retention fails unless spark.databricks.delta.retentionDurationCheck.enabled
  # is set to false.
  delta_table.vacuum(vacuum_hrs)
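
If you want to see which files would be removed before actually deleting anything, the SQL form of VACUUM supports a dry run. A small sketch reusing the path from above:

  spark.sql("VACUUM delta.`{}` DRY RUN".format(path))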

Compaction

Improves read performance by merging small files into larger ones.

  from delta.tables import DeltaTable

  path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

  delta_table = DeltaTable.forPath(spark, path)
  delta_table.optimize().executeCompaction()

Z-Order

  from delta.tables import DeltaTable

  path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
  columns = '<MY_COL>'

  delta_table = DeltaTable.forPath(spark, path)
  delta_table.optimize().executeZOrderBy(columns)

Delete

  from delta.tables import DeltaTable
  import pyspark.sql.functions as F

  path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

  delta_table = DeltaTable.forPath(spark, path)
  delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

  # You can also pass a SQL-style condition string
  delta_table.delete("<MY_COL> = '<SOME_VAL>'")

Modify Properties

Refer to the Delta Lake table properties documentation for more properties.

dataSkippingNumIndexedCols

Set this when your table has more columns than Delta Lake collects statistics on by default (the first 32 columns).

  path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
  skip_cols = <SOME_VALUE>

  spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
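
To confirm the property took effect, you can read the table properties back. A minimal sketch reusing the same path variable:

  spark.sql("SHOW TBLPROPERTIES delta.`%s`" % path).show(truncate=False)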

Spark Connector: Connect to SQL Server

This post covers how to use the Spark connector to connect to SQL Server.

Install Spark Connector

  spark-mssql-connector_2.12-1.2.0.jar

Install msal

  pip install msal

Connect using Azure SPN

  import msal

  global_token_cache = msal.TokenCache()

  secret = "<GET SECRET SECURELY>"

  global_spn_app = msal.ConfidentialClientApplication(
      '<CLIENT_ID>',
      authority='https://login.microsoftonline.com/<TENANT_ID>',
      client_credential=secret,
      token_cache=global_token_cache,
  )

  result = global_spn_app.acquire_token_for_client(scopes=['https://database.windows.net//.default'])

  jdbc_df = spark.read \
      .format("com.microsoft.sqlserver.jdbc.spark") \
      .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
      .option("query", "SELECT * FROM SOMETHING") \
      .option("accessToken", result['access_token']) \
      .option("encrypt", "true") \
      .option("hostNameInCertificate", "*.database.windows.net") \
      .load()
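
The same connector can also write a DataFrame back to SQL Server. A hedged sketch reusing the token acquired above; the DataFrame name and table placeholder are assumptions and not part of the original post:

  some_df.write \
      .format("com.microsoft.sqlserver.jdbc.spark") \
      .mode("append") \
      .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
      .option("dbtable", "<SCHEMA>.<TABLE>") \
      .option("accessToken", result['access_token']) \
      .save()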

Connect using Domain Auth

  secret = "<GET SECRET SECURELY>"

  jdbc_df = spark.read \
      .format("com.microsoft.sqlserver.jdbc.spark") \
      .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
      .option("query", "SELECT * FROM SOMETHING") \
      .option("authentication", "ActiveDirectoryPassword") \
      .option("user", "<USER>@<DOMAIN>") \
      .option("password", secret) \
      .load()

Connect using SQL Auth

I do not recommend SQL Auth; prefer one of the Azure AD options above where possible.

  secret = "<GET SECRET SECURELY>"

  jdbc_df = spark.read \
      .format("com.microsoft.sqlserver.jdbc.spark") \
      .option("url", 'jdbc:sqlserver://<SERVER_NAME>:<PORT>;database=<DATABASE>;') \
      .option("query", "SELECT * FROM SOMETHING") \
      .option("user", "<USER>") \
      .option("password", secret) \
      .load()

Databricks: Set Spark Configs

This post covers how to set Spark configs in Databricks or Synapse notebooks.

First you will need a Spark session. Refer to PySpark: Create a Spark Session for more details.

  secret = 'value'  # I highly suggest you get this from the key vault
  storage_account = ''
  application_id = ''
  tenant_id = ''

  spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')
  spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')
  spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)
  spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)
  spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
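
Once these configs are set, reads against that storage account should work directly. A minimal sketch reusing the storage_account variable above; the container and folder placeholders are assumptions:

  df = spark.read.format('parquet') \
      .load('abfss://<CONTAINER>@{}.dfs.core.windows.net/<FOLDER>/'.format(storage_account))
  df.show()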

If you are running in Databricks you could also add these to the cluster's Spark config at startup, although I recommend doing it in a notebook instead.

  spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
  spark.hadoop.fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
  spark.hadoop.fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
  spark.hadoop.fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net <SECRET>
  spark.hadoop.fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

PySpark: Read From ADLS to DataFrame

This how-to shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details.

Read a CSV from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'csv'

  # The 'header' option only applies to CSV.
  # 'schema' is a StructType (see PySpark: Create a DataFrame); alternatively use .option('inferSchema', True).
  dataframe = spark.read.format(format) \
      .option('header', True) \
      .schema(schema) \
      .load(path)

Read Parquet from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'parquet'

  dataframe = spark.read.format(format) \
      .load(path)

Read Delta from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'delta'

  dataframe = spark.read.format(format) \
      .load(path)
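
Delta also lets you read an older snapshot of the table (time travel). A minimal sketch, assuming the table has at least one earlier version:

  old_dataframe = spark.read.format('delta') \
      .option('versionAsOf', 0) \
      .load(path)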

PySpark: Save a DataFrame To ADLS

This how-to shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for more details.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we write it out as follows. Note that you don't need all of the options below; this is just an example. The mode can be 'overwrite', 'append', 'ignore', or 'error'.

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  mode = 'overwrite'
  format = 'delta'
  partitions = []

  df.write.mode(mode).format(format).option('mergeSchema', False).partitionBy(*partitions).save(path)

PySpark: Create a Spark Session

This post covers how to create a Spark session.

Imports

  from pyspark.sql import SparkSession

Create the Spark Session

  spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation; you would add them before the “.getOrCreate()” call, as shown in the sketch after the list below.

You can see the full list of options in the Spark configuration documentation. A few useful ones:

  • .config("spark.sql.jsonGenerator.ignoreNullFields", "false")
    • Keeps NULL fields when generating (writing) JSON instead of dropping them
  • .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    • Fixes issues with legacy INT96 timestamps in write operations
  • .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    • Fixes issues with legacy INT96 timestamps in read operations
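
A minimal sketch of chaining those configs onto the builder (the app name is just a placeholder):

  spark = SparkSession.builder \
      .appName('pyspark_app_name') \
      .config('spark.sql.jsonGenerator.ignoreNullFields', 'false') \
      .config('spark.sql.parquet.int96RebaseModeInWrite', 'CORRECTED') \
      .config('spark.sql.parquet.int96RebaseModeInRead', 'CORRECTED') \
      .getOrCreate()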

PySpark: Create a DataFrame

This post covers how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for more details.

Next we need the imports.

  from pyspark.sql import Row
  from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema and some sample data.

  schema = StructType([
      StructField('id', IntegerType()),
      .....
  ])

  data = [Row(id=1)]

Create the DataFrame

  df = spark.createDataFrame(data, schema=schema)

If you want to build your schema from JSON instead, do the following.

  import json
  from pyspark.sql import Row
  from pyspark.sql.types import StructType

  schema_json = """
  {
    "fields": [
      {
        "metadata": {},
        "name": "column_a",
        "nullable": false,
        "type": "string"
      }
    ],
    "type": "struct"
  }
  """

  json_schema = json.loads(schema_json)
  table_schema = StructType.fromJson(json_schema)

  data = [Row(column_a='some_value')]
  df = spark.createDataFrame(data, schema=table_schema)

Spark Installation on Hadoop

In this tutorial I will show you how to use Kerberos and SSL with Spark integrated with YARN. I will use self-signed certs for this example. Before you begin, ensure you have installed the Kerberos server and Hadoop.

This assumes your hostname is “hadoop”.

Create Kerberos Principals

  cd /etc/security/keytabs/

  sudo kadmin.local

  # You can list principals
  listprincs

  # Create the following principal
  addprinc -randkey spark/hadoop@REALM.CA

  # Create the keytab file.
  # You will need this for the Spark service to be able to log in to Kerberos
  xst -k spark.service.keytab spark/hadoop@REALM.CA

Set Keytab Permissions/Ownership

  sudo chown root:hadoopuser /etc/security/keytabs/*
  sudo chmod 750 /etc/security/keytabs/*

Download

Go to Apache Spark Download and get the link for Spark.

  wget http://apache.forsale.plus/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
  tar -xvf spark-2.4.4-bin-hadoop2.7.tgz
  mv spark-2.4.4-bin-hadoop2.7 /usr/local/spark/

Update .bashrc

  sudo nano ~/.bashrc

  # Ensure we have the following in the Hadoop section
  export HADOOP_CONF_DIR=/usr/local/hadoop/etc/hadoop

  # Add the following

  # SPARK VARIABLES START
  export SPARK_HOME=/usr/local/spark
  export PATH=$PATH:$SPARK_HOME/bin
  export LD_LIBRARY_PATH=$HADOOP_HOME/lib/native:$LD_LIBRARY_PATH
  # SPARK VARIABLES STOP

  source ~/.bashrc

Setup Configuration

  cd /usr/local/spark/conf
  mv spark-defaults.conf.template spark-defaults.conf
  nano spark-defaults.conf

  # Add to the end
  spark.master yarn
  spark.yarn.historyServer.address ${hadoopconf-yarn.resourcemanager.hostname}:18080
  spark.yarn.keytab /etc/security/keytabs/spark.service.keytab
  spark.yarn.principal spark/hadoop@REALM.CA
  spark.yarn.access.hadoopFileSystems hdfs://NAMENODE:54310
  spark.authenticate true
  spark.driver.bindAddress 0.0.0.0
  spark.authenticate.enableSaslEncryption true
  spark.eventLog.enabled true
  spark.eventLog.dir hdfs://NAMENODE:54310/user/spark/applicationHistory
  spark.history.fs.logDirectory hdfs://NAMENODE:54310/user/spark/applicationHistory
  spark.history.fs.update.interval 10s
  spark.history.ui.port 18080

  # SSL
  spark.ssl.enabled true
  spark.ssl.keyPassword PASSWORD
  spark.ssl.keyStore /etc/security/serverKeys/keystore.jks
  spark.ssl.keyStorePassword PASSWORD
  spark.ssl.keyStoreType JKS
  spark.ssl.trustStore /etc/security/serverKeys/truststore.jks
  spark.ssl.trustStorePassword PASSWORD
  spark.ssl.trustStoreType JKS

Kinit

  kinit -kt /etc/security/keytabs/spark.service.keytab spark/hadoop@REALM.CA
  klist
  hdfs dfs -mkdir /user/spark/
  hdfs dfs -mkdir /user/spark/applicationHistory
  hdfs dfs -ls /user/spark

Start The Service

  $SPARK_HOME/sbin/start-history-server.sh

Stop The Service

  $SPARK_HOME/sbin/stop-history-server.sh

Spark History Server Web UI

Once the history server is running, its web UI is available on the port set by spark.history.ui.port above (18080).

References

I used a lot of different resources and reference material on this. Below are just a few I used.

https://spark.apache.org/docs/latest/running-on-yarn.html#configuration

https://spark.apache.org/docs/latest/security.html

https://www.linode.com/docs/databases/hadoop/install-configure-run-spark-on-top-of-hadoop-yarn-cluster/

PySpark: Eclipse Integration

This tutorial will guide you through configuring PySpark on Eclipse.

First you need to install Eclipse.

You need to add “pyspark.zip” and “py4j-0.10.7-src.zip” (both found under $SPARK_HOME/python/lib) to “Libraries” for the Python interpreter.

Next you need to configure the environment variables for PySpark (for example SPARK_HOME).

Test that it works!

  from pyspark import SparkConf, SparkContext
  from pyspark.sql import SparkSession

  def init_spark():
      spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
      sc = spark.sparkContext
      return spark, sc

  if __name__ == '__main__':
      spark, sc = init_spark()
      nums = sc.parallelize([1, 2, 3, 4])
      print(nums.map(lambda x: x * x).collect())

PySpark: StandAlone Installation on Windows

This tutorial will guide you through installing PySpark StandAlone on Windows for development.

Install Python 3

You need to have Python 3 installed. You can get it from python.org.

Install Spark

Go to the Apache Spark download page and download the latest version packaged as “Pre-built for Apache Hadoop 2.7 and later”; at the time of writing this was spark-2.4.1-bin-hadoop2.7.tgz.

Install 7 Zip

You will need 7 Zip to open spark-2.4.1-bin-hadoop2.7.tgz.

Install Java

You need to ensure you have Java 8 installed.

Extract Spark

Once you have installed 7 Zip you can extract Spark into the C:\spark\ directory. The directory structure will look like C:\spark\spark-2.4.1-bin-hadoop2.7\.

Download WinUtils.exe

Download winutils.exe and put it in C:\spark\spark-2.4.1-bin-hadoop2.7\bin\.

Environment Variables

Use the following commands to set your Spark-specific environment variables. The “-m” option sets them for all users (machine-wide); without it they are set for the current user only.

  setx -m SPARK_HOME C:\spark\spark-2.4.1-bin-hadoop2.7
  setx -m HADOOP_HOME C:\spark\spark-2.4.1-bin-hadoop2.7

You also want to append “;C:\spark\spark-2.4.1-bin-hadoop2.7\bin” to the “Path” environment variable.

Run PySpark

Open a command prompt and type the following. The --master parameter sets the master URL; local[2] tells Spark to run locally on 2 cores.

  pyspark --master local[2]

Test

You can then test that it is working by running the following code.

  words = sc.parallelize([
      "scala",
      "java",
      "hadoop",
      "spark",
      "hbase",
      "spark vs hadoop",
      "pyspark",
      "pyspark and spark"
  ])
  counts = words.count()
  print("Number of elements in RDD -> %i" % (counts))

References

I used this as a guide. https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c