Five Stars

Shared DB Connection in JobLets

Hi Talend Community,

I've got one simple question. Our job definitions are based on joblets in order to make some logic reusable. Some of the joblets are doing extractions from a database. Since the parent job may initiate a database connection on its own all the included jobs should use the connection instead to create a new one --> We want to overcome some performance issues.

 

That's the theory!

 

Here is one of these db extraction joblets:

 

joblet_1.PNG

Its very straight! You get a parameter (a "WHERE" condition) use the shared connection in the sql_e_skp_firma and execute it in order to provide the fetched output. Easy! 

One of the main unclear things is, where I can place properly tOracleConnection_1 (db_src_avsc_repo)? Right now I am not sure how to connect it within the joblet correctly. In a normal job (the parent Job) I'm using the tPreJob to establish the connection I need as shared connection. 

 

job_1.PNG

Is there something analogous to the tPreJob in context of joblets?

 

Thanks and best regards from Germany

Adam L.

 

 

 

 

 

 

 

1 ACCEPTED SOLUTION

Accepted Solutions
Five Stars

Re: Shared DB Connection in JobLets

For those who else use such an design I've got an solution (thanks to talend support):

 

  1. Delete in dhe Joblet the "tOracleConnection" (the tOracleInput will start to show erros).
  2. In the tOracleInput jump in the "Dynamic settings" section
  3. Add a new entry: Name: Component List and Value tOracleConnection_1 (Equals the name of your connection created in the parent job).

That's it. Even the tOracleInput is still decorated with an error you the Job does it as it should. I could confirm it with the trace.

 

Thanks

7 REPLIES
Five Stars

Re: Shared DB Connection in JobLets

Include a connection component in the joblet,  just as you have done.

In both the connection component in the parent, and in the joblet, check the "register a shared db connection" box

In the parent  job, and assign a value , like "CONN1" , "CONN2", etc, for each shared name.

In the parent job, create a context variable context.SharedConnectionName. 

In the joblet, use context.SharedConnectionName for the shared name.  You do not have to setup a context in the joblet, as it inherits contexts automatically from the job that calls it.

Before the joblet is called, set context.SharedConnectionName to the appropriate value in the parent job.

When the joblet runs, ir will be "synched up" with the parent to use the context.SharedConnectionName shared connection.

Five Stars

Re: Shared DB Connection in JobLets

Hi talend_guru,

 

I tried out your suggested recipe. Before I start some screenshots:

 

jira_joblet.PNG

 

job.PNG

 

The tOracleConnection_1 in the joblet uses the "context.shared_connection" value. The tOracleInput1 references the tOracleConnection_1.

 

The logs bring some interessting entries:

 

The tOracleConnection item in the Joblet will be initiated to late (after the call of the tInputOracle) component.

That seems to be the problem: the order in which the comonents will be initiated. 

 

Starting job job_connection at 16:44 23/10/2017.

