Thursday, July 4, 2019

Serializing Spark dataframes to Avro using KafkaAvroSerializer

I recently worked on a project that used Spark Structured Streaming with Confluent Schema Registry and Apache Kafka. Because of versioning constraints between the various components, I had to write a custom implementation of the KafkaAvroSerializer class to serialize Spark DataFrames into Avro format. The serialized data was then published to Kafka. This post is based on the examples in the Confluent documentation.
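A custom serializer that stays compatible with KafkaAvroSerializer consumers has to emit Confluent's wire format. That format is a zero "magic" byte, then the 4-byte big-endian schema ID assigned by Schema Registry, then the Avro binary-encoded record. The following is a minimal sketch of only that framing step. It assumes the Avro bytes and the schema ID have already been obtained elsewhere. The object and method names (`ConfluentFraming`, `frameConfluentAvro`, `unframe`) are hypothetical and are not part of any Confluent API.

```scala
import java.nio.ByteBuffer

// Hypothetical helper, not a Confluent class: illustrates the wire format only.
object ConfluentFraming {
  // Magic byte that prefixes every message in Confluent's wire format.
  val MagicByte: Byte = 0x0

  // Wraps already-encoded Avro binary data with the Confluent header:
  // 1 magic byte + 4-byte schema ID (ByteBuffer writes big-endian by default).
  def frameConfluentAvro(schemaId: Int, avroPayload: Array[Byte]): Array[Byte] = {
    val buffer = ByteBuffer.allocate(1 + 4 + avroPayload.length)
    buffer.put(MagicByte)
    buffer.putInt(schemaId)
    buffer.put(avroPayload)
    buffer.array()
  }

  // Reverse operation: checks the magic byte, then splits out the schema ID
  // and the raw Avro payload.
  def unframe(message: Array[Byte]): (Int, Array[Byte]) = {
    val buffer = ByteBuffer.wrap(message)
    require(buffer.get() == MagicByte, "unknown magic byte")
    val schemaId = buffer.getInt()
    val payload = new Array[Byte](buffer.remaining())
    buffer.get(payload)
    (schemaId, payload)
  }
}
```

For example, `ConfluentFraming.frameConfluentAvro(42, Array[Byte](2, 4, 6))` yields the bytes `0, 0, 0, 0, 42, 2, 4, 6`: the magic byte, schema ID 42 as a big-endian int, then the payload unchanged.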

In newer versions of Confluent Schema Registry, a lot of the implementation details described below have been simplified and are much easier to use. The standard, recommended way to use the Confluent KafkaAvroSerializer is simple: set it as the value serializer in the Kafka properties used to initialize a KafkaProducer:

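import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import io.confluent.kafka.serializers.KafkaAvroSerializer
val props = new Properties() // bootstrap.servers, key.serializer and schema.registry.url must also be set
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[KafkaAvroSerializer].getName)
val producer = new KafkaProducer[String, AnyRef](props)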

This hides most of the implementation details. When an object is sent to Kafka through the KafkaProducer, the KafkaAvroSerializer internally does the following: