PySpark DataFrame Methods

This post shows a few useful DataFrame methods in PySpark.

Get the first value in a column

df = some_dataframe_definition

value = df.select("SOME_COLUMN_NAME").first()[0]

Convert Dataframe to JSON

df = some_dataframe_definition

result_json = df.toJSON()
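
Note that toJSON() converts each row to a JSON string and returns them as an RDD rather than a single JSON document. A quick way to materialize the result, as a sketch:

json_strings = df.toJSON().collect()   # list of JSON strings, one per row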

Get a Row

df = some_dataframe_definition

row = df.collect()[0]      # Switch out 0 for whichever row you want; collect() pulls all rows to the driver
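
Once you have a Row you can access its fields by name or by position. For example (the column name is a placeholder):

value = row['SOME_COLUMN_NAME']   # or row.SOME_COLUMN_NAME, or row[0] by position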

Count rows of Dataframe

df = some_dataframe_definition

num_rows = df.count()

 

 

PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information on Delta Lake you can refer here.

First you need to install the “delta-spark” package for whatever version you require.

pip install delta-spark==3.1.0

Set up a Spark session.

To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.

To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.
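
If you are not on Databricks or Synapse (where Delta support is already wired in), the session also needs the Delta extensions. A minimal sketch using the helper that ships with the delta-spark package installed above (the app name is just a placeholder):

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder.appName('delta_app') \
    .config('spark.sql.extensions', 'io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.spark.sql.delta.catalog.DeltaCatalog')

# Adds the Delta Lake jars matching the installed delta-spark version
spark = configure_spark_with_delta_pip(builder).getOrCreate()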

Vacuum Delta Table

from delta.tables import DeltaTable

vacuum_hrs = 100   # Retention in hours; values below the default of 168 require disabling the retention duration check
path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

delta_table = DeltaTable.forPath(spark, path)
delta_table.vacuum(vacuum_hrs)
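
If you want to preview which files vacuum would remove before deleting anything, the SQL form supports a dry run. A sketch using the same path and the default retention period:

# Lists the files that would be removed, without deleting them
spark.sql("VACUUM delta.`%s` DRY RUN" % path)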

Compaction

Improves reads by merging small files into larger ones.

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeCompaction()
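
If the table is partitioned, you can optionally restrict compaction to a subset of partitions with a partition filter; the partition column and value below are placeholders:

# Only compact files in the matching partitions
delta_table.optimize().where("<PARTITION_COL> = '<SOME_VAL>'").executeCompaction()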

Z-Order

from delta.tables import DeltaTable

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 
columns = ['<COL_1>', '<COL_2>']   # Columns to Z-Order by

delta_table = DeltaTable.forPath(spark, path)
delta_table.optimize().executeZOrderBy(*columns)

Delete

from delta.tables import DeltaTable
import pyspark.sql.functions as F

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/' 

delta_table = DeltaTable.forPath(spark, path)
delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

# You can also use a SQL predicate string
delta_table.delete("<MY_COL> = '<SOME_VAL>'")

Modify Properties

You can refer here for more properties.

dataSkippingNumIndexedCols

You would change this if your table has more columns than Delta Lake collects statistics on by default. The default value is 32.

path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
skip_cols = <SOME_VALUE>

spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
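
You can verify the change by listing the table properties afterwards, for example:

# Shows the table properties, including delta.dataSkippingNumIndexedCols
spark.sql("SHOW TBLPROPERTIES delta.`%s`" % path).show(truncate=False)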

 

Databricks: Set Spark Configs

This post shows how to set Spark configs in Databricks or Synapse notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

secret = 'value' # I highly suggest you get the secret from Key Vault rather than hard-coding it
storage_account = ''
application_id = ''
tenant_id = ''

spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')

spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')

spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)

spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)

spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
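
On Databricks, one way to avoid hard-coding the secret is to pull it from a Key Vault-backed secret scope; the scope and key names below are placeholders:

# Hypothetical secret scope and key names - replace with your own
secret = dbutils.secrets.get(scope='<MY_SCOPE>', key='<MY_SECRET_NAME>')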

If you are running in Databricks you could also add these to the cluster's Spark config at startup, although I recommend doing it in a notebook instead.

spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net <SECRET>
fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

 

 

PySpark: Read From ADLS to DataFrame

This post shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Read a CSV from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'csv'

# 'header' only applies to CSV; 'schema' is a StructType (see PySpark: Create a DataFrame)

dataframe = spark.read.format(format) \
  .option('header', True) \
  .schema(schema) \
  .load(path)
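
If you don't have a schema defined, you can let Spark infer one instead. This is a sketch; it is slower because Spark has to scan the data first:

dataframe = spark.read.format(format) \
    .option('header', True) \
    .option('inferSchema', True) \
    .load(path)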

Read Parquet from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'parquet'

dataframe = spark.read.format(format) \
    .load(path)

Read Delta from ADLS

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
format = 'delta'

dataframe = spark.read.format(format) \
    .load(path)

 

PySpark: Save a DataFrame To ADLS

This post shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following:

Note that you don't need all of the options below; this is just an example.

path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
mode = 'overwrite'
format = 'delta'
partitions = []

df.write.mode(mode) \
    .format(format) \
    .option('mergeSchema', False) \
    .partitionBy(*partitions) \
    .save(path)

PySpark: Create a Spark Session

This post shows how to create a Spark Session.

Imports

from pyspark.sql import SparkSession

Create the Spark Session

spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation by chaining .config() calls before “.getOrCreate()”, as shown in the example after the list below.

You can see a list here

  • .config(“spark.sql.jsonGenerator.ignoreNullFields”, “false”)
    • When generating JSON (for example writing JSON or using to_json), NULL fields are kept instead of being dropped
  • .config(“spark.sql.parquet.int96RebaseModeInWrite”, “CORRECTED”)
    • Writes legacy INT96 timestamps as-is, avoiding the calendar rebase error for very old timestamps when writing Parquet
  • .config(“spark.sql.parquet.int96RebaseModeInRead”, “CORRECTED”)
    • Reads legacy INT96 timestamps as-is, avoiding the calendar rebase error for very old timestamps when reading Parquet
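
For example, a session with the configs above chained in before “.getOrCreate()” might look like this (the app name is just a placeholder):

spark = SparkSession.builder.appName('pyspark_app_name') \
    .config('spark.sql.jsonGenerator.ignoreNullFields', 'false') \
    .config('spark.sql.parquet.int96RebaseModeInWrite', 'CORRECTED') \
    .config('spark.sql.parquet.int96RebaseModeInRead', 'CORRECTED') \
    .getOrCreate()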

 

PySpark: Create a DataFrame

This post shows how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Next we need to import the following:

from pyspark.sql import Row
from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

schema = StructType([
    StructField('id', IntegerType()),
    # ... add additional StructFields as needed
])

data = [Row(id=1)]

Create the DataFrame

df = spark.createDataFrame(data, schema=schema)

If you want to build your schema from JSON (for example, loaded from a file), do the following:

import json
from pyspark.sql.types import StructType

schema_json = """
{
    "fields": [
        {
            "metadata": {},
            "name": "column_a",
            "nullable": false,
            "type": "string"
        }
    ],
    "type": "struct"
}
"""

table_schema = StructType.fromJson(json.loads(schema_json))

df = spark.createDataFrame([Row(column_a='some_value')], schema=table_schema)
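
Going the other way, you can export an existing DataFrame's schema as a JSON string, for example to save it to a file and reuse it later:

schema_as_json = df.schema.json()   # JSON string describing the schema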

 

PySpark: Eclipse Integration

This tutorial will guide you through configuring PySpark on Eclipse.

First you need to install Eclipse.

You need to add “pyspark.zip” and “py4j-0.10.7-src.zip” to “Libraries” for the Python Interpreter.

Next you need to configure the Environment variables for PySpark.

Test that it works!

from pyspark.sql import SparkSession

def init_spark():
    spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
    sc = spark.sparkContext
    return spark, sc

if __name__ == '__main__':
    spark, sc = init_spark()
    nums = sc.parallelize([1, 2, 3, 4])
    print(nums.map(lambda x: x * x).collect())

PySpark: StandAlone Installation on Windows

This tutorial will guide you through installing PySpark StandAlone on Windows for development.

Install Python 3

You need to have python 3 installed. You can get it here.

Install Spark

Go to the Apache Spark download page and get a release packaged as “Pre-built for Apache Hadoop 2.7 and later”. This guide uses spark-2.4.1-bin-hadoop2.7.tgz.

Install 7 Zip

You will need 7 Zip to open spark-2.4.1-bin-hadoop2.7.tgz.

Install Java

You need to ensure you have Java 8 installed.

Extract Spark

Once you have installed 7 Zip you can extract Spark into the C:\spark\ directory. The directory structure will look like this: C:\spark\spark-2.4.1-bin-hadoop2.7\

Download WinUtils.exe

Download winutils.exe and put it in C:\spark\spark-2.4.1-bin-hadoop2.7\bin\

Environment Variables

Use the following commands to set your Spark-specific environment variables. The “-m” option sets them for all users; if you omit it, they are set for the current user only.

setx -m SPARK_HOME C:\spark\spark-2.4.1-bin-hadoop2.7
setx -m HADOOP_HOME C:\spark\spark-2.4.1-bin-hadoop2.7

You also want to append the following to the “Path” environment variable: ;C:\spark\spark-2.4.1-bin-hadoop2.7\bin

Run PySpark

Open a command prompt and type the following. The --master parameter sets the master node address; local[2] tells Spark to run locally using 2 cores.

pyspark --master local[2]

Test

You can then test that it is working by running the following code.

words = sc.parallelize([
    "scala",
    "java",
    "hadoop",
    "spark",
    "hbase",
    "spark vs hadoop",
    "pyspark",
    "pyspark and spark"
])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)

References

I used this as a guide. https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c