One Star

How to re-run tKafkaInput component in a job w/o restarting the job?

Hi,
Scenario: 
                                                                            replicate --> Flow1
LoadingConfigFile --> tKafkaConnection --> tKafkaInput --> globalsetvar --> replicate --> Flow2
                                                                            replicate --> Flow3
Problem: Once tKafkaInput has read all available messages and Flow1, Flow2 and Flow3 have processed them, I would like to read new messages with tKafkaInput again without restarting the job and without establishing a new connection to Kafka. So I want some control that goes back to tKafkaInput after Flow1, Flow2 and Flow3 complete. I would like to set the tKafkaInput property "Stop after a maximum total duration (ms)" to 5000. This ensures that tKafkaInput reads messages from the Kafka topic for 5 seconds, then stops so that those messages can be processed in the different flows. It should then start again and read messages for the next 5 seconds without establishing a new connection (as I will be using a shared Kafka connection).
Please suggest how I can read new messages again and again using tKafkaInput without rerunning the job.
Thanks for your help and time!
Rera
5 REPLIES
Community Manager

Re: How to re-run tKafkaInput component in a job w/o restarting the job?

Hi  
You can use a tInfiniteLoop to rerun tKafkaInput repeatedly until it reaches the maximum total duration. The job looks like this:
LoadingConfigFile
    |
onsubjobok
    |
tKafkaConnection
    |
onsubjobok
    |
tInfiniteLoop--iterate--tKafkaInput--.....flow....
                                      -runIf--tDie
On tInfiniteLoop, set the wait time at each iteration; it is 1000 ms by default.
Set the condition of runIf as:
((Integer)globalMap.get("tInfiniteLoop_1_CURRENT_ITERATION"))==5
//In this case, the job reads new messages every 1000 ms and stops once it reaches the maximum duration of 5000 ms.
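The runIf condition above unboxes the value returned by globalMap.get, so it would throw a NullPointerException if the key has not been set yet. As a minimal sketch, assuming globalMap behaves like a plain java.util.Map<String, Object>, a null-safe form of the same check could look like the following. The class and method names are hypothetical, and the sketch uses >= rather than == so that a skipped check cannot miss the limit.

```java
import java.util.HashMap;
import java.util.Map;

public class RunIfSketch {
    // Hypothetical helper, not part of Talend: returns true once the loop
    // counter stored under "<loopName>_CURRENT_ITERATION" reaches the limit.
    // A missing or non-Integer value yields false instead of an exception.
    static boolean reachedIteration(Map<String, Object> globalMap, String loopName, int limit) {
        Object current = globalMap.get(loopName + "_CURRENT_ITERATION");
        return current instanceof Integer && ((Integer) current) >= limit;
    }

    public static void main(String[] args) {
        // Stand-in for the job's globalMap (assumption: a Map<String, Object>).
        Map<String, Object> globalMap = new HashMap<>();

        // Key not set yet: the check is simply false.
        System.out.println(reachedIteration(globalMap, "tInfiniteLoop_1", 5)); // false

        globalMap.put("tInfiniteLoop_1_CURRENT_ITERATION", 5);
        System.out.println(reachedIteration(globalMap, "tInfiniteLoop_1", 5)); // true
    }
}
```

Inside a runIf condition the same logic would have to be written as a single expression, since the helper method exists only in this sketch.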
----------------------------------------------------------
Talend | Data Agility for Modern Business
One Star

Re: How to re-run tKafkaInput component in a job w/o restarting the job?

Thank you, Chong, for replying. The suggested approach looks right for my scenario, so I am going to try to implement it.
Thanks again for your help and time!
Community Manager

Re: How to re-run tKafkaInput component in a job w/o restarting the job?

Please give us your feedback after your testing.
One Star

Re: How to re-run tKafkaInput component in a job w/o restarting the job?

Hi Chong,
I am not able to connect the tInfiniteLoop iterate link to the tKafkaInput component. Am I missing anything?
Thanks,
Rera
Community Manager

Re: How to re-run tKafkaInput component in a job w/o restarting the job?

Hi Rera 
There was a mistake in my previous job design: the tKafkaInput component cannot be linked with an iterate connector. You can move tKafkaInput into a child job and call it with tRunJob, but done this way, the child job will create a new connection, because tKafkaConnection doesn't support shared connections the way general DB connection components such as tMysqlConnection do.
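For comparison, the behaviour asked for in the original question (read for a fixed window, process the batch, then read again on the same connection) can be sketched in plain Java outside Talend. This is only an illustration under stated assumptions: the poll supplier stands in for an already-open Kafka consumer, the clock is injected so the example is deterministic, and every name in it is hypothetical rather than a Talend or Kafka API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

public class WindowedReader {
    // Keeps polling the already-open source until windowMs has elapsed
    // on the supplied clock, and returns everything read in that window.
    static List<String> readWindow(Supplier<List<String>> poll, LongSupplier clockMs, long windowMs) {
        List<String> batch = new ArrayList<>();
        long deadline = clockMs.getAsLong() + windowMs;
        while (clockMs.getAsLong() < deadline) {
            batch.addAll(poll.get());
        }
        return batch;
    }

    // Repeats "read for one window, then hand the batch to the flows"
    // for a fixed number of cycles, reusing the same poll source each time.
    static int runCycles(Supplier<List<String>> poll, LongSupplier clockMs, long windowMs,
                         int cycles, Consumer<List<String>> flows) {
        int total = 0;
        for (int i = 0; i < cycles; i++) {
            List<String> batch = readWindow(poll, clockMs, windowMs);
            flows.accept(batch); // stands in for Flow1, Flow2 and Flow3
            total += batch.size();
        }
        return total;
    }

    public static void main(String[] args) {
        long[] now = {0}; // fake clock in milliseconds
        int[] seq = {0};  // message counter
        // Fake source: each poll "takes" 1000 ms and yields one message.
        Supplier<List<String>> fakePoll = () -> {
            now[0] += 1000;
            return Collections.singletonList("msg-" + seq[0]++);
        };
        int total = runCycles(fakePoll, () -> now[0], 5000, 2, batch -> System.out.println(batch));
        System.out.println(total); // 10: two windows of five messages each
    }
}
```

With a real consumer, the poll call would block for a short timeout instead of advancing a fake clock, and the loop would run until the job is stopped rather than for a fixed number of cycles.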