PySpark DataFrame Methods

This post shows a few useful methods on a DataFrame.

Get the first value in a column

    df = some_dataframe_definition

    value = df.select("SOME_COLUMN_NAME").first()[0]

Convert DataFrame to JSON

    df = some_dataframe_definition

    result_json = df.toJSON()
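
Note that toJSON() returns an RDD of JSON strings (one per row) rather than a single document. A minimal sketch of materializing it for a small DataFrame:

    # Collect the per-row JSON strings to the driver (only sensible for small DataFrames)
    json_rows = df.toJSON().collect()
    print(json_rows[0])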

Get a Row

    df = some_dataframe_definition

    row = df.collect()[0]  # Switch out 0 for whichever row index you want

Count rows of a DataFrame

    df = some_dataframe_definition

    num_rows = df.count()

PySpark: Delta Lake

This post shows how to use PySpark to work with Delta tables.

For more information, refer to the Delta Lake documentation.

First you need to install the “delta-spark” package for whatever version you require.

    pip install delta-spark==3.1.0

Set up a Spark session.

To read delta tables you can refer to PySpark: Read From ADLS to DataFrame.

To write delta tables you can refer to PySpark: Save a DataFrame To ADLS.

Vacuum Delta Table

    from delta.tables import DeltaTable

    vacuum_hrs = 100
    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.vacuum(vacuum_hrs)
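
Note that 100 hours is below the Delta default retention of 168 hours, so the vacuum above will fail unless you relax the retention check first. A minimal sketch (only do this if you accept the reduced time-travel window):

    # Allow vacuum() retention periods shorter than the default 168 hours
    spark.conf.set('spark.databricks.delta.retentionDurationCheck.enabled', 'false')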

Compaction

Improves read performance by merging small files into larger ones.

    from delta.tables import DeltaTable

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.optimize().executeCompaction()

Z-Order

    from delta.tables import DeltaTable

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
    columns = '<COLUMN_TO_ZORDER_BY>'  # column name(s) to cluster on

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.optimize().executeZOrderBy(columns)

Delete

    from delta.tables import DeltaTable
    import pyspark.sql.functions as F

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

    delta_table = DeltaTable.forPath(spark, path)
    delta_table.delete(F.col('<MY_COL>') == '<SOME_VAL>')

    # You can also use a SQL-style predicate string
    delta_table.delete("<MY_COL> = '<SOME_VAL>'")

Modify Properties

Refer to the Delta Lake table properties documentation for the full list of properties.

dataSkippingNumIndexedCols

You would change this if your table has more columns than Delta Lake collects statistics on by default. The default value is 32.

    path = 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'
    skip_cols = <SOME_VALUE>

    spark.sql("ALTER TABLE delta.`%s` SET TBLPROPERTIES ('delta.dataSkippingNumIndexedCols' = '%s')" % (path, skip_cols))
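
One way to confirm the property took effect is to list the table properties. A quick sketch (path-based delta.`...` identifiers should work where Delta is configured):

    # List the current table properties for the Delta table at this path
    spark.sql("SHOW TBLPROPERTIES delta.`%s`" % path).show(truncate=False)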

Databricks: Set Spark Configs

This post shows how to set Spark configs in Databricks or Synapse notebooks.

First you will need a spark session. Refer to PySpark: Create a Spark Session for more details.

    secret = 'value'  # I highly suggest you get the secret from Key Vault rather than hard-coding it
    storage_account = ''
    application_id = ''
    tenant_id = ''

    spark.conf.set('fs.azure.account.auth.type.{}.dfs.core.windows.net'.format(storage_account), 'OAuth')
    spark.conf.set('fs.azure.account.oauth.provider.type.{}.dfs.core.windows.net'.format(storage_account), 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider')
    spark.conf.set('fs.azure.account.oauth2.client.id.{}.dfs.core.windows.net'.format(storage_account), application_id)
    spark.conf.set('fs.azure.account.oauth2.client.secret.{}.dfs.core.windows.net'.format(storage_account), secret)
    spark.conf.set('fs.azure.account.oauth2.client.endpoint.{}.dfs.core.windows.net'.format(storage_account), 'https://login.microsoftonline.com/{}/oauth2/token'.format(tenant_id))
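
As the comment above suggests, avoid hard-coding the secret. On Databricks you can read it from a Key Vault-backed secret scope; a minimal sketch, where '<SCOPE>' and '<KEY>' are placeholders for a scope and secret you have already set up:

    # '<SCOPE>' and '<KEY>' are placeholders; use your own secret scope and secret name
    secret = dbutils.secrets.get(scope='<SCOPE>', key='<KEY>')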

If you are running on Databricks you could also add these to the cluster's Spark config at startup, although I recommend doing it in a notebook instead.

    spark.hadoop.fs.azure.account.auth.type.<STORAGE_ACCOUNT>.dfs.core.windows.net OAuth
    spark.hadoop.fs.azure.account.oauth.provider.type.<STORAGE_ACCOUNT>.dfs.core.windows.net org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider
    spark.hadoop.fs.azure.account.oauth2.client.id.<STORAGE_ACCOUNT>.dfs.core.windows.net <CLIENT_ID>
    spark.hadoop.fs.azure.account.oauth2.client.secret.<STORAGE_ACCOUNT>.dfs.core.windows.net <SECRET>
    spark.hadoop.fs.azure.account.oauth2.client.endpoint.<STORAGE_ACCOUNT>.dfs.core.windows.net https://login.microsoftonline.com/<TENANT_ID>/oauth2/token

PySpark: Read From ADLS to DataFrame

This how-to shows how to read from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Read a CSV from ADLS

    path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
    format = 'csv'

    # 'schema' is a StructType built ahead of time; see PySpark: Create a DataFrame
    # You don't need the 'header' option for formats other than CSV

    dataframe = spark.read.format(format) \
        .option('header', True) \
        .schema(schema) \
        .load(path)
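
If you don't have a schema on hand, you can let Spark infer one from the CSV instead. A minimal sketch (inference adds an extra pass over the data):

    dataframe = spark.read.format('csv') \
        .option('header', True) \
        .option('inferSchema', True) \
        .load(path)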

Read Parquet from ADLS

    path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
    format = 'parquet'

    dataframe = spark.read.format(format) \
        .load(path)

Read Delta from ADLS

    path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
    format = 'delta'

    dataframe = spark.read.format(format) \
        .load(path)
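
Delta tables also support time travel on read. A minimal sketch that loads an older snapshot, assuming version 0 of the table exists:

    # Read a specific version of the Delta table ('timestampAsOf' works similarly with a timestamp)
    dataframe = spark.read.format('delta') \
        .option('versionAsOf', 0) \
        .load(path)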

PySpark: Save a DataFrame To ADLS

This how-to shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following:

Note that you don't need all of the options below; this is just an example.

    path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
    mode = 'overwrite'
    format = 'delta'
    partitions = []

    df.write.mode(mode) \
        .format(format) \
        .option('mergeSchema', False) \
        .partitionBy(*partitions) \
        .save(path)
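
As a concrete variation, here is a sketch that appends Parquet partitioned by a hypothetical 'year' column (use a column that actually exists in your DataFrame):

    # 'year' is a hypothetical partition column for illustration
    df.write.mode('append') \
        .format('parquet') \
        .partitionBy('year') \
        .save(path)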

PySpark: Create a Spark Session

This post shows how to create a Spark session.

Imports

    from pyspark.sql import SparkSession

Create the Spark Session

    spark = SparkSession.builder.appName('pyspark_app_name').getOrCreate()

You can add any configs you wish during creation. You would add them before the ".getOrCreate()", as shown in the sketch after the list below.

You can see the full list in the Spark configuration documentation.

  • .config("spark.sql.jsonGenerator.ignoreNullFields", "false")
    • When generating JSON (for example with to_json or when writing JSON), null fields are kept instead of dropped
  • .config("spark.sql.parquet.int96RebaseModeInWrite", "CORRECTED")
    • Avoids calendar-rebase issues with legacy INT96 timestamps in write operations
  • .config("spark.sql.parquet.int96RebaseModeInRead", "CORRECTED")
    • Avoids calendar-rebase issues with legacy INT96 timestamps in read operations
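
A minimal sketch combining the configs above during session creation:

    from pyspark.sql import SparkSession

    spark = SparkSession.builder \
        .appName('pyspark_app_name') \
        .config('spark.sql.jsonGenerator.ignoreNullFields', 'false') \
        .config('spark.sql.parquet.int96RebaseModeInWrite', 'CORRECTED') \
        .config('spark.sql.parquet.int96RebaseModeInRead', 'CORRECTED') \
        .getOrCreate()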

PySpark: Create a DataFrame

This post shows how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for more details on that.

Next we need the following imports:

    from pyspark.sql import Row
    from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema and some row data:

    schema = StructType([
        StructField('id', IntegerType()),
        .....
    ])

    data = [Row(id=1)]

Create the DataFrame

    df = spark.createDataFrame(data, schema=schema)
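
To verify what you built, you can print the schema and a few rows:

    df.printSchema()
    df.show(5, truncate=False)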

If you want to build your schema from a JSON definition, do the following:

    import json
    from pyspark.sql.types import StructType

    # JSON schema definition as a string (this could also be read from a file)
    schema_json = """
    {
      "fields": [
        {
          "metadata": {},
          "name": "column_a",
          "nullable": false,
          "type": "string"
        }
      ],
      "type": "struct"
    }
    """

    table_schema = StructType.fromJson(json.loads(schema_json))

    data = [Row(column_a='some_value')]
    df = spark.createDataFrame(data, schema=table_schema)

PySpark: Eclipse Integration

This tutorial will guide you through configuring PySpark on Eclipse.

First you need to install Eclipse.

You need to add “pyspark.zip” and “py4j-0.10.7-src.zip” to “Libraries” for the Python Interpreter.

Next you need to configure the Environment variables for PySpark.

Test that it works!

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SparkSession

    def init_spark():
        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()
        sc = spark.sparkContext
        return spark, sc

    if __name__ == '__main__':
        spark, sc = init_spark()
        nums = sc.parallelize([1, 2, 3, 4])
        print(nums.map(lambda x: x * x).collect())

PySpark: Standalone Installation on Windows

This tutorial will guide you through installing standalone PySpark on Windows for development.

Install Python 3

You need to have Python 3 installed. You can get it from python.org.

Install Spark

Go to the Apache Spark downloads page and download the latest version with the package type "Pre-built for Apache Hadoop 2.7 and later", e.g. spark-2.4.1-bin-hadoop2.7.tgz.

Install 7 Zip

You will need 7 Zip to open spark-2.4.1-bin-hadoop2.7.tgz.

Install Java

You need to ensure you have Java 8 installed.

Extract Spark

Once you have installed 7 Zip you can extract Spark into the C:\spark\ directory. The directory structure will look like this: C:\spark\spark-2.4.1-bin-hadoop2.7\

Download WinUtils.exe

Download winutils.exe and put it in C:\spark\spark-2.4.1-bin-hadoop2.7\bin\

Environment Variables

Use the following commands to set your Spark-specific environment variables. The "-m" option applies them for all users; omit it and they are set for the current user only.

    setx -m SPARK_HOME C:\spark\spark-2.4.1-bin-hadoop2.7
    setx -m HADOOP_HOME C:\spark\spark-2.4.1-bin-hadoop2.7

You also want to append the following to the "Path" environment variable: ";C:\spark\spark-2.4.1-bin-hadoop2.7\bin".

Run PySpark

Open a command prompt and type the following. The --master parameter sets the master node address; local[2] tells Spark to run locally on 2 cores.

    pyspark --master local[2]

Test

You can then test that it is working by running the following code.

    words = sc.parallelize([
        "scala",
        "java",
        "hadoop",
        "spark",
        "hbase",
        "spark vs hadoop",
        "pyspark",
        "pyspark and spark"
    ])
    counts = words.count()
    print("Number of elements in RDD -> %i" % (counts))

References

I used this as a guide. https://medium.com/@GalarnykMichael/install-spark-on-windows-pyspark-4498a5d8d66c