Integrating Apache Zeppelin data science notebooks with Talend

Overview

This article demonstrates how to integrate Zeppelin Notebooks into Talend DI Jobs by leveraging the Zeppelin RESTful API.

Use the two notebooks prepared for you in the Archive.zip file attached to this article. The first notebook trains a Machine Learning model on historical data stored on HDFS and saves the model back to HDFS. The second notebook loads the trained model and uses it to score new data. The article then explains how to develop two Talend DI Jobs that integrate those notebooks. The first Job ingests the historical data from S3 to HDFS, then triggers the execution of the training notebook. The second Job uploads the new data from S3 to HDFS, runs the notebook that scores the new data, and saves the results to HDFS.

 

image.png

 

Assumptions

  1. Amazon Web Services (AWS):

    • You should be familiar with the AWS platform.

  2. Talend:

    • You should have a basic knowledge of Talend Studio.

  3. RESTful APIs:

    • You should have a basic understanding of RESTful APIs.

 

Prerequisites

  • AWS account

  • EMR 5.11.1 with Hadoop, Zeppelin, Livy, Hive, Hue, and Spark

  • AWS S3 bucket

  • IAM user credentials (Access Key / Secret Key) with access to the S3 bucket

  • EC2 machine with Talend Studio 7.0.1 or later

  • Source materials: Archive.zip file attached to this article

 

Apache Zeppelin

 

Getting started with Zeppelin on EMR

  1. Connect to your AWS Management Console, click Services, then search for EMR and select it.

    image.png

     

  2. Click Create cluster. On the following screen, select Go to advanced options.

    image.png

     

  3. On the Software and Steps page, select emr-5.11.1 from the Release pull-down menu, then select Hadoop, Zeppelin, Livy, Hive, Hue, and Spark from the Software Configuration list. Click Next.

    image.png

     

  4. On the Hardware configuration page, keep the default settings or, if needed, choose a specific Network and Subnet, then click Next.

  5. On the General Cluster Settings page, under General Options, provide a Cluster name for your cluster. Click Next.

    image.png

     

  6. On the Security page, under the Security Options settings, select an EC2 key pair from the drop-down menu, or create one, then click Create cluster.

    image.png

     

  7. Wait a few minutes for your cluster to be up and running.

    image.png

     

  8. Edit the network security rules by navigating to the EC2 dashboard.

    image.png

     

  9. Select the EC2 machine of your master node, then under the Description tab, click Security groups.

    image.png

     

  10. Select the Inbound tab and click Edit.

    image.png

     

  11. Create a new rule to allow all traffic from the EC2 instance where Talend will be installed. Make sure to use the Security Group ID.

    image.png

     

  12. Create a new rule to allow all traffic from your local machine to the cluster.

    image.png

     

  13. Go back to your EMR cluster home page and select your cluster. On the Summary tab, notice that you can now access your cluster web UI connections from your local machine.

    image.png

     

  14. Click the Hue link. For the first connection, create the Hue superuser, using admin as the username and a password of your choice. Remain on this screen.

  15. Your EMR cluster is all set.

     

Training a Machine Learning model

  1. Before getting into Zeppelin, you need to upload the training data. In Hue, select Files, under Browser, from the menu on the left, and navigate to /user/admin.

    image.png

     

  2. Click the New button, and select Directory. Name the directory Input_Data, then click Create.

    image.png

     

  3. Navigate to Input_Data, click Upload > Files, and browse to the TrainingData.csv file and select it.

    image.png

    image.png

     

  4. Return to the EMR Management Console, and click the Zeppelin link.

    image.png

     

  5. From the Welcome to Zeppelin page, select Create new note to open the properties box.

    1. In the Note Name field, name the note Model_Training.

    2. Select spark from the Default Interpreter pull-down menu.

    3. Click Create Note.

    image.png

     

  6. In the first paragraph, import the TrainingData.csv file and create a Spark DataFrame, by copying and pasting the following code into the note:

    // Read the training data from HDFS into a Spark DataFrame
    val df_data = spark.read.
        format("csv").
        option("header", "true").
        option("inferSchema", "true").
        load("/user/admin/Input_Data/TrainingData.csv")
    

     

  7. Run the paragraph by clicking the Play button on the right-hand side of the paragraph. You can observe the results output below your code.

    image.png

     

  8. In the second paragraph, split your data into three parts, training_data, test_data, and prediction_data, using the code below, then run the paragraph.

    val splits = df_data.randomSplit(Array(0.8, 0.18, 0.02), seed = 24L)
    val training_data = splits(0).cache()
    val test_data = splits(1)
    val prediction_data = splits(2)
    

    image.png

     

  9. In the third paragraph, import the necessary machine learning libraries, create features using indexers, and assemble your features into a vector, using the code below, then run the paragraph.

    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString, VectorAssembler}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
    import org.apache.spark.sql.SparkSession
    
    val stringIndexer_label = new StringIndexer().setInputCol("PRODUCT_LINE").setOutputCol("label").fit(df_data)
    val stringIndexer_prof = new StringIndexer().setInputCol("PROFESSION").setOutputCol("PROFESSION_IX")
    val stringIndexer_gend = new StringIndexer().setInputCol("GENDER").setOutputCol("GENDER_IX")
    val stringIndexer_mar = new StringIndexer().setInputCol("MARITAL_STATUS").setOutputCol("MARITAL_STATUS_IX")
    
    val vectorAssembler_features = new VectorAssembler().setInputCols(Array("GENDER_IX", "AGE", "MARITAL_STATUS_IX", "PROFESSION_IX")).setOutputCol("features")
    

    image.png

     

  10. In the last paragraph, select your model, create the pipeline, train it, and save it to disk, using the code below, then run the paragraph. For permission reasons, the model is saved under the hadoop user's HDFS directory (/user/hadoop).

    val rf = new RandomForestClassifier().setLabelCol("label").setFeaturesCol("features").setNumTrees(5)
    val labelConverter = new IndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(stringIndexer_label.labels)
    val pipeline_rf = new Pipeline().setStages(Array(stringIndexer_label, stringIndexer_prof, stringIndexer_gend, stringIndexer_mar, vectorAssembler_features, rf, labelConverter))
    val model_rf = pipeline_rf.fit(training_data)
    model_rf.write.overwrite().save("/user/hadoop/Model/MyModel")
    

    image.png

     

  11. If you want to check the predicted results on the test data split, add the following code to the next paragraph and run it. Don't forget to remove this paragraph after testing. An optional accuracy check is sketched below.

    val prediction_test = model_rf.transform(test_data)
    prediction_test.show(10)
    

    image.png
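
    Optionally, you can also compute a single accuracy figure for the test split. The notebook already imports MulticlassClassificationEvaluator without using it; the following is a minimal sketch, assuming the prediction_test DataFrame from the code above:

    // Compare the predicted labels against the true labels on the test split
    val evaluator = new MulticlassClassificationEvaluator().
        setLabelCol("label").
        setPredictionCol("prediction").
        setMetricName("accuracy")
    println("Test accuracy: " + evaluator.evaluate(prediction_test))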

     

  12. Your training model notebook is all set. Go to Hue and remove the training data because you will upload it from S3 to the cluster using a Talend Job.

    image.png

     

Scoring a Machine Learning model

  1. Create a new note by clicking Notebook and selecting Create new note.

    image.png

     

  2. Name the note Model_Scoring, then select spark from the Default Interpreter pull-down menu. Click Create Note.

    image.png

     

  3. From Hue, upload the NewData.csv file into the /user/admin/Input_Data/ folder.

    image.png

     

  4. Go back to the notebook. Read the NewData.csv file from HDFS, store the data in a Spark DataFrame, and keep only the feature columns (PRODUCT_LINE is what the model predicts), by copying and pasting the following code into the note, then run the paragraph.

    // Read the new data from HDFS into a Spark DataFrame
    val df_data = spark.read.
        format("csv").
        option("header", "true").
        option("inferSchema", "true").
        load("/user/admin/Input_Data/NewData.csv")

    // Keep only the feature columns; PRODUCT_LINE is what the model predicts
    val df_newdata = df_data.select("GENDER", "AGE", "MARITAL_STATUS", "PROFESSION")

    image.png

     

  5. Import the necessary libraries, and load the trained model, using the code below, then run the paragraph.

    import org.apache.spark.ml.classification.RandomForestClassifier
    import org.apache.spark.ml.feature.{OneHotEncoder, StringIndexer, IndexToString, VectorAssembler}
    import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
    import org.apache.spark.ml.{Model, Pipeline, PipelineStage, PipelineModel}
    import org.apache.spark.sql.SparkSession
    
    val model_rf = PipelineModel.read.load("/user/hadoop/Model/MyModel")
    

    image.png

     

  6. Score your new data using the trained model, and export the results to HDFS, using the code below, then run the paragraph. Note that output.csv is written as a directory containing a single part file, because of coalesce(1).

    val newprediction = model_rf.transform(df_newdata)
    val output_pred = newprediction.select("GENDER", "AGE", "MARITAL_STATUS", "PROFESSION", "predictedLabel")
    // coalesce(1) merges the result into a single partition, so one part file is written
    output_pred.coalesce(1).write.mode("overwrite").csv("/user/hadoop/Output_Data/output.csv")
    

    image.png

     

  7. Using the File Browser, navigate to /user/hadoop/Output_Data/output.csv and check the results.

    image.png

     

  8. Your model scoring notebook is all set. Go to Hue and remove the NewData.csv file because you will upload it from S3 to the cluster using a Talend Job.

     

Talend Studio

 

Getting started

  1. Before installing Studio, make sure that your EC2 instance can access the EMR cluster by going to the inbound network rules of your cluster and allowing all traffic from the Studio security group.

  2. To install Studio on an EC2 machine, see the Installing Talend Studio with the Talend Studio Installer page on the Talend Help Center.

  3. When the installation is complete, open Studio, and create a new local project.

  4. Provision an S3 bucket and upload the TrainingData.csv and NewData.csv files.
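
    If you prefer to script the upload instead of using the S3 console, the following is a minimal sketch using the AWS SDK for Java (v1); the bucket name my-talend-bucket and the local file paths are placeholders, and the SDK is assumed to be on the classpath:

    import java.io.File
    import com.amazonaws.services.s3.AmazonS3ClientBuilder

    // Build a client using the default credential chain
    // (environment variables, profile, or EC2 instance role)
    val s3 = AmazonS3ClientBuilder.defaultClient()

    // my-talend-bucket is a placeholder; use your own bucket and file locations
    s3.putObject("my-talend-bucket", "TrainingData.csv", new File("TrainingData.csv"))
    s3.putObject("my-talend-bucket", "NewData.csv", new File("NewData.csv"))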

 

Creating a Machine Learning training Job

  1. Expand Job Designs, then right-click Standard, and select Create Standard Job.

    image.png

     

  2. Name your Job ML_training and click Finish.

    image.png

     

  3. In the Repository view, expand Metadata, right-click Hadoop Cluster, and select Create Hadoop Cluster.

    1. Give your cluster a name, then click Next.

    2. Select Amazon EMR as the distribution and EMR 5.8.0 as the version of your Hadoop cluster.

    3. Select Enter manually Hadoop services.
    4. Click Finish.

      image.png

       

  4. Enter the connection information manually, using the admin username for authentication. Click Next.

    image.png

     

  5. Click Check Services to ensure that Studio can successfully connect to the cluster.

    image.png

     

  6. In the designer, add a tPrejob component, a tS3Connection component, and a tHDFSConnection component.

    image.png

     

  7. Connect the tPrejob to the tS3Connection using an OnComponentOK trigger, then connect the tS3Connection to the tHDFSConnection using an OnSubjobOK trigger.

    image.png

     

  8. Double-click the tS3Connection component. On the Basic settings tab, fill in the Access Key and Secret Key.

    image.png

     

  9. Double-click the tHDFSConnection component.

    1. On the Basic settings tab, select Repository as the Property Type.

    2. To the right of Repository, click the […] button, and navigate to Metadata.

    3. Select EMR_HDFS.

    4. Click OK.

    image.png

     

  10. Add a tS3Get, a tHDFSPut, a tREST, and a tLogRow component to the canvas.

    image.png

     

  11. Using OnSubjobOK triggers, connect them as shown below:

    image.png

     

  12. Double-click the tS3Get component. On the Basic settings tab, fill in the Bucket, Key, and File fields with the appropriate settings.

    image.png

     

  13. Double-click the tHDFSPut component. On the Basic settings tab, configure the settings as shown in the screenshot below:

    image.png

     

  14. Before setting up the tREST component, retrieve the note id using the List of the notes API, as documented on the Apache Zeppelin website.

  15. Open a new tab in your web browser, and using the EMR IP and Zeppelin port of your instance, enter the URL as follows: http://zeppelin-server:zeppelin-port/api/notebook. A scripted alternative is sketched below.

    image.png
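
    If you prefer to script this lookup, the following is a minimal Scala sketch; zeppelin-server is a placeholder for your EMR master's DNS name, and 8890 is assumed to be Zeppelin's default port on EMR:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    // List all notes; the JSON response contains each note's id and name
    val conn = new URL("http://zeppelin-server:8890/api/notebook").
        openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("GET")
    println(Source.fromInputStream(conn.getInputStream).mkString)
    conn.disconnect()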

     

  16. Double-click the tREST component. To use the run note method, enter the URL, for example, http://zeppelin-server:zeppelin-port/api/notebook/job/NOTE_ID, then select POST for the HTTP Method.

    image.png

     

  17. Edit and define the schema, as shown in the screenshot below:

    image.png

     

  18. Configure the HTTP Headers by setting the name to "Content-type" and the value to "application/json". A raw-HTTP equivalent of this call is sketched below.

    image.png
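
    To sanity-check the same call outside Talend, the following is a minimal Scala sketch, using the same placeholder host and NOTE_ID as above:

    import java.net.{HttpURLConnection, URL}

    // Trigger an asynchronous run of the whole note, mirroring the tREST call;
    // NOTE_ID is the id returned by GET /api/notebook
    val conn = new URL("http://zeppelin-server:8890/api/notebook/job/NOTE_ID").
        openConnection().asInstanceOf[HttpURLConnection]
    conn.setRequestMethod("POST")
    conn.setRequestProperty("Content-type", "application/json")
    println("HTTP " + conn.getResponseCode)
    conn.disconnect()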

     

  19. Run the Job and confirm the results. The REST call triggers the note asynchronously, so if a downstream step must wait for the run to finish, see the polling sketch below.

    image.png
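
    Because the POST above returns as soon as the note is submitted, the tREST component does not wait for the paragraphs to finish. If you need to block until the run completes, one option is to poll the note's job status; the following is a minimal sketch, assuming the same placeholder host, port, and NOTE_ID as above, and assuming GET /api/notebook/job/NOTE_ID is available in your Zeppelin version:

    import java.net.{HttpURLConnection, URL}
    import scala.io.Source

    // GET /api/notebook/job/NOTE_ID reports the status of every paragraph in the
    // note; this crude string check waits while any paragraph is PENDING or RUNNING
    def noteStatus(): String = {
      val conn = new URL("http://zeppelin-server:8890/api/notebook/job/NOTE_ID").
          openConnection().asInstanceOf[HttpURLConnection]
      conn.setRequestMethod("GET")
      val body = Source.fromInputStream(conn.getInputStream).mkString
      conn.disconnect()
      body
    }

    var status = noteStatus()
    while (status.contains("PENDING") || status.contains("RUNNING")) {
      Thread.sleep(5000)  // poll every 5 seconds
      status = noteStatus()
    }
    println("Note finished: " + status)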

     

Creating a Machine Learning scoring Job

  1. Create a new Standard Job, and name it ML_Scoring. Copy and paste the tPrejob, tS3Connection, and tHDFSConnection components from the previous Job into this Job.

    image.png

     

  2. Add a tS3Get, a tHDFSPut, a tREST, and a tLogRow component to the canvas, and connect them as shown below:

    image.png

     

  3. Double-click the tS3Get component, and configure the Basic settings as shown below:

    image.png

     

  4. Double-click the tHDFSPut component, and configure the Basic settings as shown below:

    image.png

     

  5. Before setting up the tREST component, call the Zeppelin API, http://zeppelin-server:zeppelin-port/api/notebook, to list the notes and find the id related to the Model_Scoring note.

    image.png

     

  6. Double-click the tREST component. To use the run note method, enter the URL, for example, http://zeppelin-server:zeppelin-port/api/notebook/job/NOTE_ID, then select POST for the HTTP Method.

    image.png

     

  7. Configure the HTTP Headers by setting the name to "Content-type" and the value to "application/json".

    image.png

     

  8. Run the Job and confirm the results.

    image.png

     

  9. Confirm your results in Hue, by navigating to /user/hadoop/Output_Data/output.csv.

    image.png

     

Conclusion

You’ve learned how to integrate Zeppelin Notebooks with Talend, and you created a data science pipeline with Talend Jobs to train a Machine Learning model and to score new incoming data.