[INFO ]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - TalendJob: 'job_connection' - Start.
[statistics] connecting to socket on port 3926
[statistics] connected
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Start to work.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Parameters:CONNECTION_TYPE = ORACLE_SERVICE_NAME | DB_VERSION = ORACLE_12 | USE_TNS_FILE = false | HOST = context.db_src_server | PORT = context.db_src_port | DBNAME = context.db_src_service_name | SCHEMA_DB = context.db_src_schema | USER = context.db_src_login | PASS = 2491... | PROPERTIES = context.db_src_additional_params | USE_SHARED_CONNECTION = true | SHARED_CONNECTION_NAME = context.shared_connection | AUTO_COMMIT = false | 
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - SharedDBConnection, current shared connections list is:
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - SharedDBConnection, can't find the key:shared_connection so create a new one and share it.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Driver ClassName: oracle.jdbc.OracleDriver.
[INFO ]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - SharedDBConnection, Connection attempt to 'jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=avscrepo.veolink.net)(port=1521))(connect_data=(service_name=avscrepo)))' with the username 'REPO_BI'.
[INFO ]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - SharedDBConnection, Connection to 'jdbc:oracle:thin:@(description=(address=(protocol=tcp)(host=avscrepo.veolink.net)(port=1521))(connect_data=(service_name=avscrepo)))' has succeeded.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Shared Connection with key 'shared_connection'
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Connection is set auto commit to 'false'.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tOracleConnection_1 - Done.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Start to work.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Parameters:DEFAULT_MAP = true | 
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - row1 - |||||||
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.recid, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.firma, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.key_1, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.key_2, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.key_3, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.key_4, value=.
[TRACE]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Set global var, key=row1.key_5, value=.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tFlowToIterate_1 - Current iteration is: 1.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tLogRow_1 - Start to work.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - tLogRow_1 - Parameters:BASIC_MODE = true | TABLE_PRINT = false | VERTICAL = false | FIELDSEPARATOR = "|" | PRINT_HEADER = false | PRINT_UNIQUE_NAME = false | PRINT_COLNAMES = false | USE_FIXED_LENGTH = false | PRINT_CONTENT_WITH_LOG4J = true | 
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleInput_1 - Start to work.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleInput_1 - Parameters:USE_EXISTING_CONNECTION = true | CONNECTION = joblet_connection_1_tOracleConnection_1 | TABLE = "SKP_FIRMA" | QUERYSTORE = "" | QUERY = "SELECT a.recid    ,    a.created_date    ,    a.created_user    ,    a.changed_date    ,    a.changed_user    ,    a.origin_system    ,    a.origin_system_id    ,    a.firma    ,    a.firmaname    ,    a.art    ,    a.instanz    ,    a.ofb_firma    ,    a.ofb_filiale    ,    a.ofb_firma2    ,    a.ofb_filiale2    ,    a.zugriff    ,    a.instanz_exp    ,    a.acs_kz    ,    a.region    ,    a.konzern    ,    a.sc    ,    a.edi_kz    ,    a.firma_druck_kz    ,    a.telefon    ,    a.zentral_vertrieb    ,    a.werk    ,    a.sap_bk    ,    a.ame_export    ,    a.nsuite_mandant    ,    a.z_bef_nr    ,    a.nsuite_ext_id    ,    a.waa_job_nr    ,    a.waa_abgleich_aktiv    ,    a.waa_intervall    ,    a.waa_db_link    ,    a.waa_bereinigung_aktiv    ,    a.eanv    ,    a.zoll    ,    a.frachtzahler    ,    a.fra_artikel    ,    a.pro_artikel    ,    a.sonst_artikel    ,    a.mit_ek_bestaetigung    ,    a.crm_export    ,    a.esn    ,    a.artikel_stamm_neu    ,    a.versandberechnung    ,    a.ktr    ,    a.fir_bukkey    ,    a.fir_adrkey    ,    a.portal    ,    a.telematik    ,    a.portal_start_date    ,    a.autom_sub_zuordnung    ,    a.telematik_vorlauf    ,    a.telematik_modus    ,    a.waa_stammdaten_aktiv    ,    a.waa_stammdaten_zeitpunkt    ,    a.waa_hostname    ,    a.waa_modulowert  FROM skp_firma a   where a.recid = nvl('" + ((String)globalMap.get("param.recid")) + "', a.recid)     AND  a.firma = nvl('" +  ((String)globalMap.get("param.firma")) + "', a.firma)" | IS_CONVERT_XMLTYPE = false | USE_CURSOR = false | TRIM_ALL_COLUMN = false | TRIM_COLUMN = [{TRIM=false, SCHEMA_COLUMN=RECID}, {TRIM=false, SCHEMA_COLUMN=CREATED_DATE}, {TRIM=false, SCHEMA_COLUMN=CREATED_USER}, {TRIM=false, SCHEMA_COLUMN=CHANGED_DATE}, {TRIM=false, SCHEMA_COLUMN=CHANGED_USER}, {TRIM=false, SCHEMA_COLUMN=ORIGIN_SYSTEM}, {TRIM=false, SCHEMA_COLUMN=ORIGIN_SYSTEM_ID}, {TRIM=false, SCHEMA_COLUMN=FIRMA}, {TRIM=false, SCHEMA_COLUMN=FIRMANAME}, {TRIM=false, SCHEMA_COLUMN=ART}, {TRIM=false, SCHEMA_COLUMN=INSTANZ}, {TRIM=false, SCHEMA_COLUMN=OFB_FIRMA}, {TRIM=false, SCHEMA_COLUMN=OFB_FILIALE}, {TRIM=false, SCHEMA_COLUMN=OFB_FIRMA2}, {TRIM=false, SCHEMA_COLUMN=OFB_FILIALE2}, {TRIM=false, SCHEMA_COLUMN=ZUGRIFF}, {TRIM=false, SCHEMA_COLUMN=INSTANZ_EXP}, {TRIM=false, SCHEMA_COLUMN=ACS_KZ}, {TRIM=false, SCHEMA_COLUMN=REGION}, {TRIM=false, SCHEMA_COLUMN=KONZERN}, {TRIM=false, SCHEMA_COLUMN=SC}, {TRIM=false, SCHEMA_COLUMN=EDI_KZ}, {TRIM=false, SCHEMA_COLUMN=FIRMA_DRUCK_KZ}, {TRIM=false, SCHEMA_COLUMN=TELEFON}, {TRIM=false, SCHEMA_COLUMN=ZENTRAL_VERTRIEB}, {TRIM=false, SCHEMA_COLUMN=WERK}, {TRIM=false, SCHEMA_COLUMN=SAP_BK}, {TRIM=false, SCHEMA_COLUMN=AME_EXPORT}, {TRIM=false, SCHEMA_COLUMN=NSUITE_MANDANT}, {TRIM=false, SCHEMA_COLUMN=Z_BEF_NR}, {TRIM=false, SCHEMA_COLUMN=NSUITE_EXT_ID}, {TRIM=false, SCHEMA_COLUMN=WAA_JOB_NR}, {TRIM=false, SCHEMA_COLUMN=WAA_ABGLEICH_AKTIV}, {TRIM=false, SCHEMA_COLUMN=WAA_INTERVALL}, {TRIM=false, SCHEMA_COLUMN=WAA_DB_LINK}, {TRIM=false, SCHEMA_COLUMN=WAA_BEREINIGUNG_AKTIV}, {TRIM=false, SCHEMA_COLUMN=EANV}, {TRIM=false, SCHEMA_COLUMN=ZOLL}, {TRIM=false, SCHEMA_COLUMN=FRACHTZAHLER}, {TRIM=false, SCHEMA_COLUMN=FRA_ARTIKEL}, {TRIM=false, SCHEMA_COLUMN=PRO_ARTIKEL}, {TRIM=false, SCHEMA_COLUMN=SONST_ARTIKEL}, {TRIM=false, SCHEMA_COLUMN=MIT_EK_BESTAETIGUNG}, {TRIM=false, SCHEMA_COLUMN=CRM_EXPORT}, {TRIM=false, SCHEMA_COLUMN=ESN}, {TRIM=false, SCHEMA_COLUMN=ARTIKEL_STAMM_NEU}, {TRIM=false, SCHEMA_COLUMN=VERSANDBERECHNUNG}, {TRIM=false, SCHEMA_COLUMN=KTR}, {TRIM=false, SCHEMA_COLUMN=FIR_BUKKEY}, {TRIM=false, SCHEMA_COLUMN=FIR_ADRKEY}, {TRIM=false, SCHEMA_COLUMN=PORTAL}, {TRIM=false, SCHEMA_COLUMN=TELEMATIK}, {TRIM=false, SCHEMA_COLUMN=PORTAL_START_DATE}, {TRIM=false, SCHEMA_COLUMN=AUTOM_SUB_ZUORDNUNG}, {TRIM=false, SCHEMA_COLUMN=TELEMATIK_VORLAUF}, {TRIM=false, SCHEMA_COLUMN=TELEMATIK_MODUS}, {TRIM=false, SCHEMA_COLUMN=WAA_STAMMDATEN_AKTIV}, {TRIM=false, SCHEMA_COLUMN=WAA_STAMMDATEN_ZEITPUNKT}, {TRIM=false, SCHEMA_COLUMN=WAA_HOSTNAME}, {TRIM=false, SCHEMA_COLUMN=WAA_MODULOWERT}] | NO_NULL_VALUES = false | 
Exception in component joblet_connection_1_tOracleInput_1 (job_connection)
java.lang.NullPointerException
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.tFixedFlowInput_1Process(job_connection.java:2943)
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.runJobInTOS(job_connection.java:5345)
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.main(job_connection.java:4993)
[FATAL]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleInput_1 null
java.lang.NullPointerException
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.tFixedFlowInput_1Process(job_connection.java:2943)
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.runJobInTOS(job_connection.java:5345)
	at de_veolia_dwh_etl_esb.job_connection_0_1.job_connection.main(job_connection.java:4993)
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - Start to work.[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - Parameters:CONNECTION_TYPE = ORACLE_SERVICE_NAME | DB_VERSION = ORACLE_12 | USE_TNS_FILE = false | HOST = context.db_src_server | PORT = context.db_src_port | DBNAME = context.db_src_service_name | SCHEMA_DB = context.db_src_schema | USER = context.db_src_login | PASS = 2491... | PROPERTIES = context.db_src_additional_params | USE_SHARED_CONNECTION = true | SHARED_CONNECTION_NAME = context.shared_connection | AUTO_COMMIT = false | 
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - SharedDBConnection, current shared connections list is: shared_connection
[INFO ]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - SharedDBConnection, find the key: shared_connection it is OK.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - Connection is set auto commit to 'false'.
[DEBUG]: de_veolia_dwh_etl_esb.job_connection_0_1.job_connection - joblet_connection_1_tOracleConnection_1 - Done.
[statistics] disconnected
Job job_connection ended at 16:44 23/10/2017. [exit code=1]

But how can I configure the joblet to initate the tOracleConnection just before it uses the tInputOracle component? 

 

Thanks

Five Stars

Re: Shared DB Connection in JobLets

You don't need tOracleConnection again. You can directly get the shared connection in tOracleInput.

Five Stars

Re: Shared DB Connection in JobLets

Hi talend_guru,

don't think that I can do it. The tOracleInput (with Use an existing connection checked) demands a selected Connection component in the Component List. I highlighted the mentioned property in the screenshot.

 

joblet_db_connect.PNG

 

Any ohter suggestions Smiley Wink?

Five Stars

Re: Shared DB Connection in JobLets

Use OnSubjobok on tOracleConnection component and connect to tFlowtoIterate component. That should take care of it.

Five Stars

Re: Shared DB Connection in JobLets

You are neither able to connect the tOracleConnection to the tFlowToIterate nor any other components in the joblet. Sorry for the lag.

 

Any other suggestions?

 

Five Stars

Re: Shared DB Connection in JobLets

For those who else use such an design I've got an solution (thanks to talend support):

 

  1. Delete in dhe Joblet the "tOracleConnection" (the tOracleInput will start to show erros).
  2. In the tOracleInput jump in the "Dynamic settings" section
  3. Add a new entry: Name: Component List and Value tOracleConnection_1 (Equals the name of your connection created in the parent job).

That's it. Even the tOracleInput is still decorated with an error you the Job does it as it should. I could confirm it with the trace.

 

Thanks