Avro & Java: Record Parsing

This tutorial will guide you through converting JSON to Avro and then back to JSON. I suggest you first read through the Avro documentation to familiarize yourself with it. This tutorial assumes you already have a Maven project set up with a resources folder.

POM:

Add the Avro dependency (the XML below matches the 1.8.2 release used later in this tutorial; substitute a current version as needed):

<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.8.2</version>
</dependency>

Add the Jackson dependency (any recent 2.x release of jackson-databind works):

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.4</version>
</dependency>

Avro Schema File:

Next you need to create the avro schema file in your resources folder. Name the file “schema.avsc”. The extension avsc is the Avro schema extension.

{
    "namespace": "test.avro",
    "type": "record",
    "name": "MY_NAME",
    "fields": [
        {"name": "name_1", "type": "int"},
        {"name": "name_2", "type": {"type": "array", "items": "float"}},
        {"name": "name_3", "type": "float"}
    ]
}

Json Record to Validate:

Next you need to create a JSON file that conforms to the schema you just made. Name the file “record.json” and put it in your resources folder. The contents can be whatever you want, as long as they conform to the schema above.

{ "name_1": 234, "name_2": [23.34,654.98], "name_3": 234.7}

It’s Avro Time:

Imports:

import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

Conversion to Avro and Back:

private void run() throws IOException {
	// Load the schema definition and the sample JSON record from the classpath
	final ClassLoader classLoader = getClass().getClassLoader();
	final File schemaFile = new File(classLoader.getResource("schema.avsc").getFile());
	final InputStream recordStream = classLoader.getResourceAsStream("record.json");

	// Parse the Avro schema
	final Schema schema = new Schema.Parser().parse(schemaFile);

	// Round-trip the record: JSON -> Avro binary -> JSON
	final byte[] avroBytes = encodeToAvro(schema, recordStream);
	final JsonNode json = decodeToJson(schema, avroBytes);

	System.out.println(json);
	System.out.println("done");
}

/**
 * Encode a JSON record to Avro binary.
 *
 * @param schema the Avro schema the record conforms to
 * @param record an input stream containing the JSON record
 * @return the Avro-encoded bytes
 * @throws IOException if reading or decoding the record fails
 */
private byte[] encodeToAvro(Schema schema, InputStream record) throws IOException {
	// DecoderFactory.get() / EncoderFactory.get() return shared, thread-safe
	// singletons -- preferred over allocating a new factory on every call.
	// The jsonDecoder accepts the InputStream directly; no DataInputStream
	// wrapper is needed.
	final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
	final Decoder decoder = DecoderFactory.get().jsonDecoder(schema, record);
	final GenericData.Record datum = reader.read(null, decoder);

	final GenericDatumWriter<GenericData.Record> writer = new GenericDatumWriter<>(schema);
	final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
	final Encoder encoder = EncoderFactory.get().binaryEncoder(outputStream, null);
	writer.write(datum, encoder);
	// binaryEncoder buffers internally; flush before reading the bytes out.
	encoder.flush();

	return outputStream.toByteArray();
}

/**
 * Decode Avro binary back to JSON.
 *
 * @param schema the Avro schema the bytes were written with
 * @param avro the Avro-encoded bytes
 * @return the record as a Jackson JSON tree
 * @throws IOException if decoding the Avro or parsing the JSON fails
 */
private JsonNode decodeToJson(Schema schema, byte[] avro) throws IOException {
	final DatumReader<GenericData.Record> reader = new GenericDatumReader<>(schema);
	// Reuse the shared factory instead of allocating a new one per call.
	final Decoder decoder = DecoderFactory.get().binaryDecoder(avro, null);
	final GenericData.Record record = reader.read(null, decoder);

	// GenericData.Record.toString() renders the record as JSON text,
	// which Jackson then parses into a tree.
	final ObjectMapper mapper = new ObjectMapper();
	return mapper.readTree(record.toString());
}

