Fifteen Stars

Is there a tutorial on using Kinesis with Twitter data?

I've been having a play with this, and while using the supplied data I found it very easy. I have now created a Twitter stream feeding into Kinesis, and this is working. I have also created a connection to my Kinesis stream, which seems to work.

However, when creating a dataset from this connection, I am limited to either CSV or AVRO. CSV does not give me access to the data in the way that I want (but it works), so I tried AVRO. Unfortunately, I cannot get this to work with my Twitter JSON. The AVRO schema I put together is below, but when I click on "View Sample" it tells me "sample not available", and I cannot use this data in a pipeline. I suspect I am doing something wrong (I am new to AVRO), but there is not enough documentation or feedback from the process to tell me what. Any ideas?

AVRO schema for Twitter JSON

{
  "type": "record",
  "name": "twitter_schema",
  "namespace": "com.rilhia.avro",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "user_friends_count", "type": ["int", "null"]},
    {"name": "user_location", "type": ["string", "null"]},
    {"name": "user_description", "type": ["string", "null"]},
    {"name": "user_statuses_count", "type": ["int", "null"]},
    {"name": "user_followers_count", "type": ["int", "null"]},
    {"name": "user_name", "type": ["string", "null"]},
    {"name": "user_screen_name", "type": ["string", "null"]},
    {"name": "created_at", "type": ["string", "null"]},
    {"name": "text", "type": ["string", "null"]},
    {"name": "retweet_count", "type": ["long", "null"]},
    {"name": "retweeted", "type": ["boolean", "null"]},
    {"name": "in_reply_to_user_id", "type": ["long", "null"]},
    {"name": "source", "type": ["string", "null"]},
    {"name": "in_reply_to_status_id", "type": ["long", "null"]},
    {"name": "media_url_https", "type": ["string", "null"]},
    {"name": "expanded_url", "type": ["string", "null"]}
  ]
}

Rilhia Solutions
1 ACCEPTED SOLUTION
Employee

Re: Is there a tutorial on using Kinesis with Twitter data?

Hello @rhall_2_0,

Indeed, you have the choice between CSV and AVRO, and you can achieve what you want with either option.

If you are using CSV, you probably get your JSON data in a single field. What you can do is use the Python component (in FLATMAP mode) and parse the JSON with this snippet of code:

output = json.loads(input['field0'])  # 'field0' is an example name for the field holding the raw JSON
outputList.append(output)

And you should get your data as expected.
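Outside of the component, that parsing step amounts to a single json.loads call. A minimal standalone sketch, where the field name "field0" is only a placeholder for wherever your CSV dataset puts the raw tweet:

```python
import json

def flatten(row):
    # The whole tweet arrives as a JSON string inside one CSV field;
    # parsing it yields a dict whose keys become the output columns.
    return json.loads(row["field0"])  # "field0" is a placeholder field name

row = {"field0": '{"id": "123", "text": "hello"}'}
record = flatten(row)
```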

Now, if you want to take advantage of AVRO, you need to convert the data you send to your Kinesis stream into AVRO format. That is, it is not JSON you send on the stream, but binary AVRO data. That is why you need to specify a schema that matches your data: the schema is what allows the binary AVRO payload to be read back.
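To make the schema requirement concrete, here is a hand-rolled sketch of how AVRO's binary encoding writes a flat record of string fields (a real pipeline would use an Avro library; the field names here are examples). Note that the payload carries no field names at all, which is exactly why the reader needs the writer's schema:

```python
def zigzag_long(n):
    # Avro encodes longs as zigzag varints; the length prefix of every
    # string uses this encoding.
    n = (n << 1) ^ (n >> 63)
    out = bytearray()
    while True:
        if n >= 0x80:
            out.append((n & 0x7F) | 0x80)
            n >>= 7
        else:
            out.append(n)
            return bytes(out)

def encode_string(s):
    # An Avro string is its zigzag-encoded byte length followed by UTF-8 bytes.
    data = s.encode("utf-8")
    return zigzag_long(len(data)) + data

def encode_record(field_names, record):
    # Record fields are written back to back in schema order, with no names
    # or delimiters in the payload -- reader and writer must share the schema.
    return b"".join(encode_string(record[name]) for name in field_names)

tweet = {"id": "123", "text": "hello"}
payload = encode_record(["id", "text"], tweet)  # the bytes you would put on the stream
```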

I believe you are using the Twitter API. We have built a similar pipeline at Talend, and you can watch a short video of it here. For this demo we remove some fields from the original JSON document and convert the JSON into AVRO before sending it to Kafka (the same concept applies to Kinesis).

Take a look at the AVRO schema used in our case:

{
  "fields": [
    {
      "name": "timestamp_ms",
      "type": "string"
    },
    {
      "name": "text",
      "type": "string"
    },
    {
      "name": "user",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "screen_name",
              "type": [
                "string",
                "null"
              ]
            }
          ],
          "name": "user",
          "type": "record"
        }
      ]
    },
    {
      "name": "place",
      "type": [
        "null",
        {
          "fields": [
            {
              "name": "name",
              "type": [
                "string",
                "null"
              ]
            },
            {
              "name": "full_name",
              "type": [
                "string",
                "null"
              ]
            }
          ],
          "name": "place",
          "type": "record"
        }
      ]
    },
    {
      "name": "entities",
      "type": {
        "fields": [
          {
            "name": "hashtags",
            "type": {
              "items": {
                "fields": [
                  {
                    "name": "text",
                    "type": [
                      "string",
                      "null"
                    ]
                  }
                ],
                "name": "item",
                "type": "record"
              },
              "type": "array"
            }
          }
        ],
        "name": "entities",
        "type": "record"
      }
    }
  ],
  "name": "tweets",
  "type": "record"
}

As an example, this schema corresponds to this type of record:

{
  "timestamp_ms": "1532078832327",
  "text": "Hello from Talend #DataStreams #data #Talend",
  "user": {
    "screen_name": "Talend"
  },
  "place": {
    "name": "Redwood City",
    "full_name": "Redwood City, CA"
  },
  "entities": {
    "hashtags": [
      { "text": "DataStreams" },
      { "text": "data" },
      { "text": "Talend" }
    ]
  }
}

Please let me know if you need further explanation.

4 REPLIES

Fifteen Stars

Re: Is there a tutorial on using Kinesis with Twitter data?

Thanks for the quick response! I will take a look at both methods at the weekend.

After I posted this, I figured that I would have to write the Twitter data into AVRO before sending it to Kinesis. Unfortunately, finding examples of this is proving to be pretty difficult.

It's a shame that datasets from Kinesis cannot be in JSON format, since JSON is available elsewhere and is far superior to CSV. Will this functionality become available in the future? It is especially frustrating since one of the examples uses Twitter data in JSON, but stored in a file (not really a realistic scenario for data stream processing). My assumption was that this was done to simulate Kafka/Kinesis as the source.

Regards

Richard

Rilhia Solutions
Four Stars

Re: Is there a tutorial on using Kinesis with Twitter data?

I also have the same question/concern. Is there any plan to enable JSON format with Kinesis Stream input?

Employee

Re: Is there a tutorial on using Kinesis with Twitter data?

Don't worry, gentlemen: handling JSON natively is definitely part of the short-term product roadmap.

Kind regards,

Cyril.