Five Stars

Iterate all X rows and not for each row

hi,

 

I use Talend for data extraction and insertion into OTSDB, but I need to split up my file, and a classic row-by-row iteration takes too much time (40 rows/s, and I have 90 million rows).

Do you know how to send rows in groups of 50, for example, instead of each row individually?

 

Best regards,

 

terreprime.

  • Big Data
  • Data Integration
16 REPLIES
Two Stars

Re: Iterate by group of 50 rows

 

Hi,
I have a similar problem: I want to process a big CSV in batches of x rows.
I tried cutting my file into several sub-files, but that isn't feasible for the biggest files.
Thanks.

 

Nine Stars TRF

Re: Iterate all X rows and not for each row

Hi,
Is your OpenTSDB installation tuned and the server dimensioned to deliver the expected throughput? Did you validate this point from outside of Talend? What if you replace the access to OpenTSDB with something else, let's say an output CSV file?
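For example, a quick standalone probe (just a sketch, assuming OpenTSDB's HTTP API listens on localhost:4242, with placeholder metric and tag values) can measure the raw insert rate outside of Talend:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class OtsdbProbe {
    public static void main(String[] args) throws Exception {
        int rows = 1000;
        long start = System.currentTimeMillis();
        for (int i = 0; i < rows; i++) {
            //One datapoint per request, mimicking the row-by-row job
            String json = "[{\"metric\":\"test_metric\",\"value\":\"299\","
                + "\"timestamp\":" + (1493805152L + i)
                + ",\"tags\":{\"Spec\":\"test\"}}]";
            HttpURLConnection con = (HttpURLConnection)
                new URL("http://localhost:4242/api/put").openConnection();
            con.setRequestMethod("POST");
            con.setRequestProperty("Content-Type", "application/json");
            con.setDoOutput(true);
            try (OutputStream os = con.getOutputStream()) {
                os.write(json.getBytes(StandardCharsets.UTF_8));
            }
            con.getResponseCode(); //force the call; ignore the body
            con.disconnect();
        }
        long elapsed = Math.max(1, System.currentTimeMillis() - start);
        System.out.println(rows + " rows in " + elapsed + " ms ("
            + (rows * 1000L / elapsed) + " rows/s)");
    }
}

If this probe is also slow, the bottleneck is OpenTSDB or the network, not the Talend job.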

TRF
Five Stars

Re: Iterate all X rows and not for each row

Hi,

 

Thanks for your answer.

 

Normally, yes, I think.

The main problem is the per-row iteration in Talend.

Maybe it's the way my file is processed, because I need to create a JSON file to send data to OTSDB and I have a lot of components.

 

I can post a screenshot of my job and explain it if you want.

 

I'm a beginner with Talend and databases, so I don't know all the possible methods in Talend.

 

Terreprime

Nine Stars TRF

Re: Iterate all X rows and not for each row

Usually Talend processes files very fast. Try to isolate the component you suspect is slow.
Maybe share a picture of the job.

TRF
Five Stars

Re: Iterate all X rows and not for each row

Without the tHttpRequest, Talend executes 240 rows/s, so maybe it's OTSDB...

 

This is my job:

Job_Talend.PNG

The first sub-job creates the JSON for OTSDB, and in the second I read the JSON back into the flow and modify it, because tFileOutputJSON replaces some characters, for example " by \".

Nine Stars TRF

Re: Iterate all X rows and not for each row

If I understand correctly, the 2nd subjob is the part with low throughput.

You can expect thousands of rows per second when just reading a delimited file and writing the content to another file, so you are far away from a "standard" result.

Can you share the tJavaFlex_3 code? It may explain this result, depending on what happens in this component.

Also, explain why you produce a JSON file but then start the following subjob with a delimited file.

Is it the same file you used for the 1st subjob?

In that case you could avoid reading the file again by adding a tReplicate after tFileInputDelimited_1.

 

For the initial question concerning the low performance of OTSDB, can you give more details?

How do you proceed to push data to OTSDB?

What is the component you use for that?

 


TRF
Five Stars

Re: Iterate all X rows and not for each row

Hi,

 

To send data to OTSDB, I use the tHttpRequest component with the POST method, and the file for the POST body contains:

[
{"metric":"test_3","value":"299","timestamp":1493805152,"tags":{"Spec":"Matthieu"}}
]

I need to create a file with exactly this format, but at the end of my first sub-job the file created is:

[
{"metric":"test_3","value":"299","timestamp":1493805152,"tags":"{\"Spec\":\"Matthieu\"}"}
]

The tags field contains some wrong characters (the \ escapes, and the " at the beginning and end of the tags value).

So I use tFileInputDelimited_3 to read the JSON back into the flow and tJavaFlex_3 to replace those characters.

tJavaFlex_3:

//Unescape the quotes that tFileOutputJSON added
row5.data = row5.data.replace("\\\"", "\"");
//Strip the quotes wrapping the tags object
row5.data = row5.data.replace("\"{", "{");
row5.data = row5.data.replace("}\"", "}");
//Pass the cleaned value to the output row
row6.data = row5.data;

Then I create a text file with the correct format and tHttpRequest_1 picks it up.

Ten Stars

Re: Iterate all X rows and not for each row

I may not understand the full problem here, but I have some concerns over the job structure.

 

You are iterating over a file and producing a new JSON file for each row of the file. Why is this? Is this just to produce some JSON? If so, creating a new file for the JSON you have demonstrated is incredibly inefficient. Why not create the JSON (if it is as simple as shown) as a String in a flow rather than iterating?


From what I am seeing your job could look like below....

tFileInputDelimited --> tJavaFlex (to produce JSON String)---> tFlowToIterate -->tRest (to send the JSON)

The tFileInputDelimited reads the data and sends it to the tJavaFlex. You carry out the JSON creation there. The tFlowToIterate will cause the tRest to fire for each row. The tRest's HTTP Body would be set to the value of the JSON string. Let's say the JSON String is set to column "myJSON" and the row connecting the tJavaFlex to the tFlowToIterate is called "row1"; then you would put the following in the tRest's HTTP Body.....

((String)globalMap.get("row1.myJSON"))

I see you are having an issue with the JSON format. Given what you have shown, it could be created as follows....

String myJSON = "[ {\"metric\":\"test_3\",\"value\":\"299\",\"timestamp\":1493805152,\"tags\":{\"Spec\":\"Matthieu\"}}]";


Of course, you won't want to hard code the values, so if the values are coming in columns from your input file (as I assume), your code might look like this....

 

String myJSON = "[ {\"metric\":\""+row.column1+""\",\"value\":\""+row.column2+"\",\"timestamp\":"+row.column3+",\"tags\":{\""+row.column4+"\":\""+row.column5+"\"}}]";

This is code I have put together without testing, so there may be some bugs, but essentially you just need to be aware of quotes ("). If your String needs quotes, you need to escape them using a backslash (\). So if you want the following String exactly....

Hello "John" how are you?

You would create it.....

String myString = "Hello \"John\" how are you?";


I may have missed something a bit more complex (I can't see inside your component config) but I think the file creation and reading is probably where your code is slowing down. Calling the service is also going to slow things down a little. Is there any way to batch up your requests?

Rilhia Solutions
Five Stars

Re: Iterate all X rows and not for each row

Hi,

 

The problem is just sending data fast in OTSDB.

 

I tried your solution, but the tRest doesn't send data to OTSDB. I think it's a wrong parameter; here are my job and the tRest:

job_Talend_Opti.PNGtRest.PNG

Talend tells me there are no errors, but OTSDB did not receive the data.

 

I will try to solve the problem tomorrow, but if you have an idea, I'm all ears.

Ten Stars

Re: Iterate all X rows and not for each row

Can you show the JSON that was created using a System.out.println call?
Are you sure that the Rest service method should be PUT? I would have opted for POST if I wasn't given a direction here.
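For example, a quick line in the tJavaFlex Main Code (assuming the output row and column are called row2 and myJSON) would show it in the run console:

//Print the JSON payload before it is sent
System.out.println("JSON sent: " + row2.myJSON);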

Rilhia Solutions
Five Stars

Re: Iterate all X rows and not for each row

Hi,

 

This is the JSON:

JSON.PNG

 

Actually, I changed the PUT method to POST and it works!

But the processing is really slow... 20 rows/s.

 

job.PNG

How can I speed up the processing of the file?

 

Terreprime

Ten Stars

Re: Iterate all X rows and not for each row

A web service call is always going to be slow by comparison to loading to an on-premise database. What you need to do is figure out if you can batch up your calls. If you can, you then need to work out how many rows can be sent at a time. There is absolutely no way you will get it going faster than this unless you can send more than one row per web service call or send parallel service calls. I suspect you can send more than one row in each web service call.
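For example, if OpenTSDB's /api/put endpoint accepts a JSON array (which I believe it does), a single call could carry several datapoints at once, along the lines of the sample you showed earlier:

[
{"metric":"test_3","value":"299","timestamp":1493805152,"tags":{"Spec":"Matthieu"}},
{"metric":"test_3","value":"299","timestamp":1493805153,"tags":{"Spec":"Matthieu"}}
]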

Rilhia Solutions
Five Stars

Re: Iterate all X rows and not for each row

Yes, I know, that is the initial subject of this topic. I'd like to send data in groups of 50 rows, for example, but I don't know how to proceed.

Ten Stars

Re: Iterate all X rows and not for each row

OK, I have put together an example you will need to extrapolate from. It is quite simple. The layout for your job will be.....

 

tFileInputDelimited ---> tJavaFlex ---> tFilterRow ---> tFlowToIterate ---> tRest

 

1) You read the file as normal with the tFileInputDelimited. 

2) The magic happens in the tJavaFlex. The code below shows what I did with my example. You will need to extrapolate from this to put in your JSON build (and combine) code....

Start Code

//Used to count the rows
int count = 0;
//Used to concatenate your Strings
String myConcatenatedVal = "";

Main Code

//Add 1 for each incoming row
count++;

//Concatenate the value (adjust this to concatenate your computed JSON Strings)
myConcatenatedVal = myConcatenatedVal+row1.newColumn;

//A modulus operation to fire on every 50th row. It sets the output "newColumn" to the concatenated value, then resets the myConcatenatedVal and count variables.
if(count%50==0){
	row2.newColumn = myConcatenatedVal;
	myConcatenatedVal = "";
	count=0;
}else{
//The output "newColumn" column is set to null when not the 50th row
	row2.newColumn = null;
}

This code will build up your records and only output a value every 50th record. It will output a null value for every other row. To handle this null value (to filter it out), we use the tFilterRow. Use the Advanced Mode and then set the code to ....

input_row.newColumn!=null

Your tRest will now only run once for every 50 records. 
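One caveat: if your total row count is not an exact multiple of 50, the final partial batch is never flushed. A possible way to handle that (just a sketch, assuming you add a small follow-up subjob to send the leftover) is to stash it in the tJavaFlex End Code:

//End Code: keep any unfinished batch so a later tJava/tRest can send it
if(count > 0 && !"".equals(myConcatenatedVal)){
	globalMap.put("remainderBatch", myConcatenatedVal);
}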

 

I hope that helps

 

Rilhia Solutions
Five Stars

Re: Iterate all X rows and not for each row

Your code is really great!

 

I adapted your code for OTSDB:

 

Start code:

//Used to count the rows
int count = 0;
//Used to concatenate your Strings
String myConcatenatedVal = "[";   

Main code:

 

//Count the incoming row
count++;

//Build the JSON datapoint for the current row
String myJSON = "{\"metric\":\"" + context.metric + "\",\"value\":" + row1.myJSON
	+ ",\"timestamp\":" + context.timestamp
	+ ",\"tags\":{\"" + context.tags1 + "\":\"" + context.tags2 + "\"}}";
myConcatenatedVal += myJSON;
context.timestamp++;

//Emit the batch on every 50th row, otherwise append a separator and output null
if(count%50==0){
	row2.myJSON = myConcatenatedVal + "]";
	myConcatenatedVal = "[";
	count = 0;
}else{
	myConcatenatedVal += ",";
	row2.myJSON = null;
}

Result:

 

Job_2.PNG

 

I am between 850 and 1,000 rows/s; it's much better!

 

Thank you very much !

 

Best regards

 

Terreprime

Ten Stars

Re: Iterate all X rows and not for each row

No problem. You may be able to tweak this to get even better performance by adjusting the number of rows you are merging.
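For example (just a sketch, assuming you define an Integer context variable named batchSize), the modulus test in the tJavaFlex becomes:

//Fire on every batchSize-th row instead of a hard-coded 50
if(count % context.batchSize == 0){
	row2.myJSON = myConcatenatedVal + "]";
	myConcatenatedVal = "[";
	count = 0;
}else{
	myConcatenatedVal += ",";
	row2.myJSON = null;
}

That way you can rerun the job with different batch sizes without editing the code.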

Rilhia Solutions