Scala-Kafka-Avro (Producing and Consuming Avro messages)

Datetime: 2016-08-23 03:09:19          Topic: Apache Kafka, Scala

This post will show you how to write and read messages in Avro format to/from Kafka.

Instead of using plain-text messages, though, we will serialize our messages with Avro. That will allow us to send much more complex data structures over the wire.

Avro

Apache Avro is a language-neutral data serialization format. Avro data is described by a language-independent schema. The schema is usually written in JSON, and the data is usually serialized to binary, although serialization to JSON is also supported.

Let’s add the Avro dependency to the build:

"org.apache.avro"  %  "avro"  %  "1.7.7"

We will use a schema like this:

{
    "namespace": "kafka_avro.test",
    "type": "record",
    "name": "user",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["string", "null"]}
    ]
}

You can instantiate the schema as follows:

val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)

Here, SCHEMA_STRING is the JSON listed above as a Java String.
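As a minimal sketch of this step, the schema can be embedded as a triple-quoted Scala string, parsed, and then inspected. The object name SchemaSketch and the fieldNames value are illustrative only, and the namespace is written with an underscore here because the Avro specification restricts names to letters, digits and underscores.

```scala
import org.apache.avro.Schema
import scala.collection.JavaConverters._

object SchemaSketch {
  // The user schema, embedded as a string (in the post it is SCHEMA_STRING).
  val SCHEMA_STRING: String =
    """{
      |  "namespace": "kafka_avro.test",
      |  "type": "record",
      |  "name": "user",
      |  "fields": [
      |    {"name": "id", "type": "int"},
      |    {"name": "name", "type": "string"},
      |    {"name": "email", "type": ["string", "null"]}
      |  ]
      |}""".stripMargin

  // Parsing fails with an exception if the JSON is not a valid Avro schema.
  val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)

  // Field names in declaration order.
  val fieldNames: List[String] = schema.getFields.asScala.map(_.name()).toList

  def main(args: Array[String]): Unit = {
    println(schema.getFullName)
    fieldNames.foreach(println)
  }
}
```

Parsing the schema once and reusing the resulting Schema object is usually preferable to re-parsing it for every message.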

Now, we can create an Avro generic record object with the instantiated schema and put user data into it. Note that id is declared as an int in the schema, so it must be set to an integer rather than a string.

val genericUser: GenericRecord = new GenericData.Record(schema)

genericUser.put("id", 1)
genericUser.put("name", "singh")
genericUser.put("email", null)
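GenericData.Record does not check value types when put is called, so a mismatch only surfaces later, at serialization time. One way to catch it earlier is GenericData's validate method. The sketch below is illustrative: ValidateSketch, userWithId and isValid are hypothetical names, and the schema is inlined without a namespace to keep the example short.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object ValidateSketch {
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "email", "type": ["string", "null"]}
      |]}""".stripMargin)

  // Hypothetical helper: builds a user record with the given id value.
  def userWithId(id: Any): GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("id", id)
    record.put("name", "singh")
    record.put("email", null)
    record
  }

  // True when every field value matches the type declared in the schema.
  def isValid(record: GenericRecord): Boolean =
    GenericData.get().validate(schema, record)

  def main(args: Array[String]): Unit = {
    println(isValid(userWithId(1)))   // an Int matches the "int" field
    println(isValid(userWithId("1"))) // a String does not
  }
}
```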

After creating the generic record, we need to serialize it. Here we will use the Avro binary encoder to encode the object into a byte array.

val writer = new SpecificDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(genericUser, encoder)
encoder.flush()
out.close()

val serializedBytes: Array[Byte] = out.toByteArray()
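To see that the bytes really carry the record, it can help to decode them again right away. The following is a sketch of such a round trip, not the post's own code: it uses GenericDatumWriter and GenericDatumReader (the generic counterparts of the Specific classes), and the names RoundTripSketch, serialize, deserialize and sampleUser are made up for illustration.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object RoundTripSketch {
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "email", "type": ["string", "null"]}
      |]}""".stripMargin)

  // Encodes a record with Avro's binary encoding.
  def serialize(record: GenericRecord): Array[Byte] = {
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush() // the encoder buffers, so flush before reading the bytes
    out.toByteArray
  }

  // Decodes bytes that were written with the same schema.
  def deserialize(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  def sampleUser: GenericRecord = {
    val record = new GenericData.Record(schema)
    record.put("id", 1)
    record.put("name", "singh")
    record.put("email", null)
    record
  }

  def main(args: Array[String]): Unit = {
    val bytes = serialize(sampleUser)
    val decoded = deserialize(bytes)
    println(s"${bytes.length} bytes, name = ${decoded.get("name")}")
  }
}
```

The binary form contains no field names or type tags, which is why the reader needs the schema to make sense of the bytes. Decoded strings come back as Avro's Utf8 type, so call toString before comparing them with Scala strings.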

You can also use one of the many third-party libraries to serialize and deserialize Avro data; some of them may offer a friendlier API.

Now it’s time to send the serialized message to Kafka using a producer. Here is the entire Kafka producer code:

Producer

import java.util.{Properties, UUID}

import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import domain.User
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import java.io.ByteArrayOutputStream

import org.apache.avro.io._
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}

import scala.io.Source

class KafkaProducer() {

  private val props = new Properties()

  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries", "5")
  props.put("request.required.acks", "-1")
  props.put("serializer.class", "kafka.serializer.DefaultEncoder")
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

  // Topic to publish to (the same name the consumer below subscribes to)
  private val topic = "demo-topic"

  // Read the Avro schema file from the classpath
  val schema: Schema = new Parser().parse(Source.fromURL(getClass.getResource("/schema.avsc")).mkString)

  // Create an Avro generic record object
  val genericUser: GenericRecord = new GenericData.Record(schema)

  // Put data into the generic record ("id" is an int in the schema)
  genericUser.put("id", 1)
  genericUser.put("name", "sushil")
  genericUser.put("email", null)

  // Serialize the generic record into a byte array
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(genericUser, encoder)
  encoder.flush()
  out.close()

  val serializedBytes: Array[Byte] = out.toByteArray()

  // Wrap the bytes in a message for the topic and send it
  val queueMessage = new KeyedMessage[String, Array[Byte]](topic, serializedBytes)
  producer.send(queueMessage)
}

Now, in the same way we updated the producer to send binary messages, we will create a consumer that reads messages from Kafka, deserializes them, and builds a generic record from each one.

Consumer

import java.util.Properties

import domain.User
import org.apache.avro.Schema
import org.apache.avro.io.DatumReader
import org.apache.avro.io.Decoder
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.generic.GenericRecord
import org.apache.avro.io.DecoderFactory
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder

import scala.io.Source

class KafkaConsumer() {
  private val props = new Properties()

  val groupId = "demo-topic-consumer"
  val topic = "demo-topic"

  props.put("group.id", groupId)
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(topic)
  private val streams = consumerConnector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()
 
  // Read the Avro schema file from the classpath
  val schemaString = Source.fromURL(getClass.getResource("/schema.avsc")).mkString
  // Initialize the schema
  val schema: Schema = new Schema.Parser().parse(schemaString)

  // Returns the next user from the topic, or None on timeout or error
  def read(): Option[User] =
    try {
      if (hasNext) {
        println("Getting message from queue.............")
        val message: Array[Byte] = iterator.next().message()
        getUser(message)
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  private def hasNext: Boolean =
    try
      iterator.hasNext()
    catch {
      // Thrown when no message arrives within consumer.timeout.ms
      case timeOutEx: ConsumerTimeoutException =>
        false
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message ")
        false
    }

  private def getUser(message: Array[Byte]): Option[User] = {

    // Deserialize and create generic record
    val reader: DatumReader[GenericRecord] = new SpecificDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)

    // Make user object; "email" is nullable, so wrap it in Option
    // instead of relying on a caught NullPointerException
    val user = User(
      userData.get("id").toString.toInt,
      userData.get("name").toString,
      Option(userData.get("email")).map(_.toString)
    )
    Some(user)
  }

}
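The consumer imports domain.User, but the post does not show its definition. Judging from how it is constructed, it is presumably a case class with an Int id, a String name and an optional email. The sketch below assumes exactly that shape; UserMapping, fromRecord and demoSchema are hypothetical names used only to show the record-to-object mapping in isolation.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

// Assumed shape of domain.User (not shown in the post).
final case class User(id: Int, name: String, email: Option[String])

object UserMapping {
  val demoSchema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "email", "type": ["string", "null"]}
      |]}""".stripMargin)

  // Converts a generic record into a User. Decoded Avro strings are
  // CharSequence values (typically Utf8), hence the toString calls.
  def fromRecord(record: GenericRecord): User =
    User(
      id = record.get("id").toString.toInt,
      name = record.get("name").toString,
      email = Option(record.get("email")).map(_.toString) // null becomes None
    )

  def main(args: Array[String]): Unit = {
    val record = new GenericData.Record(demoSchema)
    record.put("id", 7)
    record.put("name", "sushil")
    record.put("email", null)
    println(fromRecord(record))
  }
}
```

Wrapping the nullable field in Option keeps the null handling explicit and avoids using exceptions for control flow.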

Conclusion

In this post, we have seen how to produce messages encoded with Avro, how to send them to Kafka, how to consume them, and finally how to decode them. This helps us build a messaging system that carries complex data with the help of Kafka and Avro.

One thing to note is that the same Avro schema must be present on both sides (producer and consumer) to encode and decode messages. Any change to the schema must be applied on both sides. To overcome this problem, Confluent Platform provides a Schema Registry, which allows us to share Avro schemas and handle schema changes.
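To make the schema-change problem concrete, here is a small sketch of Avro's own schema resolution, without any Schema Registry. A reader that knows the schema the data was written with can decode it into a newer schema, provided the added fields have defaults. The country field, its default value and all names in EvolutionSketch are invented for this illustration.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object EvolutionSketch {
  // A fresh parser per schema, since both schemas define the name "user".
  private def parse(json: String): Schema = new Schema.Parser().parse(json)

  // Schema the producer wrote with.
  val writerSchema: Schema = parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"}
      |]}""".stripMargin)

  // Newer consumer schema: adds a hypothetical field with a default.
  val readerSchema: Schema = parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "country", "type": "string", "default": "unknown"}
      |]}""".stripMargin)

  def writeOld(id: Int, name: String): Array[Byte] = {
    val record = new GenericData.Record(writerSchema)
    record.put("id", id)
    record.put("name", name)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // The reader is given both schemas and resolves the difference.
  def readNew(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  }

  def main(args: Array[String]): Unit =
    println(readNew(writeOld(1, "singh"))) // "country" is filled from the default
}
```

The catch is that the consumer still has to obtain the writer's schema from somewhere, which is the gap a shared registry is meant to fill.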

You can find the complete code on GitHub.

