Error with Kafka Topic

Hello,

I plan to use Talend Data Pipeline with Kafka.

I am running into an error in the dataset configuration. The connection to Kafka and to the topic is OK.

With the Kafka console consumer, I can read the data.

Command:

docker-compose -f docker-compose-avro-connector.yaml exec connect /kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --from-beginning --formatter io.confluent.kafka.formatter.AvroMessageFormatter --property schema.registry.url=http://schema-registry:8081 --topic sql.Sales.SpecialDeals

Data:

{"before":null,"after":{"csldebeizumsql.Sales.SpecialDeals.Value":{"SpecialDealID":1,"StockItemID":null,"CustomerID":null,"BuyingGroupID":{"int":2},"CustomerCategoryID":null,"StockGroupID":{"int":7},"DealDescription":"10% 1st qtr USB Wingtip","StartDate":16801,"EndDate":16891,"DiscountAmount":null,"DiscountPercentage":{"bytes":"'\u0010"},"UnitPrice":null,"LastEditedBy":2,"LastEditedWhen":1451577600000000000}},"source":{"version":"1.0.0.Final","connector":"sqlserver","name":"csldebeizumsql","ts_ms":1578925452356,"snapshot":{"string":"true"},"db":"WideWorldImporters","schema":"Sales","table":"SpecialDeals","change_lsn":null,"commit_lsn":{"string":"00000275:00002ff8:0001"},"event_serial_no":null},"op":"r","ts_ms":{"long":1578925452362}}
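
The console consumer can print these records because it is given the registry URL. Values written through Confluent's Avro serialization (which io.confluent.connect.avro.AvroConverter uses) are framed as a magic byte 0, then a 4-byte big-endian schema ID, then the Avro binary payload; AvroMessageFormatter fetches the schema for that ID from schema.registry.url. Below is a minimal Python sketch, not Talend or Confluent code, that splits such a header from a raw value. The function name split_confluent_frame and the sample bytes are hypothetical, and the Avro payload itself is not decoded.

```python
import struct
from typing import Tuple

MAGIC_BYTE = 0  # Confluent wire format: the first byte is always 0
HEADER = struct.Struct(">bI")  # 1-byte magic + 4-byte big-endian unsigned schema ID


def split_confluent_frame(message: bytes) -> Tuple[int, bytes]:
    """Split a Confluent-framed Kafka value into (schema_id, avro_payload)."""
    if len(message) < HEADER.size:
        raise ValueError("message is shorter than the 5-byte Confluent header")
    magic, schema_id = HEADER.unpack_from(message)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}; value is not Confluent-framed")
    return schema_id, message[HEADER.size:]


# Hypothetical raw value: header for schema ID 2, then two made-up payload bytes.
raw_value = bytes([0, 0, 0, 0, 2]) + b"\x02\x00"
schema_id, payload = split_confluent_frame(raw_value)
print(schema_id, payload)
```

A reader that treats the whole value as plain Avro binary, without removing these five bytes first, starts decoding at the header rather than at the record.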

I retrieved the schema with:

curl -X GET http://localhost:8081/subjects/sql.Sales.SpecialDeals-value/versions/1
{"subject":"sql.Sales.SpecialDeals-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"sql.Sales.SpecialDeals\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"SpecialDealID\",\"type\":\"int\"},{\"name\":\"StockItemID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"CustomerID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"BuyingGroupID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"CustomerCategoryID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"StockGroupID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"DealDescription\",\"type\":\"string\"},{\"name\":\"StartDate\",\"type\":{\"type\":\"int\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Date\"}},{\"name\":\"EndDate\",\"type\":{\"type\":\"int\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Date\"}},{\"name\":\"DiscountAmount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"DiscountPercentage\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":3,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"3\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"UnitPrice\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LastEditedBy\",\"type\":\"int\"},{\"name\":\"LastEditedWhen\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.NanoTimestamp\"}}],\"connect.name\":\"sql.Sales.SpecialDeals.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"sql.Sales.SpecialDeals.Envelope\"}"}
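
Several values in the console-consumer record are Kafka Connect / Debezium logical types rather than plain numbers, and the schema says how to read them: StartDate and EndDate are io.debezium.time.Date (days since the Unix epoch), LastEditedWhen is io.debezium.time.NanoTimestamp (nanoseconds since the epoch, without time zone information), and DiscountPercentage is an Avro decimal with scale 3, stored as a big-endian two's-complement unscaled integer. Below is a minimal Python sketch that decodes the sample values by hand. It assumes the console rendered the Avro bytes value with one character per byte (code points 0 to 255), and the helper names are hypothetical.

```python
from datetime import date, datetime, timedelta, timezone
from decimal import Decimal

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_decimal(rendered: str, scale: int) -> Decimal:
    """Avro 'decimal' logical type: big-endian two's-complement unscaled integer.

    Assumes one character per byte (0-255), so latin-1 recovers the raw bytes.
    """
    raw = rendered.encode("latin-1")
    unscaled = int.from_bytes(raw, byteorder="big", signed=True)
    return Decimal(unscaled).scaleb(-scale)


def decode_debezium_date(days: int) -> date:
    """io.debezium.time.Date: number of days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)


def decode_nano_timestamp(nanos: int) -> datetime:
    """io.debezium.time.NanoTimestamp: nanoseconds since the epoch, no time zone.

    Interpreted as UTC here; sub-microsecond precision is dropped.
    """
    return EPOCH_UTC + timedelta(microseconds=nanos // 1000)


# Values copied from the console-consumer record.
print(decode_decimal("'\u0010", scale=3))          # DiscountPercentage (scale 3)
print(decode_debezium_date(16801))                 # StartDate
print(decode_debezium_date(16891))                 # EndDate
print(decode_nano_timestamp(1451577600000000000))  # LastEditedWhen
```

For the sample record this gives a discount percentage of 10.000, a start date of 2016-01-01 and an end date of 2016-03-31 (consistent with "10% 1st qtr"), and 2015-12-31 16:00:00 when the nanosecond timestamp is read as UTC.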

My connector configuration:

{
  "name": "sqlserver-connector",  
  "config": {
    "connector.class": "XX",
    "database.hostname": "XX",
    "database.port": "XX", 
    "database.user": "XX", 
    "database.password": "XX", 
    "database.dbname": "XX", 
    "database.server.name": "XX", 
    "table.whitelist": "XX", 
    "database.history.kafka.bootstrap.servers": "kafka:9092", 
    "database.history.kafka.topic": "sql.schemas_change" ,
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081"
  }
}

In Talend Data Pipeline:

[Screenshot: Image 1565.png]

In the Talend Data Pipeline remote log:

livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332) ~[beam-runners-direct-java-2.12.0.jar:?]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:302) ~[beam-runners-direct-java-2.12.0.jar:?]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) ~[beam-runners-direct-java-2.12.0.jar:?]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) ~[beam-runners-direct-java-2.12.0.jar:?]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.talend.datastreams.beam.compiler.BeamJob.fullRun(BeamJob.java:312) ~[data-streams-streamsjob-1.6.5.jar:1.6.5]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.talend.datastreams.streamsjob.FullRunJob$.runJob(FullRunJob.scala:132) [data-streams-streamsjob-1.6.5.jar:1.6.5]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.talend.datastreams.streamsjob.FullRunJob$.main(FullRunJob.scala:46) [data-streams-streamsjob-1.6.5.jar:1.6.5]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.talend.datastreams.streamsjob.FullRunJob.main(FullRunJob.scala) [data-streams-streamsjob-1.6.5.jar:1.6.5]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout: Caused by: java.lang.ArrayIndexOutOfBoundsException: 58
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:416) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:290) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.io.parsing.Parser.advance(Parser.java:88) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:267) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:240) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
livy_1                          | [2020-01-14 15:35:40,549] [INFO ] o.a.l.u.LineBufferedStream - stdout:        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]

Does the Kafka connector need a specific format or additional information?

Is JSON format planned, or only Avro?

Thanks for your help.

Re: Error with Kafka Topic

Hello @pierre,

Thanks for trying Talend Pipeline Designer!

If I understand correctly, you have Avro messages coming into your Kafka topic, and you retrieved the Avro schema from your Schema Registry.

Your schema looks correct, since it comes directly from the Schema Registry. However, judging by the formatting, did you copy and paste the whole output of your command?

This one: 

{"subject":"sql.Sales.SpecialDeals-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"sql.Sales.SpecialDeals\",\"fields\":[{\"name\":\"before\",\"type\":[\"null\",{\"type\":\"record\",\"name\":\"Value\",\"fields\":[{\"name\":\"SpecialDealID\",\"type\":\"int\"},{\"name\":\"StockItemID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"CustomerID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"BuyingGroupID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"CustomerCategoryID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"StockGroupID\",\"type\":[\"null\",\"int\"],\"default\":null},{\"name\":\"DealDescription\",\"type\":\"string\"},{\"name\":\"StartDate\",\"type\":{\"type\":\"int\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Date\"}},{\"name\":\"EndDate\",\"type\":{\"type\":\"int\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.Date\"}},{\"name\":\"DiscountAmount\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"DiscountPercentage\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":3,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"3\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"UnitPrice\",\"type\":[\"null\",{\"type\":\"bytes\",\"scale\":2,\"precision\":18,\"connect.version\":1,\"connect.parameters\":{\"scale\":\"2\",\"connect.decimal.precision\":\"18\"},\"connect.name\":\"org.apache.kafka.connect.data.Decimal\",\"logicalType\":\"decimal\"}],\"default\":null},{\"name\":\"LastEditedBy\",\"type\":\"int\"},{\"name\":\"LastEditedWhen\",\"type\":{\"type\":\"long\",\"connect.version\":1,\"connect.name\":\"io.debezium.time.NanoTimestamp\"}}],\"connect.name\":\"sql.Sales.SpecialDeals.Value\"}],\"default\":null},{\"name\":\"after\",\"type\":[\"null\",\"Value\"],\"default\":null},{\"name\":\"source\",\"type\":{\"type\":\"record\",\"name\":\"Source\",\"namespace\":\"io.debezium.connector.sqlserver\",\"fields\":[{\"name\":\"version\",\"type\":\"string\"},{\"name\":\"connector\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":\"long\"},{\"name\":\"snapshot\",\"type\":[{\"type\":\"string\",\"connect.version\":1,\"connect.parameters\":{\"allowed\":\"true,last,false\"},\"connect.default\":\"false\",\"connect.name\":\"io.debezium.data.Enum\"},\"null\"],\"default\":\"false\"},{\"name\":\"db\",\"type\":\"string\"},{\"name\":\"schema\",\"type\":\"string\"},{\"name\":\"table\",\"type\":\"string\"},{\"name\":\"change_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"commit_lsn\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"event_serial_no\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"io.debezium.connector.sqlserver.Source\"}},{\"name\":\"op\",\"type\":\"string\"},{\"name\":\"ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null}],\"connect.name\":\"sql.Sales.SpecialDeals.Envelope\"}"}

I ask because this output contains more than the schema: the actual schema is only the value of the "schema" field. So, in my view, it should be:

{
   "type":"record",
   "name":"Envelope",
   "namespace":"sql.Sales.SpecialDeals",
   "fields":[
      {
         "name":"before",
         "type":[
            "null",
            {
               "type":"record",
               "name":"Value",
               "fields":[
                  {
                     "name":"SpecialDealID",
                     "type":"int"
                  },
                  {
                     "name":"StockItemID",
                     "type":[
                        "null",
                        "int"
                     ],
                     "default":null
                  },
                  {
                     "name":"CustomerID",
                     "type":[
                        "null",
                        "int"
                     ],
                     "default":null
                  },
                  {
                     "name":"BuyingGroupID",
                     "type":[
                        "null",
                        "int"
                     ],
                     "default":null
                  },
                  {
                     "name":"CustomerCategoryID",
                     "type":[
                        "null",
                        "int"
                     ],
                     "default":null
                  },
                  {
                     "name":"StockGroupID",
                     "type":[
                        "null",
                        "int"
                     ],
                     "default":null
                  },
                  {
                     "name":"DealDescription",
                     "type":"string"
                  },
                  {
                     "name":"StartDate",
                     "type":{
                        "type":"int",
                        "connect.version":1,
                        "connect.name":"io.debezium.time.Date"
                     }
                  },
                  {
                     "name":"EndDate",
                     "type":{
                        "type":"int",
                        "connect.version":1,
                        "connect.name":"io.debezium.time.Date"
                     }
                  },
                  {
                     "name":"DiscountAmount",
                     "type":[
                        "null",
                        {
                           "type":"bytes",
                           "scale":2,
                           "precision":18,
                           "connect.version":1,
                           "connect.parameters":{
                              "scale":"2",
                              "connect.decimal.precision":"18"
                           },
                           "connect.name":"org.apache.kafka.connect.data.Decimal",
                           "logicalType":"decimal"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"DiscountPercentage",
                     "type":[
                        "null",
                        {
                           "type":"bytes",
                           "scale":3,
                           "precision":18,
                           "connect.version":1,
                           "connect.parameters":{
                              "scale":"3",
                              "connect.decimal.precision":"18"
                           },
                           "connect.name":"org.apache.kafka.connect.data.Decimal",
                           "logicalType":"decimal"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"UnitPrice",
                     "type":[
                        "null",
                        {
                           "type":"bytes",
                           "scale":2,
                           "precision":18,
                           "connect.version":1,
                           "connect.parameters":{
                              "scale":"2",
                              "connect.decimal.precision":"18"
                           },
                           "connect.name":"org.apache.kafka.connect.data.Decimal",
                           "logicalType":"decimal"
                        }
                     ],
                     "default":null
                  },
                  {
                     "name":"LastEditedBy",
                     "type":"int"
                  },
                  {
                     "name":"LastEditedWhen",
                     "type":{
                        "type":"long",
                        "connect.version":1,
                        "connect.name":"io.debezium.time.NanoTimestamp"
                     }
                  }
               ],
               "connect.name":"sql.Sales.SpecialDeals.Value"
            }
         ],
         "default":null
      },
      {
         "name":"after",
         "type":[
            "null",
            "Value"
         ],
         "default":null
      },
      {
         "name":"source",
         "type":{
            "type":"record",
            "name":"Source",
            "namespace":"io.debezium.connector.sqlserver",
            "fields":[
               {
                  "name":"version",
                  "type":"string"
               },
               {
                  "name":"connector",
                  "type":"string"
               },
               {
                  "name":"name",
                  "type":"string"
               },
               {
                  "name":"ts_ms",
                  "type":"long"
               },
               {
                  "name":"snapshot",
                  "type":[
                     {
                        "type":"string",
                        "connect.version":1,
                        "connect.parameters":{
                           "allowed":"true,last,false"
                        },
                        "connect.default":"false",
                        "connect.name":"io.debezium.data.Enum"
                     },
                     "null"
                  ],
                  "default":"false"
               },
               {
                  "name":"db",
                  "type":"string"
               },
               {
                  "name":"schema",
                  "type":"string"
               },
               {
                  "name":"table",
                  "type":"string"
               },
               {
                  "name":"change_lsn",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"commit_lsn",
                  "type":[
                     "null",
                     "string"
                  ],
                  "default":null
               },
               {
                  "name":"event_serial_no",
                  "type":[
                     "null",
                     "long"
                  ],
                  "default":null
               }
            ],
            "connect.name":"io.debezium.connector.sqlserver.Source"
         }
      },
      {
         "name":"op",
         "type":"string"
      },
      {
         "name":"ts_ms",
         "type":[
            "null",
            "long"
         ],
         "default":null
      }
   ],
   "connect.name":"sql.Sales.SpecialDeals.Envelope"
}

Can you confirm that you tried with this schema? If so, we will dig into the details to understand where the issue comes from.
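
The "schema" value in the registry response is itself a JSON document stored as an escaped string, so extracting it by hand is error-prone; parsing the response twice avoids that. Below is a minimal Python sketch. extract_schema is a hypothetical helper, and the sample response is a shortened stand-in for the curl output (same layout, empty "fields" list). Newer Schema Registry versions also document GET /subjects/<subject>/versions/<version>/schema, which returns only the unescaped schema; check whether your registry version supports it.

```python
import json


def extract_schema(registry_response: str) -> dict:
    """Return the Avro schema from a GET /subjects/<subject>/versions/<n> response."""
    envelope = json.loads(registry_response)  # keys: subject, version, id, schema
    return json.loads(envelope["schema"])  # "schema" holds the schema as an escaped JSON string


# Shortened stand-in for the curl output: same layout, but with an empty "fields" list.
response = r'{"subject":"sql.Sales.SpecialDeals-value","version":1,"id":2,"schema":"{\"type\":\"record\",\"name\":\"Envelope\",\"namespace\":\"sql.Sales.SpecialDeals\",\"fields\":[]}"}'

schema = extract_schema(response)
print(json.dumps(schema, indent=3))  # pretty-printed with 3-space indentation
```

With the full curl output as input, the printed JSON carries the same content as the indented schema in this reply, differing only in whitespace.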

Thanks,

Thibaut

 

 
