How to integrate Talend Jobs containing dynamic queries with Cloudera Navigator and Talend Data Catalog

Introduction

Talend Jobs that are developed using context variables and dynamic SQL queries are not supported by the Talend ETL bridge; as a result, Talend Data Catalog (TDC) is unable to harvest metadata and trace data lineage from a Talend dynamic integration Job through that bridge.

 

This article shows you how to work around these limitations for Talend Jobs that use resources from a Cloudera Cluster, by leveraging Cloudera Navigator and the corresponding Talend Data Catalog bridge.

 

Sources for the project are attached to this article.

 

Prerequisites

  • Cloudera Cluster CDH 5.10 and above

  • Cloudera Navigator 2.15.1 and above

  • MySQL server to store the metadata table of the dynamic integration framework

  • Talend Big Data Platform 7.1.1 and above

  • Talend Data Catalog 7.1 Advanced (or Plus) Edition and above, with latest cumulative patches

 

Setting up Talend Studio

  1. Open Talend Studio and create a new project.

  2. In the Repository, expand Metadata, right-click Hadoop Cluster, then select Create Hadoop Cluster.

  3. Using the Hadoop Cluster Connection wizard, create a connection to your Cloudera Cluster, and make sure that you select the Use Cloudera Navigator check box.

    image.png

     

  4. Click the ellipsis to the right of Use Cloudera Navigator, then set up your connection to Cloudera Navigator, as shown below:

    image.png

    For more information on leveraging Cloudera Navigator in Talend Jobs, see the How to set up data lineage with Cloudera Navigator page of the Talend Big Data Studio User Guide available in the Talend Help Center.

     

Building the dynamic integration Job

This use case uses MySQL to store metadata such as source and target tables, queries, and filters, and then loads these values into context variables that are used to build the integration Job at runtime. The dynamic Job reads data from source tables in Hive and writes data to target tables in Hive.
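
If you prefer to create the metadata table by hand instead of importing the attached spreadsheet, the following MySQL DDL is a minimal sketch that matches the columns referenced later in this article. The column types and the inline comments are assumptions for illustration; the attached Metadata_Demo_forMySql.xlsx file remains the reference.

  CREATE TABLE `meta_tables` (
    `Job`           VARCHAR(100), -- name of the integration Job
    `business_name` VARCHAR(50),  -- filter used to select which rows to run (for example, Agg or Dwh)
    `db_in`         VARCHAR(100), -- source Hive database
    `tabel_in`      VARCHAR(100), -- source Hive table (column name kept as used in the query below)
    `db_out`        VARCHAR(100), -- target Hive database
    `table_out`     VARCHAR(100), -- target Hive table
    `select_args`   TEXT,         -- columns to select
    `query`         TEXT,         -- dynamic query to run against Hive
    `conditions`    TEXT,         -- optional filter conditions
    `db_lookup`     VARCHAR(100), -- lookup Hive database
    `table_lookup`  VARCHAR(100)  -- lookup Hive table
  );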

 

  1. Upload the metadata for the dynamic integration Job to the MySQL server (or any other DB of your choice), using the Metadata_Demo_forMySql.xlsx file attached to this article.

    image.png

     

  2. Upload the source data, located in the employees.csv and salaries.csv files attached to this article, to Hive; one possible HiveQL approach is sketched below.
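
    How you load the two files into Hive is up to you; as one possibility, the following HiveQL sketch creates a delimited table and loads a file that has already been copied to HDFS. The database name, the column list, and the HDFS path are hypothetical examples; use the columns that match the attached CSV files.

    -- Hypothetical column list; adjust to match employees.csv
    CREATE TABLE IF NOT EXISTS staging.employees (
      emp_no     INT,
      first_name STRING,
      last_name  STRING,
      gender     STRING,
      hire_date  STRING
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ','
    STORED AS TEXTFILE;

    -- Load the file previously copied to HDFS (path is an example)
    LOAD DATA INPATH '/tmp/employees.csv' INTO TABLE staging.employees;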

  3. Create a standard Job, then add a tDBConnection component to connect to the metadata database. Note: The complete preparation Job, located in the prepare_load_dwh_Hive.zip file, is attached to this article.

  4. Replicate all of the fields in the metadata table by creating the following Context variables:

    image.png

     

  5. Add a tDBInput component.

  6. Connect the tDBConnection component to the tDBInput component using the OnSubjobOk trigger.

  7. Double-click the tDBInput component to open the Basic settings view. Click the [...] button next to the Table name text box and select the table where you uploaded the metadata (in this case, meta_tables), apply the appropriate schema, and use the following query:

    "SELECT 
      `meta_tables`.`Job`, 
      `meta_tables`.`business_name`, 
      `meta_tables`.`db_in`, 
      `meta_tables`.`tabel_in`, 
      `meta_tables`.`db_out`, 
      `meta_tables`.`table_out`, 
      `meta_tables`.`select_args`, 
      `meta_tables`.`query`, 
      `meta_tables`.`conditions`, 
      `meta_tables`.`db_lookup`, 
      `meta_tables`.`table_lookup`
    FROM `meta_tables`
    WHERE  `meta_tables`.`business_name`='Agg'"

    Notice that the value for business_name is hardcoded to Agg. Depending on the type of dynamic query you want to run, you could use a context variable instead, so that at runtime the Job filters the metadata table on business_name (in this case, Agg or Dwh), as sketched below.
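
    For example, assuming a context variable named business_name of type String (the variable name here is only an illustration), the same query could be written as:

    "SELECT 
      `meta_tables`.`Job`, 
      `meta_tables`.`business_name`, 
      `meta_tables`.`db_in`, 
      `meta_tables`.`tabel_in`, 
      `meta_tables`.`db_out`, 
      `meta_tables`.`table_out`, 
      `meta_tables`.`select_args`, 
      `meta_tables`.`query`, 
      `meta_tables`.`conditions`, 
      `meta_tables`.`db_lookup`, 
      `meta_tables`.`table_lookup`
    FROM `meta_tables`
    WHERE `meta_tables`.`business_name`='" + context.business_name + "'"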

  8. Add a tMap component after the tDBInput component, then connect it using a Main row. The tMap component acts as a pass-through and creates output that contains all the input fields.

    image.png

    image.png

     

  9. Connect the tMap to a tFlowToIterate component, then create a key-value pair for each of the fields in the metadata table.

    image.png

    image.png

     

  10. Add a tRunJob component. Connect the tFlowToIterate component to the tRunJob component using Row > Iterate.

    image.png

     

  11. Set up the tRunJob component to transmit the whole context to the child Job for each iteration, as shown below:

    image.png
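
    If you prefer to map individual values instead of (or in addition to) transmitting the whole context, the Context Param table of the tRunJob component accepts Java expressions that read the values published by the tFlowToIterate component from globalMap. The following is a minimal sketch that assumes key names matching the context variable names used by the child Job; adjust them to the key-value pairs you defined earlier:

    ((String) globalMap.get("BB_W_db_out"))
    ((String) globalMap.get("BB_W_table_out"))
    ((String) globalMap.get("BB_W_query"))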

     

Building the data integration Job

In this section, you build a Job that is triggered by the tRunJob component from the previous Job.

 

Note: The complete integration Job, located in the load_dwh_Hive.zip file, is attached to this article.

 

  1. Create a new standard Job, then add a tPreJob component and a tHiveConnection component.
  2. Connect tPreJob to tHiveConnection using the OnComponentOK trigger.

    image.png

     

  3. Add a tHiveRow component below the tPreJob component.

    image.png

     

  4. Configure the tHiveRow component, as shown below:

    image.png

     

  5. Use the context parameters transmitted by the parent Job by entering the following query in the Query text box:

    "INSERT OVERWRITE TABLE  "+context.BB_W_db_out+"."+context.BB_W_table_out+" "+context.BB_W_query+" "

    The integration Job (child Job) runs as many times as there are rows returned from the metadata table after it is filtered on business_name in the parent Job.
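
    As a purely illustrative example, if the parent Job transmitted BB_W_db_out = dwh, BB_W_table_out = employee_male, and BB_W_query = SELECT * FROM staging.employees WHERE gender = 'M' (all hypothetical values), the statement executed by tHiveRow would resolve to:

    INSERT OVERWRITE TABLE dwh.employee_male SELECT * FROM staging.employees WHERE gender = 'M'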

  6. Run the Job.

  7. Open Cloudera Navigator, then search for Hive Jobs. Locate the Talend Job and trace the data lineage.

    image.png

     

Configuring Talend Data Catalog

  1. Open Talend Data Catalog (TDC).

  2. Create a new configuration.

    image.png

     

Cloudera Hive bridge

  1. For the bridge to work, you'll need the Cloudera JDBC connector for Hive and the JDBC driver of your Hive metastore (in this case, Postgres).

  2. Ensure that both drivers are accessible to the TDC server or an Agent.

  3. Create a new Physical Data Model to harvest the Hive metastore.

  4. On the Properties tab, select the Cloudera Enterprise Hadoop Hive Database.

    image.png

     

  5. On the Import Setup tab, in the User and Password fields, enter your Hive metastore credentials (typically set during the cluster installation). If you are not able to retrieve them, see the StackExchange tutorial connect to PostgreSQL server: FATAL: no pg_hba.conf entry for host for a Hive metastore backed by Postgres.
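
    To confirm that the credentials work before running the import, you can connect to the metastore database with them and run a read-only sanity query against the standard Hive metastore schema (a minimal sketch, assuming a Postgres-backed metastore, where the metastore tables use quoted upper-case identifiers):

    -- Lists a few tables registered in the Hive metastore
    SELECT "DB_ID", "TBL_NAME", "TBL_TYPE"
    FROM "TBLS"
    LIMIT 10;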

  6. Review your settings. When you're finished, the Import Setup tab should look like this:

    image.png

    Click Import to start the metadata import.

  7. After a successful import, navigate to Data Catalog > Metadata Explorer > Metadata Browser, and locate the harvested metadata, in this case, employee_male (Table).

    image.png

     

Cloudera Navigator bridge

  1. Create a new model, then in Model Type, select Cloudera Navigator - New Beta Bridge.

    image.png

     

  2. On the Import Setup tab, fill in the Navigator URL, Login, and Password fields, then filter the Operations you want to harvest, in this case, hive.

    image.png

     

  3. After a successful import, you'll find the metadata harvested from Navigator: the Hive connection and the dynamic Data Integration Job (DI model).

    image.png

     

Stitching

  1. Make sure that your harvested models belong to your configuration by dragging them into the configuration you set up earlier.

  2. On the Manage menu, select Manage Contents, click the Nav model, then select the Connection tab on the right.

    image.png

     

  3. Select the Connection Name, in this case, Hive.

  4. Click Edit, then, from the Database pull-down list, select Hive (click Edit Schemas first if you haven't already done so), then click OK.

    image.png

     

  5. Click Build, then click the Diagram to see the connection between models.

    image.png

     

    image.png

     

  6. Trace the data lineage of the Talend Job containing dynamic queries executed against Hive.

    image.png

     

Conclusion

This article showed you how to handle the Talend Data Catalog harvesting process for DI Jobs that use context variables and dynamic queries on a Cloudera Cluster, leveraging the Cloudera Navigator and Hive bridges to trace data lineage.