Databricks Unity Catalog SQL Commands

This post covers the basic commands you will need to know for working with Unity Catalog.

Display Current Metastore
  1. SELECT CURRENT_METASTORE();
Display Current Catalog
  1. SELECT CURRENT_CATALOG();
Create Catalog
  1. CREATE CATALOG IF NOT EXISTS <Catalog_Name> COMMENT 'A COMMENT';
Create Catalog With Location
  1. CREATE CATALOG IF NOT EXISTS <Catalog_Name> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>' COMMENT 'A COMMENT';
Describe Catalog
  1. DESCRIBE CATALOG <Catalog_Name>;
Create Schema
  1. CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> COMMENT '<COMMENT>';
Create Schema With Location
  1. CREATE SCHEMA IF NOT EXISTS <SCHEMA_NAME> MANAGED LOCATION 'abfss://<METASTORE_CONTAINER_NAME>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<CATALOG_NAME>/<SCHEMA_NAME>' COMMENT '<COMMENT>';
Show All Storage Credentials
  1. SHOW STORAGE CREDENTIALS;
Describe Credential
  1. DESCRIBE STORAGE CREDENTIAL <CREDENTIAL_NAME>;
Create External Location

You will first need a storage credential.

You can reference down to the full table path or keep it at the container level.

  CREATE EXTERNAL LOCATION IF NOT EXISTS <NAME>
  URL 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/'
  WITH (STORAGE CREDENTIAL <CREDENTIAL_NAME>)
  COMMENT '<COMMENT>';
Create External Table
  CREATE TABLE <CATALOG_NAME>.<SCHEMA_NAME>.<TABLE_NAME>
  USING <FORMAT>
  LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/FOLDER/PATH';
Grant Create Storage Credential on Metastore
  1. GRANT CREATE STORAGE CREDENTIAL ON METASTORE TO `<USER>`;
Grant Permission to Create External Locations on Storage Credential
  1. GRANT CREATE EXTERNAL LOCATION ON STORAGE CREDENTIAL <CREDENTIAL_NAME> TO `<USER>`;
Grant Permission to Create External Location On Metastore
  1. GRANT CREATE EXTERNAL LOCATION ON METASTORE TO `<USER>`;
Grant Permission to Use Catalog
  1. GRANT USE_CATALOG ON CATALOG <CATALOG_NAME> TO `<USER>`;
Show all Grants On Metastore
  1. SHOW GRANTS `<USER>` ON METASTORE;
Grant Permission to Use Schema
  1. GRANT USE_SCHEMA ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;
Grant Permission to Create Table
  1. GRANT CREATE TABLE ON SCHEMA <CATALOG_NAME>.<SCHEMA_NAME> TO `<USER>`;

 

Databricks: Notebook SQL

This post covers how to work with Databricks SQL from a notebook.

Create a Temp View of a DataFrame.

  1. df = <SOMETHING>
  2. df.createOrReplaceTempView("<TABLE_NAME>")
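
Once the view is registered you can query it like any table. A small usage sketch in Python, reusing the placeholder view name from above:

  # Query the temp view back into a DataFrame
  result = spark.sql("SELECT * FROM <TABLE_NAME>")
  result.show()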

Drop a Table

  1. %sql
  2. drop table <SCHEMA>.<TABLE>;

Describe Table

  1. %sql
  2. desc table extended <SCHEMA>.<TABLE>;

Describe Detail

  1. %sql
  2. describe detail <SCHEMA>.<TABLE>;

Show Table Properties

  1. %sql
  2. SHOW TBLPROPERTIES <SCHEMA>.<TABLE>;

Describe History

  1. %sql
  2. describe history <SCHEMA>.<TABLE>;

Create Schema

  1. %sql
  2. CREATE SCHEMA IF NOT EXISTS <SCHEMA>;

Create Parquet Table

  1. %sql
  2. CREATE TABLE <SCHEMA>.<TABLE> USING PARQUET LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Create Delta Table

  1. %sql
  2. CREATE TABLE <SCHEMA>.<TABLE> USING DELTA LOCATION 'abfss://<CONTAINER>@<STORAGE_ACCOUNT>.dfs.core.windows.net/<FOLDER>/'

Upsert

  MERGE INTO schema.table t
  USING (
    SELECT columns
    FROM table
  ) AS source
  ON (source.column = t.column)
  WHEN MATCHED THEN
    UPDATE SET
      t.column = source.column
  WHEN NOT MATCHED THEN
    INSERT (
      column, column2
    )
    VALUES (
      source.column, source.column2
    )
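
If you would rather run the upsert from a Python cell than a %sql cell, a minimal sketch using spark.sql (same placeholder schema, table, and column names as above):

  # Run the same upsert through spark.sql
  spark.sql("""
      MERGE INTO schema.table t
      USING (SELECT columns FROM table) AS source
      ON (source.column = t.column)
      WHEN MATCHED THEN UPDATE SET t.column = source.column
      WHEN NOT MATCHED THEN INSERT (column, column2) VALUES (source.column, source.column2)
  """)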

 

PySpark: Create a DataFrame

This post covers how to create a DataFrame in PySpark.

First we need a Spark session. See PySpark: Create a Spark Session for details on that.
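
In a Databricks notebook the session already exists as spark; outside a notebook, a minimal sketch for building one yourself (the app name is just an example) looks like this:

  from pyspark.sql import SparkSession

  # Create (or reuse) a SparkSession
  spark = SparkSession.builder \
      .appName("create-dataframe-example") \
      .getOrCreate()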

Next we need to import

  1. from pyspark.sql import Row
  2. from pyspark.sql.types import StringType, DecimalType, TimestampType, FloatType, IntegerType, LongType, StructField, StructType

Then you create the schema

  schema = StructType([
      StructField('id', IntegerType()),
      .....
  ])

  data = [Row(id=1)]

Create the DataFrame

  1. df = spark.createDataFrame(data, schema=schema)
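
As a quick, purely illustrative sanity check of the result:

  # Inspect the schema and the first few rows
  df.printSchema()
  df.show(5, truncate=False)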

If you want to use a JSON schema definition to build your schema, do the following:

  import json
  from pyspark.sql.types import StructType

  schema_json = """
  {
    "fields": [
      {
        "metadata": {},
        "name": "column_a",
        "nullable": false,
        "type": "string"
      }
    ],
    "type": "struct"
  }
  """

  json_schema = json.loads(schema_json)
  table_schema = StructType.fromJson(json_schema)

  # Sample row matching the one-column schema
  df = spark.createDataFrame([('some_value',)], schema=table_schema)

 

Avro & Java: Record Parsing

This tutorial will guide you through how to convert JSON to Avro and then back to JSON. I suggest you first read through the documentation on Avro to familiarize yourself with it. This tutorial assumes you have a Maven project already set up and a resources folder.

POM:

Add Avro Dependency

Add the Avro dependency (org.apache.avro:avro) to your pom.xml.

Add Jackson Dependency

Add the Jackson Databind dependency (com.fasterxml.jackson.core:jackson-databind) to your pom.xml.

Avro Schema File:

Next you need to create the Avro schema file in your resources folder. Name the file “schema.avsc”; the .avsc extension is the Avro schema file extension.

  {
    "namespace": "test.avro",
    "type": "record",
    "name": "MY_NAME",
    "fields": [
      {"name": "name_1", "type": "int"},
      {"name": "name_2", "type": {"type": "array", "items": "float"}},
      {"name": "name_3", "type": "float"}
    ]
  }

JSON Record to Validate:

Next you need to create a JSON file that conforms to the schema you just made. Name the file “record.json” and put it in your resources folder. The contents can be whatever you want as long as they conform to the schema above.

  1. { "name_1": 234, "name_2": [23.34,654.98], "name_3": 234.7}

It’s Avro Time:

Imports:

  import java.io.ByteArrayOutputStream;
  import java.io.DataInputStream;
  import java.io.File;
  import java.io.IOException;
  import java.io.InputStream;

  import org.apache.avro.Schema;
  import org.apache.avro.generic.GenericData;
  import org.apache.avro.generic.GenericDatumReader;
  import org.apache.avro.generic.GenericDatumWriter;
  import org.apache.avro.io.DatumReader;
  import org.apache.avro.io.Decoder;
  import org.apache.avro.io.DecoderFactory;
  import org.apache.avro.io.Encoder;
  import org.apache.avro.io.EncoderFactory;

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;

Conversion to Avro and Back:

  private void run() throws IOException {
      //Get the schema and json record from resources
      final ClassLoader loader = getClass().getClassLoader();
      final File schemaFile = new File(loader.getResource("schema.avsc").getFile());
      final InputStream record = loader.getResourceAsStream("record.json");
      //Create avro schema
      final Schema schema = new Schema.Parser().parse(schemaFile);

      //Encode to avro
      final byte[] avro = encodeToAvro(schema, record);

      //Decode back to json
      final JsonNode node = decodeToJson(schema, avro);

      System.out.println(node);
      System.out.println("done");
  }

  /**
   * Encode json to avro
   *
   * @param schema the schema the avro pertains to
   * @param record the data to convert to avro
   * @return the avro bytes
   * @throws IOException if decoding fails
   */
  private byte[] encodeToAvro(Schema schema, InputStream record) throws IOException {
      final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
      final DataInputStream din = new DataInputStream(record);
      final Decoder decoder = new DecoderFactory().jsonDecoder(schema, din);
      final Object datum = reader.read(null, decoder);
      final GenericDatumWriter<Object> writer = new GenericDatumWriter<>(schema);
      final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
      final Encoder encoder = new EncoderFactory().binaryEncoder(outputStream, null);
      writer.write(datum, encoder);
      encoder.flush();

      return outputStream.toByteArray();
  }

  /**
   * Decode avro back to json.
   *
   * @param schema the schema the avro pertains to
   * @param avro the avro bytes
   * @return the json
   * @throws IOException if jackson fails
   */
  private JsonNode decodeToJson(Schema schema, byte[] avro) throws IOException {
      final ObjectMapper mapper = new ObjectMapper();
      final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
      final Decoder decoder = new DecoderFactory().binaryDecoder(avro, null);
      final JsonNode node = mapper.readTree(reader.read(null, decoder).toString());

      return node;
  }

Avro & Python: How to Schema, Write, Read

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using 1.8.2.

  1. pip install avro-python3

Schema

There are many different ways to work with the schema definition, using both primitive and complex types. You can find much more documentation on schema definitions in the Avro documentation.

  import json
  import avro.schema

  my_schema = avro.schema.Parse(json.dumps({
      'namespace': 'test.avro',
      'type': 'record',
      'name': 'MY_NAME',
      'fields': [
          {'name': 'name_1', 'type': 'int'},
          {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
          {'name': 'name_3', 'type': 'float'},
      ],
  }))

Method 1

Write

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter

  #write binary
  file = open(filename, 'wb')

  datum_writer = DatumWriter()
  fwriter = DataFileWriter(file, datum_writer, my_schema)
  fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
  fwriter.close()

Write Deflate

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter

  #write binary
  file = open(filename, 'wb')

  datum_writer = DatumWriter()
  fwriter = DataFileWriter(file, datum_writer, my_schema, codec='deflate')
  fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})
  fwriter.close()

Append

  from avro.datafile import DataFileWriter
  from avro.io import DatumWriter

  #append binary
  file = open(filename, 'a+b')

  datum_writer = DatumWriter()
  #Notice that the schema is not passed to the DataFileWriter. This is because you are appending to an existing avro file
  fwriter = DataFileWriter(file, datum_writer)
  fwriter.append({'name_1': 645675, 'name_2': [5.6,34.9], 'name_3': 649.5645})
  fwriter.close()

Read Schema

  from avro.datafile import DataFileReader
  from avro.io import DatumReader

  file = open(filename, 'rb')
  datum_reader = DatumReader()
  file_reader = DataFileReader(file, datum_reader)

  print(file_reader.meta)
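
The metadata values are raw bytes; if you want the writer schema itself, it is stored under the avro.schema key. A sketch, assuming the standard container-file metadata layout:

  import json

  # The container file keeps the writer schema as JSON bytes under 'avro.schema'
  writer_schema = json.loads(file_reader.meta['avro.schema'])
  print(writer_schema['name'])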

Read

  from avro.datafile import DataFileReader
  from avro.io import DatumReader

  #read binary
  fd = open(filename, 'rb')
  datum_reader = DatumReader()
  file_reader = DataFileReader(fd, datum_reader)

  for datum in file_reader:
      print(datum['name_1'])
      print(datum['name_2'])
      print(datum['name_3'])
  file_reader.close()

Method 2

Write/Append BinaryEncoder

  import io
  from avro.io import DatumWriter, BinaryEncoder

  #write binary
  file = open(filename, 'wb')
  #or append binary
  #file = open(filename, 'a+b')

  bytes_writer = io.BytesIO()
  encoder = BinaryEncoder(bytes_writer)
  writer_binary = DatumWriter(my_schema)
  writer_binary.write({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645}, encoder)
  file.write(bytes_writer.getvalue())
  file.close()

Read BinaryDecoder

  import io
  from avro.io import DatumReader, BinaryDecoder

  file = open(filename, 'rb')
  bytes_reader = io.BytesIO(file.read())
  decoder = BinaryDecoder(bytes_reader)
  reader = DatumReader(my_schema)

  while True:
      try:
          rec = reader.read(decoder)
          print(rec['name_1'])
          print(rec['name_2'])
          print(rec['name_3'])
      except:
          #stop once the stream is exhausted
          break