Last week I had a run in with Apache Avro. A data serialization method which I have used a couple of times in the past. Most of the times in combination with Kafka, but I remembered also being pleasantly surprised when using it on it's own.
The thing I realized this time, is that there are a lot of different ways of using Avro and I wanted to write them down for myself. So here we are, and do not expect this to be a complete overview or guide. It is just some notes. I will focus mostly on the Java side here but Avro supports more programming languages.
Methods of defining you data
The primary method for defining your data is to create ".avsc" files. These are Json files which can be used to generate classes, encoders and decoders. Always make sure that the definition files are shared between projects. Either in a dependency or by using a git submodule. You can do some other manual bookkeeping and copy the files around but this can become difficult to track over time. To generate those classes in java an external tool can be used. One tool which can be easily integrated is a maven plugin which runs automatically during your compile cycle. This way your implementation is always checked against the data definition.
Note however, if you are using Intellij1, that this is not integrated with the plugin.
This means, doing an Intellij build will fail. If you do a mvn compile
then you will notice that
the ".java" files are being created in the namespace you denoted in the definition.
The classes you get for free will contain a Builder for builder/flow style object
creation, plus a whole lot more.
If the interacting projects are all written on the JVM then the reflective API is also an option for defining your data. This means defining your classes in normal Java POJOs and creating encoders and decoders using Avro's ReflectiveDatum-Reader/-Writer. The disadvantage is clear since you have to rely on Java. It is best to have some common library in this case on which all projects can depend. The upside is that there is no need to write raw Json specifications or understand "avsc" files.
There are also other languages in the Avro ecosystem like IDL and .avpr files which allow you to describe whole remote-procedure-call (RPC) schemes. A complete example can be found here
Methods of turning objects into bytes
First off, I am not going to go into Json encoding and decoding but that's also a possibility with Avro. This blog-post will be concerning binary serialization and deserialization.
Once we have some plugin-generated class files there is a convenient build in avroObject.toByteBuffer()
method.
It does not say what specific avro-encoding is used for these bytes, but I think it adheres to the
single-object encoding since it will have a header and 10 bytes total in front of the object itself.
For decoding these objects a simple AvroObject.fromByteBuffer
will do.
There is also a method which delivers just the bytes of the object and nothing more.
In this example BottleMessage
is defined in an .avsc file, and I am passing
around byte[]
but I could be using something else for the encoder input
depending on the application.
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
public class DatumTransformer {
DatumWriter<BottleMessage> bottleMessageDatumWriter =
new SpecificDatumWriter<>(BottleMessage.class);
DatumReader<BottleMessage> bottleMessageDatumReader =
new SpecificDatumReader<>(BottleMessage.class);
EncoderFactory encoderFactory = EncoderFactory.get();
DecoderFactory decoderFactory = DecoderFactory.get();
BinaryEncoder reuseEncoder = null;
BinaryDecoder reuseDecoder = null;
public byte[] encode(BottleMessage bottleMessage) {
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
reuseEncoder = encoderFactory.binaryEncoder(byteStream, reuseEncoder);
try {
bottleMessageDatumWriter.write(bottleMessage, reuseEncoder);
reuseEncoder.flush();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException();
}
return byteStream.toByteArray();
}
public BottleMessage decode(byte[] bytes) {
reuseDecoder = decoderFactory.binaryDecoder(bytes, reuseDecoder);
try {
// Reuse variable not used in this example
return bottleMessageDatumReader.read(null, reuseDecoder);
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException();
}
}
}
Note that you can pass in reuse objects. In that case the reuse object's properties are set,
instead of a new object created. This could be useful if you change the common-interface object
into project specific objects immediately anyway and you do not want to create (more) garbage-collection
pressure. I did not do any performance tests however to see if this really helps in this case.
In general, the performance of decoding single objects is very similar for both methods.
Decoding multiple objects from a stream of data is easier
using the EncoderFactory
approach and is also a bit faster
than transforming single objects with the toByteBuffer
method.
Then there is a special method for writing to files. It can be used to save many rows of
data and it can be read without knowing the schema. Probably parquet-files or other
columnar-based storage would be better for large sets of data. For heavily nested, record
based data however, it works quite well. There is a possibility to add
optional compression codecs. Code-wise, encoding to a file and decoding from a file
looks very similar to the previous binary-encoding method, but with a DataFileWriter
instead
of a EncoderFactory
+ BinaryEncoder
. For reading without schema a GenericDatumReader<GenericRecord>
is used.
Other software
The whole reason why I ran into trouble to begin with was because the AvroCoder of Apache Beam
was producing messages which the AvroObject.fromByteBuffer
could not decode. Furthermore the
Avro integration into Kafka generally only works with single-object encoding. So when using
higher-level libaries which produce or read Avro data it is important to inspect what kind of encoding
and decoding it is doing.
Reading the avro data afterwards in Python turned out to be difficult. I did try to do it again for this blog but had to resort to changing some bytes / cutting some bytes to make it work. But this could also be because I used Base64 to copy over the pure byte array. Many of the specialized Python libraries available which make reading avro faster focus on reading avro files (with the schemas attached like fastavro). To read single-object encoded byte data the confluent_kafka library is probably needed.
Other links
The code for this blogpost can be found here
Finally I want to point to some other links which might be useful
- Baeldung tutorial
- Gist with examples of using Generic-Encoder/-Decoder and a reference to Scala's Avro4s library
-
I think this holds for LSP implementations too but I haven't checked ↩