Automate S3 file processing with Talend Cloud and AWS Lambda

Overview

Integrating Amazon S3 with AWS Lambda is a common pattern on AWS; a typical example runs a Lambda function whenever a file arrives in an S3 bucket.

 

This article explains how to use S3 and Lambda to execute a Talend Cloud Job whenever a file is uploaded. The diagram shows the workflow:

architecture.png

 

  1. A file is uploaded to an S3 bucket.

  2. S3 triggers the Lambda function.

  3. The Lambda function calls the Talend Cloud API to execute a Talend Flow.

  4. The Talend Flow retrieves the S3 file and processes it, using the parameters sent by the Lambda function.

 

Prerequisites

  • A valid AWS account with access to the following:

    • S3

    • Lambda

  • A Talend Cloud account or trial account

 

Configuring Amazon S3

 

Creating a Bucket

  1. Sign in to your AWS account and open the Amazon S3 console.

  2. Click Create bucket.

    Capture d’écran 2018-04-26 à 10.49.26.png

     

  3. Configure the following fields, then click Next:
    1. Bucket name: Enter a name that is unique across all existing bucket names in Amazon S3.

    2. Region: Select the Region where the bucket will reside; this example uses EU (Ireland).

    Capture d’écran 2018-04-26 à 10.50.05.png

     

  4. Keep the default settings. Click Next.

    Capture d’écran 2018-04-26 à 10.58.16.png

     

  5. Keep the default permissions. Review the configuration, then click Create bucket.

    Capture d’écran 2018-04-26 à 10.58.42.png
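Besides being unique, the bucket name must follow the S3 naming rules. The following Python sketch checks a subset of those rules locally (length, allowed characters, first and last character, adjacent periods, IP-address format). The function name is hypothetical, the AWS documentation remains the authoritative list of rules, and global uniqueness can only be confirmed by S3 when you create the bucket.

```python
import ipaddress
import re

# 3 to 63 characters: lowercase letters, digits, dots and hyphens,
# starting and ending with a letter or digit.
_NAME_PATTERN = re.compile(r"[a-z0-9][a-z0-9.-]{1,61}[a-z0-9]")


def is_valid_bucket_name(name: str) -> bool:
    """Check a bucket name against a subset of the S3 naming rules."""
    if not _NAME_PATTERN.fullmatch(name):
        return False
    # Adjacent periods are not allowed.
    if ".." in name:
        return False
    # Names formatted like an IPv4 address (e.g. 192.168.5.4) are not allowed.
    try:
        ipaddress.IPv4Address(name)
    except ipaddress.AddressValueError:
        return True
    return False


print(is_valid_bucket_name("talend-s3-demo"))  # True
print(is_valid_bucket_name("Talend_S3"))       # False: uppercase and underscore
```

The check is deliberately conservative: a name that fails it will also be rejected by S3, but a name that passes may still be taken by another account.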

 

Creating a Policy and a User

When a remote Job accesses S3, it should use an IAM user that has programmatic access only (no access to the AWS Management Console), together with a policy that restricts that user to this bucket.

 

Creating a Policy

  1. In the AWS console, navigate to the IAM (Identity and Access Management) page.

  2. Navigate to the Policies section, then click Create policy.

    Capture d’écran 2018-04-26 à 11.08.27.png

     

    Capture d’écran 2018-04-26 à 11.08.54.png

     

  3. Using the visual editor, configure the policy as shown below:

    1. Service: Select S3.

    2. Action: Select GetObject and GetObjectVersion. GetObject lets your Job retrieve the file.

    3. Resources: Specify your bucket's ARN (Amazon Resource Name) followed by /*, for example arn:aws:s3:::your-bucket-name/*. The * wildcard matches every object in the bucket.

    4. Request conditions: Leave as is.

      Capture d’écran 2018-04-26 à 11.15.31.png

       

    5. Click JSON to view the policy in JSON format:

      Capture d’écran 2018-04-26 à 11.15.42.png

       

  4. Review your policy, then click Create policy.

    Capture d’écran 2018-04-26 à 11.15.53.png
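For reference, the policy configured in the visual editor corresponds roughly to the document built by this Python sketch. It assumes a hypothetical bucket named talend-s3-demo; the JSON shown in the console may also contain fields, such as a Sid, that are omitted here.

```python
import json


def read_only_bucket_policy(bucket_name: str) -> dict:
    """Build an IAM policy that only allows reading objects from one bucket."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:GetObjectVersion"],
                # "/*" matches every object in the bucket, not the bucket itself.
                "Resource": f"arn:aws:s3:::{bucket_name}/*",
            }
        ],
    }


if __name__ == "__main__":
    # "talend-s3-demo" is a placeholder; replace it with your bucket name.
    print(json.dumps(read_only_bucket_policy("talend-s3-demo"), indent=2))
```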

     

Giving a User Programmatic Access

  1. In IAM, navigate to the Users section, then click Add user.

    Capture d’écran 2018-04-26 à 11.19.22.png

     

  2. Enter a User name, select the Programmatic access check box, then click Next: Permissions.

    Capture d’écran 2018-04-26 à 11.19.57.png

     

  3. Select Attach existing policies directly, and choose the policy you created in the previous section.

    Capture d’écran 2018-04-26 à 11.20.18.png

     

  4. Review your settings, then click Create user.

    Capture d’écran 2018-04-26 à 11.20.42.png

     

  5. The user is now created. Download and save the access key ID and secret access key: AWS displays the secret access key only at this step.

    Capture d’écran 2018-04-26 à 11.21.00.png

     

Creating and Publishing a Talend Job in Talend Cloud

In this section, you learn how to create and publish a Talend Job in Talend Cloud.

 

Creating a Job

Create a Job that retrieves a file from S3 and displays its data in the console. A production Job would typically be more complex.

 

Start by uploading a test file to Amazon S3, then build the Job in Talend Studio.

 

  1. Create a folder and name it connections.

    Capture d’écran 2018-04-26 à 14.34.23.png

     

  2. Create a test file (in this example, connections_012018.csv) and upload it to the connections folder.

    Capture d’écran 2018-04-26 à 14.35.43.png

     

  3. In Talend Studio, create a new context group named S3Parameters, then click Next.

    Capture d’écran 2018-04-26 à 14.37.55.png

     

  4. Configure the following parameters using the information from your S3 bucket, then click Finish:

    • parameter_accessKey: the access key used by your application to connect to Amazon S3

    • parameter_secretKey: the secret key used by your application to connect to Amazon S3

    • parameter_bucketName: the bucket name on S3

    • parameter_bucketKey: the object key of the file. S3 has no real folders, so the full path, for example connections/connections_012018.csv, is the key.

    • parameter_tempFolder: the local temporary folder where the file is stored for processing; on a Talend Cloud Engine, use /tmp/

      Capture d’écran 2018-04-26 à 14.41.13.png

       

  5. Create a new Job named S3Read. The Job has three stages:

    1. Open a connection to S3 (tS3Connection).
    2. Get the file from S3 (tS3Get).
    3. Read the file (tFileInputDelimited).

    Capture d’écran 2018-04-26 à 14.50.08.png

     

  6. Configure the tS3Connection component with your bucket's region, and set the access key and secret key to the corresponding context variables.

    Capture d’écran 2018-04-26 à 14.50.19.png

     

  7. Configure the tS3Get component to retrieve the file based on the context parameters, and store it in the temp folder.

    Capture d’écran 2018-04-26 à 14.50.29.png

     

  8. Configure the tFileInputDelimited component to read the file stored in the temp folder.

    Capture d’écran 2018-04-26 à 14.50.39.png

     

  9. Test the Job locally to see if it connects and reads the file correctly.

    Capture d’écran 2018-04-26 à 14.51.10.png

     

  10. Next, publish the Job to Talend Cloud. In Studio, go to Window > Preferences > Talend > Integration Cloud and configure your Talend Cloud connection.

    Capture d’écran 2018-04-26 à 14.52.05.png

     

  11. Once a connection is established, right-click the Job and select Publish to Cloud.

    Capture d’écran 2018-04-26 à 15.09.37.png

     

  12. Click Finish.

    Capture d’écran 2018-04-26 à 15.10.39.png

     

  13. When the Job has finished uploading, click Open Job Flow.

    Capture d’écran 2018-04-26 à 15.11.35.png
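To illustrate what the get and read stages do with the context values, here is a plain-Python sketch that runs outside Talend. It is illustrative only: the helper names are hypothetical, it assumes the file is saved under parameter_tempFolder using the last segment of the key as its name, and it writes a stand-in file instead of downloading from S3. The semicolon delimiter is also an assumption; match it to the Field Separator set in tFileInputDelimited.

```python
import csv
import os
import tempfile


def local_path(context: dict) -> str:
    """Hypothetical target path: temp folder + last segment of the S3 key."""
    file_name = context["parameter_bucketKey"].rsplit("/", 1)[-1]
    return os.path.join(context["parameter_tempFolder"], file_name)


def read_delimited(path: str, delimiter: str = ";") -> list:
    """Read every row of a delimited file, roughly what tFileInputDelimited does."""
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.reader(handle, delimiter=delimiter))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Hypothetical values mirroring the S3Parameters context group.
        context = {
            "parameter_bucketName": "talend-s3-demo",
            "parameter_bucketKey": "connections/connections_012018.csv",
            "parameter_tempFolder": tmp,
        }
        path = local_path(context)
        # Stand-in for the download performed by tS3Get.
        with open(path, "w", encoding="utf-8") as handle:
            handle.write("id;name\n1;alice\n")
        for row in read_delimited(path):
            print(row)  # similar to the rows the Job prints to the console
```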

     

Publishing the Job in Talend Cloud

  1. In Talend Cloud, the flow shows the parameters that the Job requires.

    Capture d’écran 2018-04-26 à 15.12.05.png

     

  2. Update the configuration based on your own bucket, then click Save.

    Capture_d’écran_2018-04-26_à_15_46_50.png

     

  3. Select your runtime; this example uses a cloud engine.

    Capture d’écran 2018-04-26 à 15.47.50.png

     

  4. Because the parameters point to an existing file, you can test the Job by clicking Run Now.

    Capture d’écran 2018-04-26 à 15.48.10.png

    Capture d’écran 2018-04-26 à 15.59.01.png

     

  5. You will see the content of your file in the log.

    Capture d’écran 2018-04-26 à 15.59.23.png

     

  6. Now, test the Job with a remote call to the Talend Cloud API.

    Capture d’écran 2018-04-26 à 16.15.49.png

     

  7. Confirm that you are using the v1.1 API, then click Authorize.

    Capture d’écran 2018-04-26 à 16.16.15.png

     

  8. Log in using your Talend Cloud account credentials.

    Capture d’écran 2018-04-26 à 16.16.40.png

     

  9. Next, find the Flow Id: in Talend Cloud, go to Integration Cloud > Flows and open your flow. The Flow Id is in the upper-left corner.

    Capture d’écran 2018-04-26 à 16.26.36.png

    Capture d’écran 2018-04-26 à 16.27.01.png

     

  10. For this example, use the POST /executions operation.

    Capture d’écran 2018-06-26 à 11.58.48.png

     

  11. Create a body with:

    • executable: your Flow Id

    • parameters: the context variables you want to override. In this example, specify the bucket name.

    Capture d’écran 2018-04-26 à 16.33.55.png

     

  12. Scroll down, then click Try it out!

    Capture d’écran 2018-04-26 à 16.34.08.png

     

  13. Review the results.

    Capture d’écran 2018-04-26 à 16.34.36.png

     

  14. Check your flow and notice that a second execution appears.

    Capture d’écran 2018-04-26 à 16.34.55.png
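The request body from step 11 is a small JSON document. This Python sketch builds it, assuming a placeholder Flow Id and the context variable names from the S3Parameters group. The check that rejects unknown parameter names is a hypothetical safeguard against typos, not a requirement of the Talend Cloud API.

```python
import json

# Context variables defined in the S3Parameters group.
KNOWN_PARAMETERS = {
    "parameter_accessKey",
    "parameter_secretKey",
    "parameter_bucketName",
    "parameter_bucketKey",
    "parameter_tempFolder",
}


def execution_body(flow_id: str, overrides: dict) -> str:
    """Return the JSON body for POST /executions: the Flow Id plus overrides."""
    unknown = set(overrides) - KNOWN_PARAMETERS
    if unknown:
        # Hypothetical safeguard: catch misspelled context variable names early.
        raise ValueError(f"unknown context variables: {sorted(unknown)}")
    return json.dumps({"executable": flow_id, "parameters": overrides}, indent=2)


# "your-flow-id" and the bucket name are placeholders.
print(execution_body("your-flow-id", {"parameter_bucketName": "talend-s3-demo"}))
```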

     

AWS Lambda

At this stage, you have deployed your Job to Talend Cloud and tested a call with the API. Now, create the Lambda function that S3 triggers for each new file and that calls your Job through the API.

 

  1. Connect to your AWS console, and in the Lambda section, select Create a function.

    Capture d’écran 2018-04-26 à 17.05.49.png

     

  2. Give your function a name and select the Python 3.6 runtime (AWS Lambda has since deprecated Python 3.6, so choose a currently supported Python 3 runtime). In the Role section, select Create custom role.

    Capture d’écran 2018-05-02 à 11.27.50.png

     

  3. Create the new role. This creates both an IAM role and a role policy for the function.

    newrole.png

     

  4. Review the configuration, then click Create function.

    Capture d’écran 2018-05-02 à 11.28.30.png

     

  5. To create the trigger, select an S3 trigger on the left under Designer.

    Capture d’écran 2018-05-02 à 11.29.25.png

     

  6. Configure the trigger with your bucket name and a prefix (in this example, the connections folder). Select Enable trigger, then click Add.

    Capture d’écran 2018-05-02 à 11.30.58.png

     

  7. Verify that the new trigger was added.

    Capture d’écran 2018-05-02 à 11.31.09.png

     

  8. Copy the code from the lambda_function.py file attached to this article into the function code editor.

    Capture d’écran 2018-05-02 à 11.35.38.png

     

  9. Configure the environment variables:

    • TCLOUD_API_ENDPOINT: the URL of the Talend Cloud API

    • TCLOUD_USER: a user with permission to call the API

    • TCLOUD_PWD: the password of TCLOUD_USER

    • TCLOUD_FLOWID: the Talend Flow Id of the Job

    Capture d’écran 2018-05-02 à 11.36.55.png

     

  10. Add tags to identify your function.

    Capture d’écran 2018-05-02 à 11.37.07.png

     

  11. Save the function. Now, when you add a new file to the connections folder in S3, the Lambda function runs.

    Capture d’écran 2018-05-02 à 11.38.43.png

     

  12. In Talend Cloud, verify that a third execution appears.

    Capture d’écran 2018-05-02 à 11.39.44.png

     

  13. You will see the content of your file in the log.

    Capture d’écran 2018-05-02 à 11.40.08.png
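The attached lambda_function.py is not reproduced here. As an illustration only, the following sketch shows one way such a handler could work using just the Python standard library, so no third-party package such as requests has to be bundled. Assumptions: TCLOUD_API_ENDPOINT is the API base URL to which /executions is appended, the API accepts HTTP Basic authentication with TCLOUD_USER and TCLOUD_PWD, and the handler overrides parameter_bucketName and parameter_bucketKey with the uploaded file. The send argument exists only so the handler can be tested without network access.

```python
import base64
import json
import os
import urllib.parse
import urllib.request


def s3_objects(event: dict) -> list:
    """Return (bucket, key) pairs from an S3 event notification."""
    pairs = []
    for record in event.get("Records", []):
        bucket = record["s3"]["bucket"]["name"]
        # Keys arrive URL-encoded in the event (a space becomes "+").
        key = urllib.parse.unquote_plus(record["s3"]["object"]["key"])
        pairs.append((bucket, key))
    return pairs


def build_request(bucket: str, key: str) -> urllib.request.Request:
    """Build a POST /executions request from the TCLOUD_* environment variables."""
    body = {
        "executable": os.environ["TCLOUD_FLOWID"],
        # Assumed overrides: point the Job at the file that triggered the event.
        "parameters": {
            "parameter_bucketName": bucket,
            "parameter_bucketKey": key,
        },
    }
    credentials = "{}:{}".format(os.environ["TCLOUD_USER"], os.environ["TCLOUD_PWD"])
    token = base64.b64encode(credentials.encode("utf-8")).decode("ascii")
    return urllib.request.Request(
        url=os.environ["TCLOUD_API_ENDPOINT"].rstrip("/") + "/executions",
        data=json.dumps(body).encode("utf-8"),
        method="POST",
        headers={
            "Content-Type": "application/json",
            # Assumption: the API accepts HTTP Basic authentication.
            "Authorization": "Basic " + token,
        },
    )


def lambda_handler(event, context, send=urllib.request.urlopen):
    """Start one Talend Flow execution per new object; return HTTP status codes."""
    statuses = []
    for bucket, key in s3_objects(event):
        # urlopen raises HTTPError on a non-2xx reply, so failures are visible in the Lambda logs.
        with send(build_request(bucket, key)) as response:
            statuses.append(response.status)
    return statuses
```

With the default handler setting, lambda_function.lambda_handler, Lambda calls this function with two arguments, so the real urlopen is used.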

     

For more information, see the Using AWS Lambda with Amazon S3 page in the AWS documentation.

Version history
Revision 21 of 21, last updated 09-29-2018 12:10 AM