Avro & Python: How to Schema, Write, Read

I have been experimenting with Apache Avro and Python. Below is what I have learned thus far.

Pip Install

At the time of this writing I am using 1.8.2.

pip install avro-python3

Schema

There are many different ways to work with the schema definition. There are primitive and complex types. You can find much more detail on schema definitions in the official Apache Avro specification.

import json
import avro.schema

# Define the record schema as a plain dict, then hand the JSON text to Avro.
schema_definition = {
    'namespace': 'test.avro',
    'type': 'record',
    'name': 'MY_NAME',
    'fields': [
        {'name': 'name_1', 'type': 'int'},
        {'name': 'name_2', 'type': {'type': 'array', 'items': 'float'}},
        {'name': 'name_3', 'type': 'float'},
    ],
}

my_schema = avro.schema.Parse(json.dumps(schema_definition))

Method 1

Write

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Write a new binary Avro container file. Context managers guarantee that
# both the writer and the underlying file are closed even if append() raises.
with open(filename, 'wb') as out_file:
    with DataFileWriter(out_file, DatumWriter(), my_schema) as fwriter:
        fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})

Write Deflate

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Same as the plain write, but compress the data blocks with the
# 'deflate' codec. Context managers guarantee writer and file are closed.
with open(filename, 'wb') as out_file:
    with DataFileWriter(out_file, DatumWriter(), my_schema, codec='deflate') as fwriter:
        fwriter.append({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645})

Append

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# Append to an existing Avro container file. Note that no schema is passed
# to DataFileWriter: when appending, the schema is read from the file itself.
with open(filename, 'a+b') as out_file:
    with DataFileWriter(out_file, DatumWriter()) as fwriter:
        fwriter.append({'name_1': 645675, 'name_2': [5.6,34.9], 'name_3': 649.5645})

Read Schema

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Print the container file's metadata block (this includes the writer schema).
# Context managers ensure the reader and file are closed when done.
with open(filename, 'rb') as in_file:
    with DataFileReader(in_file, DatumReader()) as file_reader:
        print(file_reader.meta)

Read

from avro.datafile import DataFileReader
from avro.io import DatumReader

# Iterate every record in the binary Avro container file. Context managers
# close the reader and file even if an exception is raised mid-iteration.
with open(filename, 'rb') as in_file:
    with DataFileReader(in_file, DatumReader()) as file_reader:
        for datum in file_reader:
            print(datum['name_1'])
            print(datum['name_2'])
            print(datum['name_3'])

Method 2

Write/Append BinaryEncoder

import io
from avro.io import DatumWriter, BinaryEncoder

# Serialize a record with the raw binary encoding (no container-file header).
# Encode into an in-memory buffer first, then write the bytes out once.
bytes_writer = io.BytesIO()
encoder = BinaryEncoder(bytes_writer)
writer_binary = DatumWriter(my_schema)
writer_binary.write({'name_1': 645645, 'name_2': [5.6,34.7], 'name_3': 644.5645}, encoder)

# Open the file exactly once and close it via the context manager.
# Use mode 'wb' to create/overwrite, or 'a+b' to append to an existing file.
with open(filename, 'wb') as out_file:
    out_file.write(bytes_writer.getvalue())

Read BinaryDecoder

import io
from avro.io import DatumReader, BinaryDecoder

# Read raw binary-encoded records (no container header), so the schema must
# be supplied to the DatumReader directly.
with open(filename, 'rb') as in_file:
    raw = in_file.read()

bytes_reader = io.BytesIO(raw)
decoder = BinaryDecoder(bytes_reader)
reader = DatumReader(my_schema)

# Loop on the buffer position instead of a bare `except: break`, which would
# silently swallow real errors (corrupt data, schema mismatch, etc.).
while bytes_reader.tell() < len(raw):
	rec = reader.read(decoder)
	print(rec['name_1'])
	print(rec['name_2'])
	print(rec['name_3'])