Decoding Avro messages in Spark Scala streaming code

Four Stars

Hi, 

I'm trying to stream unstructured, Avro-encoded message data from a Kafka topic with Spark Streaming in Scala. Plain (non-Avro) messages work fine; the problem is decoding the Avro-encoded messages. Any help would be appreciated. The relevant code is below:

import org.apache.kafka.common.serialization.StringDeserializer 
import org.apache.spark._ 
import org.apache.spark.sql.SparkSession 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe 
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent 
import org.apache.spark.streaming.kafka010._ 

object Consumer4 { 
  def main(args: Array[String]): Unit = { 

    val spark = SparkSession 
      .builder() 
      .appName("Kafka test") 
      .getOrCreate(); 

    val sparkContext = SparkContext.getOrCreate() 
    val streamingContext = new StreamingContext(sparkContext, Seconds(2)) 
    val preferredHosts = LocationStrategies.PreferConsistent 
    val topics = List("xyz")

    val kafkaParams = Map[String, Object]( 
      "bootstrap.servers" -> "xyz.xyz.com.tr:1111", 
      "key.deserializer" -> classOf[StringDeserializer], 
      "value.deserializer" -> classOf[StringDeserializer], 
      "group.id" -> "use_a_separate_group_id_for_each_stream", 
      "auto.offset.reset" -> "latest", 
      "enable.auto.commit" -> (false: java.lang.Boolean) 
    ) 
    val stream = KafkaUtils.createDirectStream[String, String]( 
      streamingContext, 
      PreferConsistent, 
      Subscribe[String, String](topics, kafkaParams) 
    ) 
    //    stream.map(record => (record.key, record.value)) 
    stream.foreachRDD { rdd => 
      //      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
      rdd.foreachPartition { iter => 

        while (iter.hasNext) { 
          val item = iter.next() 
          println(item) 
        } 

        //        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId) 
        //        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}") 
      } 
    } 
    streamingContext.start() 
    streamingContext.awaitTermination() 
  }
}
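
For reference, the decoding step on its own could be sketched as follows. This is only a minimal sketch under assumptions that are not stated in the post: the message values are plain Avro binary (no Confluent Schema Registry framing), the writer schema is known in advance, and the Avro library is on the classpath. The `Event` schema, its fields, and the object name `AvroDecodeSketch` are hypothetical placeholders.

```scala
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroDecodeSketch {
  // Hypothetical schema: replace with the writer schema of the real topic.
  val schemaJson: String =
    """{"type":"record","name":"Event","fields":[
      |  {"name":"id","type":"long"},
      |  {"name":"name","type":"string"}
      |]}""".stripMargin

  // Parsed once per JVM, so each executor builds its own copy.
  val schema: Schema = new Schema.Parser().parse(schemaJson)

  // Decode one Kafka message value (raw Avro binary) into a GenericRecord.
  def decode(bytes: Array[Byte]): GenericRecord = {
    val reader  = new GenericDatumReader[GenericRecord](schema)
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    reader.read(null, decoder)
  }

  // Helper used only to produce sample bytes for a local round trip.
  def encode(record: GenericRecord): Array[Byte] = {
    val out     = new ByteArrayOutputStream()
    val writer  = new GenericDatumWriter[GenericRecord](schema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray
  }
}
```

To use something like this in the streaming job above, the values would have to arrive as bytes rather than strings: "value.deserializer" would be set to classOf[ByteArrayDeserializer] (from org.apache.kafka.common.serialization), the stream would be typed as [String, Array[Byte]], and the loop would call AvroDecodeSketch.decode(item.value) instead of printing the raw record. If the producer uses the Confluent Schema Registry serializer, each value starts with a 5-byte header (a magic byte plus a 4-byte schema id) that would need to be skipped or handled by the matching deserializer; whether that applies here is not clear from the post.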

Four Stars

Re: decoding Avro messages in a spark scala streaming code.

Are there at least any suggestions or advice?

Some help would be great, and I would appreciate it :-)

Six Stars

Re: decoding Avro messages in a spark scala streaming code.

Is the above code being implemented through Talend? The problem statement is not clear; could you please explain?