
tKafkaInputAvro fails when deserializing Avro Message

We are trying to deserialize an Avro message from a Kafka Topic in a Big Data Streaming job. 

 

For this attempt, we are using the tKafkaInputAvro component, but we continue to get errors when the job tries to deserialize the message.

 

How do you configure the tKafkaInputAvro component so that it successfully deserializes an Avro message from a Kafka topic?

 

Avro Schema:

 

{
  "namespace": "example",
  "type": "record",
  "name": "KafkaInputAvro",
  "fields": [
     {"name": "Name", "type": "string"},
     {"name": "Color",  "type": ["string", "null"]},
     {"name": "Number", "type": ["int", "null"]}
 ]
}
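
For reference, a record matching this schema is built with the Avro generic API along these lines (illustrative snippet only; SCHEMA_JSON is a placeholder name for a String holding the schema text above):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

// SCHEMA_JSON is a String containing the schema shown above (placeholder name)
Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
GenericRecord record = new GenericData.Record(schema);
record.put("Name", "example-name");
record.put("Color", "blue");  // nullable union field, may also be null
record.put("Number", 42);     // nullable union field, may also be null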

 

 

Java Publisher:

Messages are published to Kafka via Java code. The code has been updated to send the message both with and without a key:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put("serializer.class", "kafka.serializer.StringEncoder"); // legacy pre-0.9 producer property; the new KafkaProducer does not use it
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

KafkaProducer<String, byte[]> messageProducer = new KafkaProducer<String, byte[]>(props);
ProducerRecord<String, byte[]> producerRecord = null;
// data holds the Avro-encoded payload (not shown here; see the sketch below)
producerRecord = new ProducerRecord<String, byte[]>(TOPIC, data.toByteArray());
//producerRecord = new ProducerRecord<String, byte[]>(TOPIC, "1", data.toByteArray());
messageProducer.send(producerRecord);
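
The data object above is not shown in full; one way to produce such a byte[] payload is plain Avro binary encoding of the record (illustrative sketch only, no container file and no schema-registry framing; schema and record as in the earlier snippet):

import java.io.ByteArrayOutputStream;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

// Encode the record with the raw Avro binary encoding
ByteArrayOutputStream data = new ByteArrayOutputStream();
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(data, null);
GenericDatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
writer.write(record, encoder);
encoder.flush();
byte[] payload = data.toByteArray(); // the value sent in the ProducerRecord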

 

 

Talend Job - Consumer:

Here is what the consumer job looks like. The tKafkaInputAvro component has been tried with "Use hierarchical mode" both enabled and disabled; regardless of the setting, the deserialization fails.
KafkaInputAvro1.png

 

tKafkaInputAvro Schema:
KafkaInputAvro3.png

When set to "Use hierarchical mode":
KafkaInputAvro2.png

 

When the job runs and tries to deserialize the message, we get the following exceptions (stack traces truncated; see the attached file for the complete stack trace):

 

Starting job ReadMessage_tKafkaInputAvro at 07:58 06/06/2018.
[statistics] connecting to socket on port 4016
[statistics] connected
[WARN ]: org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
[WARN ]: org.apache.spark.SparkConf - In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding enable.auto.commit to false for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding auto.offset.reset to none for executor
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding executor group.id to spark-executor-example.KafkaInputAvroGroup1
[WARN ]: org.apache.spark.streaming.kafka010.KafkaUtils - overriding receive.buffer.bytes to 65536 see KAFKA-3135
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[WARN ]: org.apache.kafka.clients.consumer.ConsumerConfig - The configuration serializer.encoding = UTF-8 was supplied but isn't a known config.
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
…
Caused by: java.lang.ArrayIndexOutOfBoundsException: -40
at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:402)
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290)
…
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:423)
at talend_proj.readmessage_tkafkainputavro_0_1.ReadMessage_tKafkaInputAvro$tKafkaInputAvro_1_KafkaAbstractInputAvro_ValueDeserializer.deserialize(ReadMessage_tKafkaInputAvro.java:1)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:655)
... 41 more
[ERROR]: org.apache.kafka.clients.NetworkClient - Uncaught error in request completion:
org.apache.kafka.common.KafkaException: Error deserializing key/value for partition example.KafkaInputAvro-0 at offset 1
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:665)
…
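
For reference, decoding a message from the same topic outside of Talend, with the plain Kafka consumer and the Avro generic reader, would look roughly like this (illustrative sketch only; the group id is a placeholder, and SCHEMA_JSON / BOOTSTRAP_SERVERS / TOPIC are the same placeholders and constants used above):

import java.util.Collections;
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DecoderFactory;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "avro-debug"); // placeholder group id
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);

Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);

KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);
consumer.subscribe(Collections.singletonList(TOPIC));
ConsumerRecords<String, byte[]> records = consumer.poll(5000);
for (ConsumerRecord<String, byte[]> rec : records) {
    // Decode the raw bytes against the same schema the message was written with
    GenericRecord decoded = reader.read(null, DecoderFactory.get().binaryDecoder(rec.value(), null));
    System.out.println(decoded);
}
consumer.close();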