Avro & Python: How to Schema, Write, Read


I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using version 1.8.2 of avro-python3.

    pip install avro-python3

Schema

There are many ways to write a schema definition, using both primitive and complex types. The Apache Avro specification documents the schema definition in much more detail.

    import json
    import avro.schema

    # Parse() expects the schema as a JSON string, so serialize the dict first.
    my_schema = avro.schema.Parse(json.dumps(
        {
            'namespace': 'test.avro',
            'type': 'record',
            'name': 'MY_NAME',
            'fields': [
                {'name': 'name_1', 'type': 'int'},
                {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
                {'name': 'name_3', 'type': 'float'},
            ]
        }))
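Because Parse() takes a JSON string, one option is to keep the schema in its own file instead of in the Python source. Below is a minimal sketch using only the standard library. The file name my_schema.avsc and the temporary directory are my own assumptions for illustration (.avsc is the extension commonly used for Avro schema files).

```python
import json
import os
import tempfile

schema_dict = {
    'namespace': 'test.avro',
    'type': 'record',
    'name': 'MY_NAME',
    'fields': [
        {'name': 'name_1', 'type': 'int'},
        {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
        {'name': 'name_3', 'type': 'float'},
    ],
}

# Hypothetical location for the schema file.
schema_path = os.path.join(tempfile.mkdtemp(), 'my_schema.avsc')

# Save the schema as plain JSON.
with open(schema_path, 'w') as f:
    json.dump(schema_dict, f, indent=2)

# Read it back as a string; this string is what Parse() would receive.
with open(schema_path) as f:
    schema_json = f.read()

loaded = json.loads(schema_json)
field_names = [field['name'] for field in loaded['fields']]
print(field_names)
```

With the file in place, the string in schema_json could be handed to avro.schema.Parse() in place of the json.dumps() call.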

Method 1

Write

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    # write binary (filename is the path to your Avro file)
    file = open(filename, 'wb')

    datum_writer = DatumWriter()
    # Passing my_schema starts a new container file with the schema in its header.
    fwriter = DataFileWriter(file, datum_writer, my_schema)
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    # close() flushes pending records and closes the underlying file.
    fwriter.close()

Write Deflate

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    # write binary (filename is the path to your Avro file)
    file = open(filename, 'wb')

    datum_writer = DatumWriter()
    # codec='deflate' compresses each block of records as it is written.
    fwriter = DataFileWriter(file, datum_writer, my_schema, codec='deflate')
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    fwriter.close()
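Per the Avro specification, the deflate codec compresses each data block with raw DEFLATE (RFC 1951, without the zlib header or checksum), so it helps most when records are repetitive. Here is a rough standard-library sketch of that effect. The block bytes are a made-up stand-in for a block of records, not real Avro output.

```python
import zlib

# Made-up stand-in for one block of encoded records (not real Avro output).
block = b'name_1=645645;name_2=5.6,34.7;name_3=644.5645;' * 200

# wbits=-15 selects raw DEFLATE: no zlib header and no checksum.
compressor = zlib.compressobj(wbits=-15)
compressed = compressor.compress(block) + compressor.flush()

# Decompress with the same raw setting to confirm the round trip.
restored = zlib.decompress(compressed, -15)

print(len(block), len(compressed))
```

For a file holding a single small record, the saving is likely to be negligible.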

Append

    from avro.datafile import DataFileWriter
    from avro.io import DatumWriter

    # append binary (filename is the path to an existing Avro file)
    file = open(filename, 'a+b')

    datum_writer = DatumWriter()
    # Notice that the schema is not passed to the DataFileWriter. This is because
    # you are appending to an existing Avro file, which already contains its schema.
    fwriter = DataFileWriter(file, datum_writer)
    fwriter.append({'name_1': 645675, 'name_2': [5.6, 34.9], 'name_3': 649.5645})
    fwriter.close()

Read Schema

    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    file = open(filename, 'rb')
    datum_reader = DatumReader()
    file_reader = DataFileReader(file, datum_reader)

    # The file metadata includes the schema the file was written with.
    print(file_reader.meta)
    file_reader.close()
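The metadata is a dictionary. The Avro specification stores the schema under the avro.schema key and the codec under avro.codec, and as far as I can tell the values come back as bytes in Python 3. A small sketch of pulling the field names out of it follows. The meta dictionary here is a hand-built stand-in for file_reader.meta, so treat its exact contents as an assumption.

```python
import json

# Hand-built stand-in for file_reader.meta; values are assumed to be bytes.
meta = {
    'avro.codec': b'null',
    'avro.schema': json.dumps({
        'namespace': 'test.avro',
        'type': 'record',
        'name': 'MY_NAME',
        'fields': [
            {'name': 'name_1', 'type': 'int'},
            {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
            {'name': 'name_3', 'type': 'float'},
        ],
    }).encode('utf-8'),
}

# Decode the bytes, then parse the schema JSON into a plain dict.
codec = meta['avro.codec'].decode('utf-8')
schema = json.loads(meta['avro.schema'].decode('utf-8'))
field_names = [field['name'] for field in schema['fields']]

print(codec)
print(field_names)
```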

Read

    from avro.datafile import DataFileReader
    from avro.io import DatumReader

    # read binary
    fd = open(filename, 'rb')
    datum_reader = DatumReader()
    file_reader = DataFileReader(fd, datum_reader)

    # Each datum comes back as a dict keyed by field name.
    for datum in file_reader:
        print(datum['name_1'])
        print(datum['name_2'])
        print(datum['name_3'])
    file_reader.close()

Method 2

Write/Append BinaryEncoder

    import io
    from avro.io import DatumWriter, BinaryEncoder

    # Open the file in ONE of the two modes below.
    # write binary (creates or overwrites the file)
    file = open(filename, 'wb')
    # append binary (adds to an existing file)
    # file = open(filename, 'a+b')

    # Encode the record into an in-memory buffer, then write the bytes out.
    bytes_writer = io.BytesIO()
    encoder = BinaryEncoder(bytes_writer)
    writer_binary = DatumWriter(my_schema)
    writer_binary.write({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645}, encoder)
    file.write(bytes_writer.getvalue())
    file.close()
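Unlike Method 1, the bytes written here carry no header, schema, or sync markers. They are just the field values in schema order. To show roughly what ends up in the file, here is a hand-rolled sketch that follows the encoding rules in the Avro specification: zig-zag varints for int, 4-byte little-endian IEEE 754 for float, and counted blocks terminated by a zero for arrays. The helper names (encode_long, encode_float, encode_float_array, encode_record) are my own and are not part of the avro package. The sketch only covers the three field types used in my_schema.

```python
import struct


def encode_long(n):
    """Zig-zag encode a signed integer, then emit it as a base-128 varint."""
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while n & ~0x7F:
        out.append((n & 0x7F) | 0x80)  # low 7 bits, continuation bit set
        n >>= 7
    out.append(n)
    return bytes(out)


def encode_float(x):
    """Encode a float as 4 bytes, little-endian IEEE 754."""
    return struct.pack('<f', x)


def encode_float_array(items):
    """Encode an array as one counted block followed by a zero terminator."""
    if not items:
        return encode_long(0)
    body = b''.join(encode_float(item) for item in items)
    return encode_long(len(items)) + body + encode_long(0)


def encode_record(rec):
    """Concatenate the fields in the order the schema declares them."""
    return (encode_long(rec['name_1'])
            + encode_float_array(rec['name_2'])
            + encode_float(rec['name_3']))


payload = encode_record({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
print(len(payload), payload.hex())
```

Since nothing in these bytes describes their layout, whoever reads them back has to supply the same schema.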

Read BinaryDecoder

    import io
    from avro.io import DatumReader, BinaryDecoder

    file = open(filename, 'rb')
    data = file.read()
    file.close()
    bytes_reader = io.BytesIO(data)
    decoder = BinaryDecoder(bytes_reader)
    reader = DatumReader(my_schema)

    # The raw encoding has no header or record count, so keep decoding
    # until every byte has been consumed.
    while bytes_reader.tell() < len(data):
        rec = reader.read(decoder)
        print(rec['name_1'])
        print(rec['name_2'])
        print(rec['name_3'])
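The loop has to run until the bytes run out because the raw encoding carries no record count or delimiter. Here is a rough standard-library sketch of the same idea, decoding by hand according to the Avro specification. The helpers read_long, read_float, and read_record are hypothetical names of mine (not avro APIs), and the input bytes are built by hand for two small made-up records.

```python
import io
import struct


def read_long(stream):
    """Read a base-128 varint and undo the zig-zag encoding."""
    shift = 0
    accum = 0
    while True:
        b = stream.read(1)[0]
        accum |= (b & 0x7F) << shift
        if not b & 0x80:
            break
        shift += 7
    return (accum >> 1) ^ -(accum & 1)


def read_float(stream):
    """Read a 4-byte little-endian IEEE 754 float."""
    return struct.unpack('<f', stream.read(4))[0]


def read_record(stream):
    """Decode one record laid out as int, array of float, float."""
    name_1 = read_long(stream)
    name_2 = []
    count = read_long(stream)
    while count != 0:
        if count < 0:
            # A negative count is followed by the block's size in bytes.
            read_long(stream)
            count = -count
        for _ in range(count):
            name_2.append(read_float(stream))
        count = read_long(stream)
    name_3 = read_float(stream)
    return {'name_1': name_1, 'name_2': name_2, 'name_3': name_3}


# Two made-up records written back to back, with nothing between them.
data = (
    b'\x02' + b'\x04' + struct.pack('<ff', 1.5, 2.5) + b'\x00' + struct.pack('<f', 0.25)
    + b'\x03' + b'\x00' + struct.pack('<f', 8.0)
)

stream = io.BytesIO(data)
records = []
while stream.tell() < len(data):
    records.append(read_record(stream))

print(records)
```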