One Star

Split Kafka consumer flow into multiple files

I need to be able to periodically write data from a tKafkaInput component to a file, but since there is no end to a Kafka stream, I am unsure how to proceed.  I have some flexibility in how to set this up: I could split based on time, number of rows, or size of transfer.
Thanks in advance. 
3 REPLIES
Employee

Re: Split Kafka consumer flow into multiple files

Hello!  I'm assuming that you are using the tKafkaInput component in DI mode in Talend Studio 6.0.1 (as opposed to Spark Streaming).
You are correct that there is no end to a Kafka stream.  For 6.0.1, the only stopping condition that we support in DI is "max time between messages" (also known as consumer timeout).  This is found in the Advanced Settings and is "-1" by default, meaning that the component streams forever.  By setting it to "1000", the component will stop when 1 second passes without any messages received.  This is somewhat useful, but still pretty limited.
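To make that stopping condition concrete, here is a rough sketch of the same behavior expressed against the plain Kafka consumer API (the broker address, group id, and topic name are placeholders, and I'm using the newer KafkaConsumer API for illustration, not what the component runs internally):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerTimeoutDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder broker
        props.put("group.id", "demo-group");                // placeholder consumer group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        long timeoutMs = 1000;  // like setting the Advanced Setting to "1000"
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            long lastMessageAt = System.currentTimeMillis();
            // Stop once a full second passes without receiving any messages.
            while (System.currentTimeMillis() - lastMessageAt < timeoutMs) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (!records.isEmpty()) {
                    lastMessageAt = System.currentTimeMillis();
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(record.value());
                    }
                }
            }
        }
    }
}
```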
The starting offset logic in 6.0.1 is a bit complicated.  The component takes a consumer group name, but if you specify "from beginning", the committed offsets for that group are reset and you will always process from the first messages in the topic.  If you specify "from latest", you will either start from the last committed offsets (if the consumer group exists) or skip existing messages and only process newly arrived messages (if the consumer group does not exist). There is no way for a single job to start from the beginning for a new consumer group, then use the latest committed offset on subsequent runs. 
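In plain Kafka consumer terms, the 6.0.1 behavior described above corresponds roughly to this sketch (again with placeholder names, not the component's actual internals):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StartingOffsetDemo {
    static KafkaConsumer<String, String> connect(boolean fromBeginning) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "demo-group");               // placeholder group
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // "from latest" with a brand-new group: skip existing messages.
        props.put("auto.offset.reset", "latest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
        consumer.poll(Duration.ZERO);  // trigger partition assignment (simplified for the sketch)
        if (fromBeginning) {
            // "from beginning": discard committed offsets and rewind to the first messages.
            consumer.seekToBeginning(consumer.assignment());
        }
        // Otherwise "from latest": resume from committed offsets if the group
        // already exists, or from the end of the topic if it does not.
        return consumer;
    }
}
```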
If you want to periodically write data from a tKafkaInput to a file in 6.0.1, are you going to run a job periodically?  I suggest using one "bootstrap" job to set the consumer group offsets where you want them (either *from beginning* or *from latest*), and then periodically running another job with the same consumer group, *from latest*, and a very small consumer timeout (1 ms) to grab the messages received since the last offset commit.
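As a rough illustration, that periodic "drain" job looks something like the following in plain Kafka consumer code (placeholder names; offsets are committed manually so nothing is skipped if the file write fails):

```java
import java.io.FileWriter;
import java.io.IOException;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PeriodicDrainJob {
    public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder broker
        props.put("group.id", "drain-group");              // same group as the bootstrap job
        props.put("enable.auto.commit", "false");          // commit only after a successful write
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             FileWriter out = new FileWriter("batch-" + System.currentTimeMillis() + ".txt")) {
            consumer.subscribe(Collections.singletonList("demo-topic")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) {
                    break;  // nothing new since the last commit: stop, like a tiny consumer timeout
                }
                for (ConsumerRecord<String, String> record : records) {
                    out.write(record.value());
                    out.write('\n');
                }
            }
            out.flush();
            consumer.commitSync();  // the next run resumes from here
        }
    }
}
```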
The good news is that in the upcoming 6.1.0, we've largely fixed the limitations of tKafkaInput in DI (see https://jira.talendforge.org/browse/TBD-2639).  The starting offset logic has been refined to support more scenarios and we've added more stopping conditions for jobs, including max time from the component start, max time between messages and max number of messages.  The "consumer offset" in 6.1.0 is no longer directly set, but calculated based on the stopping condition(s).
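Conceptually, the new stopping conditions combine like this rough sketch (the thresholds are illustrative, not the component's actual defaults):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class StoppingConditions {
    // Reads until ANY of the three limits is hit; thresholds are illustrative.
    static void consumeWithLimits(KafkaConsumer<String, String> consumer,
                                  long maxTotalMs, long maxIdleMs, long maxMessages) {
        long start = System.currentTimeMillis();
        long lastMessageAt = start;
        long count = 0;
        while (true) {
            long now = System.currentTimeMillis();
            if (now - start >= maxTotalMs) break;        // max time from component start
            if (now - lastMessageAt >= maxIdleMs) break; // max time between messages
            if (count >= maxMessages) break;             // max number of messages
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            if (!records.isEmpty()) {
                lastMessageAt = System.currentTimeMillis();
                count += records.count();
                // ... hand each record to the output flow here ...
            }
        }
    }
}
```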
I hope this helps, Ryan
One Star

Re: Split Kafka consumer flow into multiple files

I was using the tKafkaInput.
Thanks for the feedback, extremely helpful.  The scenario I am trying to address is limiting the number (and size) of the files written out to HDFS as an endpoint.  Ideally I'd have two conditions, either of which could trigger a write: size of file and/or time elapsed since the last write (for the scenario where writes have slowed down significantly).
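Something like this dual-trigger roller around the HDFS client API is what I have in mind (just a sketch; the paths and thresholds are placeholders):

```java
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RollingHdfsWriter {
    private final FileSystem fs;
    private final long maxBytes;
    private final long maxAgeMs;
    private FSDataOutputStream out;
    private long openedAt;

    RollingHdfsWriter(long maxBytes, long maxAgeMs) throws IOException {
        this.fs = FileSystem.get(new Configuration());
        this.maxBytes = maxBytes;
        this.maxAgeMs = maxAgeMs;
        roll();
    }

    void write(byte[] record) throws IOException {
        // Roll on size OR elapsed time, whichever comes first.
        if (out.getPos() + record.length > maxBytes
                || System.currentTimeMillis() - openedAt > maxAgeMs) {
            roll();
        }
        out.write(record);
        out.write('\n');
    }

    private void roll() throws IOException {
        if (out != null) {
            out.close();  // closing flushes the data so readers can see it
        }
        // Placeholder path: one part file per roll.
        out = fs.create(new Path("/streams/part-" + System.currentTimeMillis() + ".txt"));
        openedAt = System.currentTimeMillis();
    }
}
```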
I'll take a look at 6.1.0.
Thanks.
One Star

Re: Split Kafka consumer flow into multiple files

I need to periodically write data from a tKafkaInput component to an HDFS file.  I'm connecting tKafkaInput to tHDFSOutput, but I am unable to write any data into the HDFS file.  When I run the job, I can see the file's modification date change to the current date, but the HDFS file remains empty.
When I try the same flow with tLogRow instead of tHDFSOutput, I can see the streaming data in the console.  Please help.