PySpark DataFrame Methods

This post covers several commonly used methods of a PySpark DataFrame.

Get the first value in a column

  df = some_dataframe_definition

  value = df.select("SOME_COLUMN_NAME").first()[0]

Convert a DataFrame to JSON

  df = some_dataframe_definition

  # toJSON() returns an RDD of JSON strings, one per row
  result_json = df.toJSON()

Get a Row

  df = some_dataframe_definition

  row = df.collect()[0]  # switch 0 for the index of whatever row you want

Count the rows of a DataFrame

  df = some_dataframe_definition

  num_rows = df.count()

PySpark: Read From ADLS to DataFrame

This post shows how to read data from ADLS into a DataFrame.

First we need a Spark session. See PySpark: Create a Spark Session for details on that.

Read a CSV from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'csv'

  # The 'header' option only applies to CSV; drop it for other formats.
  # 'schema' is a StructType you define (see PySpark: Create a DataFrame).
  dataframe = spark.read.format(format) \
      .option('header', True) \
      .schema(schema) \
      .load(path)

Read Parquet from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'parquet'

  dataframe = spark.read.format(format) \
      .load(path)

Read Delta from ADLS

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  format = 'delta'

  dataframe = spark.read.format(format) \
      .load(path)


PySpark: Save a DataFrame To ADLS

This post shows how to save a DataFrame to ADLS.

First we need a Spark session. See PySpark: Create a Spark Session for details on that.

Then we need to create a DataFrame. See PySpark: Create a DataFrame.

Then we do the following. Note that you don't need all of the options below; this is just an example:

  path = 'abfss://my_container@my_storage_account.dfs.core.windows.net/my_folder/'
  mode = 'overwrite'
  format = 'delta'
  partitions = []

  df.write.mode(mode).format(format).option('mergeSchema', False).partitionBy(*partitions).save(path)

PySpark: Create a DataFrame

This post shows how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for details on that.

Next we need the following imports:

  from pyspark.sql import Row
  from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

  schema = StructType([
      StructField('id', IntegerType()),
      .....
  ])

  data = [Row(id=1)]

Create the DataFrame

  df = spark.createDataFrame(data, schema=schema)

If you want to build your schema from a JSON definition, do the following:

  import json
  from pyspark.sql import Row
  from pyspark.sql.types import StructType

  # json.loads expects a string, so the schema definition is a JSON string here
  schema_json = """
  {
    "fields": [
      {
        "metadata": {},
        "name": "column_a",
        "nullable": false,
        "type": "string"
      }
    ],
    "type": "struct"
  }
  """

  table_schema = StructType.fromJson(json.loads(schema_json))

  data = [Row(column_a='some_value')]
  df = spark.createDataFrame(data, schema=table_schema)