Avro & Python: How to Schema, Write, Read

(Last Updated On: )

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using avro-python3 version 1.8.2.

pip install avro-python3

Schema

There are many different ways to write a schema definition, using both primitive and complex types. You can find far more detail in the schema declaration section of the Apache Avro specification.

import json
import avro.schema

# Describe the record as a plain Python dict first: an int, an array of
# floats, and a single float.
schema_definition = {
    'namespace': 'test.avro',
    'type': 'record',
    'name': 'MY_NAME',
    'fields': [
        {'name': 'name_1', 'type': 'int'},
        {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
        {'name': 'name_3', 'type': 'float'},
    ],
}

# Parse() takes a JSON string, so serialize the dict before handing it over.
my_schema = avro.schema.Parse(json.dumps(schema_definition))

Method 1

Write

from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import io

# Write a new Avro container file in binary mode ('wb' overwrites any
# existing file). The `with` block releases the file handle even if
# appending a record raises.
with open(filename, 'wb') as out_file:
    # Supplying the schema makes DataFileWriter start a fresh file with it.
    fwriter = DataFileWriter(out_file, DatumWriter(), my_schema)
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    # close() flushes the buffered records before the handle is released.
    fwriter.close()

Write Deflate

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Write a new Avro container file in binary mode, compressing the data
# with the 'deflate' codec. The `with` block releases the file handle even
# if appending a record raises.
with open(filename, 'wb') as out_file:
    fwriter = DataFileWriter(out_file, DatumWriter(), my_schema, codec='deflate')
    fwriter.append({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645})
    # close() flushes the buffered records before the handle is released.
    fwriter.close()

Append

from avro.datafile import DataFileWriter
from avro.io import DatumWriter
import io

# Append to an existing Avro file. 'a+b' opens it for binary appending and
# reading. The `with` block releases the file handle even if appending a
# record raises.
with open(filename, 'a+b') as out_file:
    # Notice that the schema is not passed to the DataFileWriter. This is
    # because you are appending to an existing Avro file.
    fwriter = DataFileWriter(out_file, DatumWriter())
    fwriter.append({'name_1': 645675, 'name_2': [5.6, 34.9], 'name_3': 649.5645})
    # close() flushes the buffered records before the handle is released.
    fwriter.close()

Read Schema

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Open the Avro file read-only in binary mode. The `with` block closes the
# handle when done, which the original snippet never did.
with open(filename, 'rb') as in_file:
    file_reader = DataFileReader(in_file, DatumReader())

    # Print the metadata that DataFileReader exposes for this file.
    print(file_reader.meta)

    file_reader.close()

Read

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Read every record back from the Avro file (binary mode). The `with` block
# closes the file handle even if a record fails to decode or print.
with open(filename, 'rb') as fd:
    file_reader = DataFileReader(fd, DatumReader())

    # Each datum is accessed by field name, like a dict.
    for datum in file_reader:
        print(datum['name_1'])
        print(datum['name_2'])
        print(datum['name_3'])

    file_reader.close()

Method 2

Write/Append BinaryEncoder

import io
from avro.io import DatumWriter, BinaryEncoder

# Choose ONE mode: 'wb' starts a new file, 'a+b' appends to an existing one.
# Opening the file in both modes back to back (as a literal copy/paste of
# both alternatives would) leaks the first handle; the net effect of that
# sequence is a truncated file, which is what 'wb' gives.
mode = 'wb'

# Encode a single record into an in-memory buffer using the schema.
bytes_writer = io.BytesIO()
encoder = BinaryEncoder(bytes_writer)
writer_binary = DatumWriter(my_schema)
writer_binary.write({'name_1': 645645, 'name_2': [5.6, 34.7], 'name_3': 644.5645}, encoder)

# Write the encoded bytes; the `with` block flushes and closes the file so
# the data is guaranteed to reach disk.
with open(filename, mode) as file:
    file.write(bytes_writer.getvalue())

Read BinaryDecoder

import io
from avro.io import DatumReader, BinaryDecoder

# Load the whole file into memory; the `with` block closes the handle
# promptly instead of leaving it open.
with open(filename, 'rb') as file:
    payload = file.read()

bytes_reader = io.BytesIO(payload)
decoder = BinaryDecoder(bytes_reader)
# The same schema used for writing is passed to the reader here.
reader = DatumReader(my_schema)

# Decode records back to back until the buffer is exhausted. Checking the
# read position replaces a bare `except:` that treated *any* error
# (including typos and Ctrl-C) as end-of-data; genuine decode errors now
# surface instead of being silently swallowed.
while bytes_reader.tell() < len(payload):
    rec = reader.read(decoder)
    print(rec['name_1'])
    print(rec['name_2'])
    print(rec['name_3'])