Decoding Avro messages in Spark Scala streaming code

Four Stars

Hi, 

I'm trying to consume Avro-encoded messages from a Kafka topic with Spark Streaming in Scala. Plain (non-Avro) messages work fine; the problem is decoding the Avro-encoded ones. Any help would be appreciated. The relevant code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

object Consumer4 { 
  def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
      .builder() 
      .appName("Kafka test") 
      .getOrCreate(); 

    val sparkContext = SparkContext.getOrCreate() 
    val streamingContext = new StreamingContext(sparkContext, Seconds(2)) 
    val preferredHosts = LocationStrategies.PreferConsistent 
    val topics = List("xyz") // 

    val kafkaParams = Map[String, Object]( 
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111", 
      "key.deserializer" -> classOf[StringDeserializer], 
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "use_a_separate_group_id_for_each_stream", 
      "auto.offset.reset" -> "latest", 
      "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    val stream = KafkaUtils.createDirectStream[String, String]( 
      streamingContext, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams) 
    ) 
    //    stream.map(record => (record.key, record.value)) 
    stream.foreachRDD { rdd => 
      //      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
      rdd.foreachPartition { iter => 

        while (iter.hasNext) { 
          val item = iter.next() 
          println(item) 
        } 

        //        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
        //        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
      } 
    } 
    streamingContext.start() 
    streamingContext.awaitTermination() 
  }
}

Four Stars

Re: Decoding Avro messages in Spark Scala streaming code

Could anyone offer at least a suggestion or some advice?

Any help would be great, and I would appreciate it :-)

Six Stars

Re: Decoding Avro messages in Spark Scala streaming code

Is the above code being implemented through Talend? The problem statement is not clear; could you please explain?
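
For reference, one likely cause is that the posted code reads the value with StringDeserializer, which interprets the Avro binary payload as text. A minimal sketch of the usual alternative follows, assuming the messages are plain Avro binary (no container-file header and no Confluent Schema Registry prefix) and that the writer schema is known. The schema and the names AvroDecodeSketch, decode and encode are hypothetical and only for illustration; encode exists just to produce a sample payload.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDecodeSketch {
  // Hypothetical schema: replace with the writer schema actually used on the topic.
  val schemaJson: String =
    """{"type":"record","name":"Event","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"name","type":"string"}
      |]}""".stripMargin

  // Decode one raw Avro binary payload into a GenericRecord.
  def decode(bytes: Array[Byte], schema: Schema): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  // Helper used only to build a sample payload for the round trip below.
  def encode(record: GenericRecord, schema: Schema): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  def main(args: Array[String]): Unit = {
    val schema = new Schema.Parser().parse(schemaJson)
    val sample = new GenericData.Record(schema)
    sample.put("id", Long.box(42L)) // box explicitly: put expects an Object
    sample.put("name", "test")
    // Round trip: encode the sample, then decode it as a consumer would.
    println(decode(encode(sample, schema), schema))
  }
}
```

To use this in the streaming job above, the value deserializer would be set to classOf[ByteArrayDeserializer] (from org.apache.kafka.common.serialization), the stream created with createDirectStream[String, Array[Byte]] and Subscribe[String, Array[Byte]], and decode(item.value, schema) called inside foreachPartition. Parsing the schema inside the partition avoids relying on Schema being serializable. If the producer uses the Confluent Schema Registry serializer, each message starts with a 5-byte prefix (a magic byte plus a 4-byte schema id) that this sketch does not handle.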
