A repeatable process for running Spark Jobs on Amazon EMR using Talend

Overview

Amazon Elastic MapReduce (EMR) clusters provide a set of components for Big Data processing in the Amazon Web Services (AWS) cloud. EMR hosts the traditional Hadoop ecosystem, including HDFS, Hive, MapReduce, and YARN, along with common Hadoop user interfaces such as Hue and the Resource Manager. EMR also supports Spark on YARN. Because of the complex security, cluster, and networking requirements, it has been a challenge to develop and run Spark Jobs on an EMR cluster from Talend Studio. This article presents a repeatable process to build an EMR cluster, place it in a Virtual Private Cloud (VPC), and meet all of the security, cluster, and networking requirements that allow Talend Studio to run a Spark Job in the EMR cluster.

 

The architecture designed to support this process does not require the placement of a copy of Studio in the cloud, and does not require the opening (or shutting down) of the Studio workstation’s firewall. The architecture is described in detail in the next section.

 

To facilitate the processes and procedures presented in this article, a companion Talend project archive is attached with all Jobs shown in this article.

 

Architecture

The basic architecture for Talend Spark on EMR is shown below:

Figure: Talend Spark on EMR Architecture

 

The key to this architecture is the creation of the EMR cluster in an AWS VPC, then the creation of an edge node in the VPC. A Talend JobServer is added to the edge node and started. Studio is configured to execute Jobs on the JobServer on this edge node.

 

This document covers the development and testing process only. Registering the remote JobServer with a Talend Administration Center (TAC) is desirable, and is usually done for non-development Jobs that may have dependencies and are scheduled. Many Talend customers will have seamless integration from their in-house networks to Amazon’s VPCs. This may slightly change the methods used to obtain cluster information, but the overall process as described is sound. The process described in this article was developed with Amazon’s EMR cluster secured and accessed over the public internet.

 

Prerequisites

  • Talend Studio for Big Data release 6.4.1
  • Talend JobServer release 6.4.1 for Linux
  • Access to an AWS Account with the ability to create, run, and maintain:

    • S3 file bucket storage
    • Public AMIs to build EC2 instances
    • VPC
    • Security Groups
    • EMR Clusters
  • Access to Talend EMR project files (attached to this article)
  • The EMR Java code library (downloaded from a site such as AWS SDK for Java)

 

EMR Java code library

As there is quite a bit of manual processing required to get EMR clusters started, a Java class was created to provide useful utility methods for querying EMR clusters, based on Amazon’s published API for EMR. At the time of this writing, the Java class contained the following public methods:

  • AmazonEMR

    • getPublicIPAddr: attempt to find the Public IP Address of the master cluster node
    • getPrivateIPAddr: attempt to find the Private IP Address of the master cluster node
    • getPublicDNS: attempt to find the public DNS name of the master cluster node
    • getEMRPrivateHosts: attempt to build strings necessary for the Studio workstation's hosts file for the AWS private cluster IP/Hostnames

      • Suitable for an edge node
    • getEMRTalendPrivateHosts: attempt to build the strings necessary for a Studio /etc/hosts file for the AWS public cluster IP address and private hostnames
    • getEMRPublicHosts: attempt to build the strings necessary for the Studio workstation's hosts file for the AWS public cluster IP/Hostnames
    • getAmazonEMRInfo: get a list of EMR cluster IDs, cluster names, and statuses for a user in a region
  • The EMR Java code library requires the following JAR files to function. Add them by right-clicking the code routine and clicking Edit Routine Libraries.

    • aws-java-sdk-1.10.52.jar
    • aws-java-sdk-emr-1.11.226.jar
    • commons-codec-1.6.jar
    • commons-logging-1.1.3.jar
    • httpclient-4.3.6.jar
    • httpcore-4.3.3.jar
    • jackson-annotations-2.5.3.jar
    • jackson-core-2.5.3.jar
    • jackson-databind-2.5.3.jar
    • joda-time-2.8.1.jar

 

Procedure

The goal is to run a Spark Job in an EMR cluster from Studio. To accomplish this, you will do the following:

  1. Create an EMR cluster using Talend Studio.
  2. Create an edge node in the EMR cluster’s VPC.
  3. Install a Talend JobServer on the edge node.
  4. Configure Studio to use the new remote JobServer.
  5. Run a Spark batch test Job on the EMR Cluster using the remote JobServer.

 

EMR cluster

While it is possible (and may be necessary the first time) to start an EMR cluster from the AWS web front end or from the AWS command line, the preferred method for this purpose is to use the out-of-the-box EMR components in Talend Studio. The figure below shows a basic Talend Job that configures and starts an EMR cluster.

Figure: Job tdye_emr_start

 

This Job uses tAmazonEMRManage to start an EMR cluster, and a tFixedFlowInput component to capture the global output variables set by tAmazonEMRManage and send them on to a tLogRow component. Finally, a tJava component calls several methods of the AmazonEMR Java utility class and prints the results in the console window.

 

tAmazonEMRManage

Basic settings for the tAmazonEMRManage component are shown in the screenshot below:

Figure: tAmazonEMRManage Basic Settings

 

Most of these settings are straightforward. For Cluster version and Application, select the newest EMR version supported by Talend 6.4.1 and a Spark-only installation. You must create the Service role and Job flow role in the AWS web front end, and you may need to create an initial cluster using that tool instead of Talend Studio. Ideally, log to an S3 bucket so that the logs persist after the cluster is terminated. An EC2 key pair is required to enable SSH access to the cluster; the edge node will need the same key pair. You can create key pairs in the AWS web front end and download them, or create them locally and upload them to AWS.

 

Advanced Settings of the tAmazonEMRManage component are shown below:

Figure: tAmazonEMRManage Advanced Settings

 

Key configuration items on the Advanced Settings tab are the Subnet ID and the Master and Slave security groups. These allow the EMR cluster to be created in a VPC. The Master and Slave security groups must belong to the same VPC as the subnet identified by the Subnet ID. The subnet is part of the VPC as configured in the AWS web front end.

 

To enable access to the cluster, add a security group to the Additional master security groups and Additional slave security groups fields. This security group should enable access to and from your edge node, and optionally, SSH access. If you don’t add security groups configured for your access, you will have to follow the instructions in the Enable Access section below.

 

tFixedFlowInput

The tFixedFlowInput component captures the global output variables set by tAmazonEMRManage and sends them to the tLogRow component. This captures the Cluster Final ID and Cluster Final Name upon successful completion of the EMR cluster startup. For more information about global output variables, see the Talend Studio help or the documentation for the tAmazonEMRManage component.

Figure: tFixedFlowInput Basic Settings
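
As a rough illustration of how these global output variables can be read in Java code, the sketch below uses a plain HashMap as a stand-in for the globalMap that Talend provides inside a running Job. The key names (component name plus _CLUSTER_FINAL_ID and _CLUSTER_FINAL_NAME) are assumptions based on the variable names mentioned above, and the class, method, and sample values are hypothetical; check the tAmazonEMRManage documentation for the exact names in your release.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: reads the cluster ID and name from a globalMap-style map. */
final class GlobalMapSketch {

    /**
     * Builds a one-line summary from the variables a component is assumed to set.
     * The key suffixes are assumptions; confirm them in the component documentation.
     */
    static String describeCluster(Map<String, Object> globalMap, String component) {
        String id = (String) globalMap.get(component + "_CLUSTER_FINAL_ID");
        String name = (String) globalMap.get(component + "_CLUSTER_FINAL_NAME");
        if (id == null) {
            return "No cluster ID set by " + component;
        }
        return "Cluster " + name + " started with ID " + id;
    }

    public static void main(String[] args) {
        // Stand-in for Talend's globalMap; the values are placeholders.
        Map<String, Object> globalMap = new HashMap<>();
        globalMap.put("tAmazonEMRManage_1_CLUSTER_FINAL_ID", "j-EXAMPLE");
        globalMap.put("tAmazonEMRManage_1_CLUSTER_FINAL_NAME", "demo-cluster");
        System.out.println(describeCluster(globalMap, "tAmazonEMRManage_1"));
    }
}
```

Inside a real Job, the same lookups would be made against the globalMap object that Studio generates, rather than a map you create yourself.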

 

tJava

The tJava component calls several methods in the AmazonEMR Java library for demonstration purposes. The method you are most interested in is getEMRPrivateHosts. It retrieves the EMR cluster’s private IP addresses, cluster node names, and cluster node fully qualified domain names (FQDNs) in a format that can be copied and pasted into an /etc/hosts file. In a later step, you will paste these entries into the new edge node’s /etc/hosts file.

Figure: tJava Basic Settings
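
To make the hosts file format concrete, here is a minimal sketch of how node details could be turned into /etc/hosts lines. It is not the AmazonEMR class attached to this article: the HostsEntrySketch and Node names are hypothetical, the addresses are made up, and the column order (IP address, FQDN, short name) is an assumption that follows the usual hosts file layout.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative only: formats cluster node details as /etc/hosts lines. */
final class HostsEntrySketch {

    /** Hypothetical holder for one cluster node. */
    static final class Node {
        final String privateIp;
        final String fqdn;

        Node(String privateIp, String fqdn) {
            this.privateIp = privateIp;
            this.fqdn = fqdn;
        }

        /** Short node name: the FQDN up to its first dot. */
        String shortName() {
            int dot = fqdn.indexOf('.');
            return dot < 0 ? fqdn : fqdn.substring(0, dot);
        }
    }

    /** Builds one "IP FQDN shortname" line per node. */
    static List<String> toHostsLines(List<Node> nodes) {
        List<String> lines = new ArrayList<>();
        for (Node n : nodes) {
            lines.add(n.privateIp + " " + n.fqdn + " " + n.shortName());
        }
        return lines;
    }

    public static void main(String[] args) {
        List<Node> nodes = new ArrayList<>();
        // Made-up addresses in the style of EC2 private host names.
        nodes.add(new Node("10.0.0.11", "ip-10-0-0-11.ec2.internal"));
        nodes.add(new Node("10.0.0.12", "ip-10-0-0-12.ec2.internal"));
        for (String line : toHostsLines(nodes)) {
            System.out.println(line);
        }
    }
}
```

In the real utility class, the node list would come from the EMR API rather than being hard-coded.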

 

Job output

When successfully run, the Job produces the following output:

Figure: tdye_emr_start Job Output

 

The AmazonEMR Java library methods produce the following useful information:

  • Cluster ID: This is how AWS identifies the EMR cluster. This can be used by Talend components to modify or stop the cluster using Studio EMR components.
  • Master Node information: This includes public and private IP addresses, and the public DNS FQDN. This identifies the master node in the cluster.
  • A number of hosts file type entries: For your purposes, the most useful contains the private IP address, private node name, and private FQDN. These are internal AWS names and not reachable outside the cluster or VPC. The two remaining hosts entries are useful for the use case where a copy of Studio is placed in the cloud on a VPC. This use case is out of scope for this discussion.

 

Copy and paste the entire Job output to a text editor. The information contained in the output will be useful in later steps of this repeatable procedure. Alternatively, the Job could be edited to write the Job output to persistent storage.

 

Enable access

If you did not provide your own security groups when configuring the cluster (see the tAmazonEMRManage section above), you need to complete the instructions in this section to enable access to the EMR Cluster. Access the security groups in the AWS web console:

 

Figure: EMR Cluster Configuration

 

Click the security group links and edit the inbound rules for both master and slave:

Figure: Edit Security Groups

 

Add inbound entries for your client (for example, Studio) workstation and a Classless Inter-Domain Routing (CIDR) block for your VPC subnet.
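
If it is unclear which addresses a given CIDR block covers, the following sketch checks whether an IPv4 address falls inside a block. It is a generic illustration, not part of the companion project; the CidrSketch class name and the sample addresses are hypothetical.

```java
/** Illustrative only: tests whether an IPv4 address is inside a CIDR block. */
final class CidrSketch {

    /** Converts a dotted IPv4 literal such as "10.0.0.57" to a 32-bit value. */
    static int toInt(String ipv4) {
        String[] parts = ipv4.trim().split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        int value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range: " + ipv4);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Returns true when ipv4 shares the network prefix of the CIDR block. */
    static boolean contains(String cidr, String ipv4) {
        String[] parts = cidr.trim().split("/");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Not a CIDR block: " + cidr);
        }
        int prefix = Integer.parseInt(parts[1]);
        if (prefix < 0 || prefix > 32) {
            throw new IllegalArgumentException("Prefix out of range: " + cidr);
        }
        // A /0 block matches everything; Java shifts wrap at 32, so handle it explicitly.
        int mask = prefix == 0 ? 0 : -1 << (32 - prefix);
        return (toInt(parts[0]) & mask) == (toInt(ipv4) & mask);
    }

    public static void main(String[] args) {
        // A /24 subnet rule covers every node address in that subnet.
        System.out.println(contains("10.0.0.0/24", "10.0.0.57"));
        // A /32 rule covers exactly one workstation address.
        System.out.println(contains("203.0.113.25/32", "203.0.113.26"));
    }
}
```

A /32 entry is the narrowest rule for a single workstation, while the subnet's own CIDR block lets every node in the subnet reach the cluster.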

 

Edge Node

Next, you will create an edge node instance in AWS EC2 in the same Virtual Private Cloud as your EMR cluster.

 

Using the AWS web front end, or the AWS CLI, create an EC2 instance. For this demonstration, you will use the following Amazon Machine Image (AMI):

  • ami-10503820 – CentOS 6.5, 20 GB EBS storage, paravirtualized

The key parts of the EC2 instance configuration are shown in the screenshot below:

Figure: Edge Node EC2 Configuration

 

  1. The EC2 instance type should be at least c3.large for a Talend remote JobServer.
  2. Be sure you assign the same VPC ID that was assigned to the EMR Cluster.
  3. Be sure you assign the same Subnet ID that was assigned to the EMR Cluster.
  4. The Security groups assigned must belong to the VPC. Click the Security group link and add an inbound entry for your Studio workstation’s IP address and a CIDR for your VPC subnet (if not already present).
  5. Use the same SSH key pair that was used in the creation of the EMR Cluster.
  6. The Talend JobServer prefers to run on a paravirtualized (PV) AMI. If you assign this remote JobServer to TAC, the AMI must be paravirtualized; hardware virtual machine (HVM) AMIs won’t work.

 

Configure EC2 edge node

SSH into the edge node. You should be able to ping any of the nodes in your EMR cluster by IP address. If not, review the Enable access section above. Next, edit the /etc/hosts file on the edge node. Copy and paste the private hosts entries (the set marked as suitable for an edge node) that you saved from the output of the Talend EMR start Job. Also include an entry for the edge node itself (the last entry in the screenshot below).

Figure: /etc/hosts on Edge Node

 

Talend JobServer

Upload the manual installer for release 6.4.1 of the Talend JobServer for Linux to your edge node (use SCP). This install is a simple unzip, so unzip to /opt/Talend. Rename the unzipped JobServer directory to JobServer, as shown below:

Figure: JobServer Directory Listing

 

Start the JobServer by running the start_rs.sh script. Running in the foreground is fine for now.

 

When the JobServer starts successfully, you will see output similar to the screenshot below. If it fails to start, check that Java is installed on the edge node.

Figure: JobServer Successful Start

 

Configure Talend Studio

Add the JobServer to your copy of Talend Studio as a remote execution environment. This is found in Studio under Window > Preferences > Talend > Run/Debug > Remote.

Figure: Studio Remote Server
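
Before running a Job, it can help to confirm that the Studio workstation can reach the JobServer ports through the security group. The sketch below attempts a TCP connection with a timeout. The host name is a placeholder, and ports 8000 and 8001 are assumed to be the JobServer command and file transfer ports; use the values configured for your JobServer. The PortCheckSketch class is hypothetical and not part of the companion project.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Illustrative only: checks whether a TCP port accepts connections. */
final class PortCheckSketch {

    /** Returns true if a TCP connection to host:port succeeds within the timeout. */
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolved host: treat all as unreachable.
            return false;
        }
    }

    public static void main(String[] args) {
        // Placeholder host; pass the edge node's public address as the first argument.
        String host = args.length > 0 ? args[0] : "edge-node.example.com";
        // Assumed JobServer ports; adjust to match your configuration.
        int[] ports = {8000, 8001};
        for (int port : ports) {
            System.out.println(host + ":" + port + " reachable: "
                    + canConnect(host, port, 3000));
        }
    }
}
```

A false result usually points to a missing inbound rule in the edge node's security group or to a JobServer that is not running.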

 

Spark Batch Job

At this point, your EMR cluster, edge node and remote server are all set up and running. Studio is configured to allow remote execution of Jobs on the edge node. Now you will attempt to start a simple demo Spark Job in Studio that will execute on the edge node, and act as the Spark client for Spark on the EMR cluster.

 

The Spark Job is a batch Job that generates three sets of ‘log’ data, unites the rows into a single stream, partitions the rows for Spark, then writes the rows to a Parquet file in HDFS on the Spark cluster. With this Job, you show that you can execute a Spark Job in EMR using a JobServer on an edge node.

Figure: GenerateLogsBatch Spark Job

 

Be sure you have selected the remote JobServer in the Target Exec side tab on the Job’s Run tab:

Figure: GenerateLogsBatch Target Exec

 

Configure the Job to run on the EMR Spark service as shown below. Create a metadata entry for the EMR cluster, and configure the tHDFSConfiguration component to point to the EMR cluster.

Figure: GenerateLogsBatch Spark Configuration

 

Run the Job

Click Run to run the Job. If successful, you will see output in the Studio Execution log as shown below:

Figure: GenerateRawLogs Output

 

Finally, you can view the Parquet file created by the Job in Hue (the link is found in the AWS EMR configuration listing for the cluster).

Figure: GenerateRawLogs Parquet File Output

 

Conclusion

You have seen a repeatable process for running Spark Jobs in an Amazon EMR cluster. This method does not require placing a copy of Studio in the cloud, or a complex and often impossible Studio workstation configuration (editing of a Studio workstation hosts file, or disabling of a Studio workstation firewall). Almost all configuration for this method is done in the cloud:

  • Create an AWS EMR Cluster in a VPC
  • Create an edge node in the same VPC
  • Enable network access between the cluster and the edge node
  • Install a Talend Remote JobServer on the edge node
  • Run Talend Spark batch Jobs from Studio, executed on the edge node’s remote JobServer

 

Glossary

  • AWS: Amazon Web Services, Amazon’s cloud product
  • BD: Big Data (usually a Hadoop ecosystem: HDFS, Hive, HBase, etc.)
  • Edge Node: An interface between a Hadoop cluster and the outside world
  • EMR: Elastic MapReduce – Amazon’s cloud Hadoop/Spark cluster product
  • TAC: Talend Administration Center
  • VPC: Virtual Private Cloud – Amazon’s virtual private network for the cloud

Version history

Revision 11 of 11, last updated 09-29-2018 12:15 AM