Is there a tutorial on using Kinesis with Twitter data?

Community Manager

Is there a tutorial on using Kinesis with Twitter data?

NOTE: Talend Streams has been replaced with Pipeline Designer. More information about this new Talend application is available in the Pipeline Designer introduction.

 

I've been having a play with this and, while using the supplied data, I found it very easy. I have now created a Twitter stream feeding into Kinesis, and that is working. I have also created a connection to my Kinesis stream, which seems to work.

However, when creating a DataSet from this connection, I am limited to either CSV or AVRO. CSV does not give me the access to the data that I want (but it works), so I tried using AVRO. Unfortunately, I cannot seem to get this to work with my Twitter JSON. The AVRO schema I put together is below, but when I click on "View Sample", it tells me "sample not available" and I cannot use this data in a pipeline. I suspect I am doing something wrong (I am new to AVRO), but there is not enough documentation or feedback from the process to tell me what. Any ideas?

AVRO schema for Twitter JSON

{"type":"record",
 "name" : "twitter_schema",
 "namespace" : "com.rilhia.avro",
 "fields":[{"name":"id","type":"string"},
           {"name":"user_friends_count","type":["int","null"]},
           {"name":"user_location","type":["string","null"]},
           {"name":"user_description","type":["string","null"]},
           {"name":"user_statuses_count","type":["int","null"]},
           {"name":"user_followers_count","type":["int","null"]},
           {"name":"user_name","type":["string","null"]},
           {"name":"user_screen_name","type":["string","null"]},
           {"name":"created_at","type":["string","null"]},
           {"name":"text","type":["string","null"]},
           {"name":"retweet_count","type":["long","null"]},
           {"name":"retweeted","type":["boolean","null"]},
           {"name":"in_reply_to_user_id","type":["long","null"]},
           {"name":"source","type":["string","null"]},
           {"name":"in_reply_to_status_id","type":["long","null"]},
           {"name":"media_url_https","type":["string","null"]},
           {"name":"expanded_url","type":["string","null"]}
          ]
}
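
For what it's worth, the schema above can at least be checked for structural validity outside of the product, e.g. with Python and the fastavro package (a minimal sketch; the package choice and the file name are my own, nothing required by the product):

import json
from fastavro import parse_schema

# Load the schema above and let fastavro report any structural problems;
# parse_schema raises SchemaParseException if this is not valid Avro.
with open("twitter_schema.avsc") as f:
    parse_schema(json.load(f))
print("schema parses OK")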

 


Accepted Solutions
Employee

Re: Is there a tutorial on using Kinesis with Twitter data?

Hello @rhall_2_0,

 

Indeed, you have the choice between CSV and AVRO, and you can achieve what you want with either option.

 

If you are using CSV, your JSON data most likely arrives in a single field. What you can do is use the Python component (in FLATMAP mode) and parse the JSON with this snippet of code:

 

# Parse the JSON string held in the single CSV field into the output record.
# 'field0' is a placeholder; use the actual name of the field that holds the
# raw tweet JSON in your dataset.
output = json.loads(input['field0'])
outputList.append(output)

You should then get your data as expected.
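
Outside of the component, the same transformation amounts to this (a plain-Python sketch; the 'field0' column name and the sample payload are invented for illustration):

import json

# One incoming record whose single CSV column holds the raw tweet JSON;
# 'field0' stands in for whatever your dataset calls that column.
input_record = {"field0": '{"id": "123", "text": "hello from the stream"}'}

outputList = []
output = json.loads(input_record["field0"])  # now a dict with id/text keys
outputList.append(output)

print(outputList)  # [{'id': '123', 'text': 'hello from the stream'}]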

 

Now, if you want to take advantage of AVRO, you need to convert the data you are sending to your Kinesis stream into AVRO format. In other words, it is not JSON that you send to your stream but AVRO (binary) data. That is why you need to specify a schema that matches your data: the schema is what allows the AVRO binary data to be decoded back into records.
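
For illustration, producing AVRO-encoded records to Kinesis from plain Python could look roughly like this (a sketch only: the fastavro and boto3 packages, the stream name, region, and schema file are all assumptions, and it is worth verifying whether your consumer expects single-record encoding, as here, or the AVRO container file format):

import io
import json
import boto3
from fastavro import parse_schema, schemaless_writer

# Placeholder names: adjust the schema file, stream, and region to your setup.
with open("twitter_schema.avsc") as f:
    schema = parse_schema(json.load(f))

kinesis = boto3.client("kinesis", region_name="us-east-1")

def send_tweet(tweet):
    # Serialize one tweet dict to AVRO binary (no container header) and
    # put it on the stream, partitioned by the tweet's "id" field.
    buf = io.BytesIO()
    schemaless_writer(buf, schema, tweet)
    kinesis.put_record(
        StreamName="twitter-stream",
        Data=buf.getvalue(),
        PartitionKey=str(tweet["id"]),
    )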

 

I believe you are using the Twitter API. We have built a similar pipeline at Talend, and you can watch a short video of it here. For this demo we remove some fields from the original JSON document and convert the JSON into AVRO before sending it to Kafka (the same concept applies to Kinesis).

 

Take a look at the AVRO schema used in our case:

 

{
  "fields": [
    {
      "name": "timestamp_ms",
      "type": "string"
    },
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "user",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "screen_name",
              "type": [
                "string",
                "null"
              ]
            }
          ],
          "name": "user",
          "type": "record"
        }
      ]
    },
    {
      "name": "place",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "name",
              "type": [
                "string",
                "null"
              ]
            },
            {
              "name": "full_name",
              "type": [
                "string",
                "null"
              ]
            }
          ],
          "name": "place",
          "type": "record"
        }
      ]
    },
    {
      "name": "entities",
      "type": {
        "fields": [
          {
            "name": "hashtags",
            "type": {
              "items": {
                "fields": [
                  {
                    "name": "text",
                    "type": [
                      "string",
                      "null"
                    ]
                  }
                ],
                "name": "item",
                "type": "record"
              },
              "type": "array"
            }
          }
        ],
        "name": "entities",
        "type": "record"
      }
    }
  ],
  "name": "tweets",
  "type": "record"
}

As an example, this schema corresponds to this type of record:

{
   "timestamp_ms":"1532078832327",
   "text":"Hello from Talend #DataStreams #data #Talend",
   "user":{
      "screen_name":"Talend"
   },
   "place":{
      "name":"Redwood City",
      "full_name":"Redwood City, CA"
   },
   "entities":{
      "hashtags":[
         {
            "text":"DataStreams"
         },
         {
            "text":"data"
         },
         {
            "text":"Talend"
         }
      ]
   }
}
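
Before wiring anything into a pipeline, you can also double-check that a record and a schema actually agree with fastavro's validator (a sketch; the file names are placeholders for the schema and the sample record shown above):

import json
from fastavro import parse_schema
from fastavro.validation import validate

schema = parse_schema(json.load(open("tweets.avsc")))
record = json.load(open("sample_tweet.json"))

validate(record, schema)  # raises ValidationError if they do not match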

Please let me know if you need further explanation.


All Replies

Community Manager

Re: Is there a tutorial on using Kinesis with Twitter data?

Thanks for the quick response! I will take a look at both methods at the weekend.

 

After I posted this, I figured that I would have to write the data from Twitter into AVRO before sending it to Kinesis. Unfortunately, finding examples of this is proving to be pretty difficult.

 

It's a shame that datasets from Kinesis cannot be in JSON format (JSON is available elsewhere and is far superior to CSV). Will this functionality become available in the future? It is especially frustrating since one of the examples uses Twitter data in JSON, but stored in a file, which is not really a realistic scenario for data stream processing. My assumption was that this was done to simulate Kafka/Kinesis as the source.

 

Regards

 

Richard

Four Stars

Re: Is there a tutorial on using Kinesis with Twitter data?

I also have the same question/concern. Is there any plan to enable JSON format with Kinesis Stream input?

Employee

Re: Is there a tutorial on using Kinesis with Twitter data?

Don't worry, gentlemen: handling JSON natively is definitely part of the short-term product roadmap.

 

Kind regards,

 

Cyril.
