Six Stars

[CDC]Insert only data that has changed since last run (Tos_DI)

I would like to insert only the new data into my database. I tried incremental loading by comparing my source (a set of files) with my target (a SQL Server table) using an inner join, but since the number of rows already inserted in the database is huge, this solution is not feasible.
So I thought of doing the CDC by date comparison (date of the last run versus the current date).
Unfortunately I don't know how to do it.
Can someone help me, please?

 

  • Data Integration
1 ACCEPTED SOLUTION

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

It could be because tMSSQLInput does not run before tMap.

In your job these two parts are independent of each other ... somewhere in a parallel world.

You can do it like this:

Screen Shot 2017-05-28 at 11.40.22 PM.png

It is just an example, but be careful about the order: first read the value, then use it.

-----------
27 REPLIES
Nine Stars TRF

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Hi,

I think (to be confirmed) that CDC is available only in the Enterprise version, not in the Community version.

However, what do you mean by "huge"? Billions of rows?

These links may help:

https://www.talend.com/blog/2017/01/05/talend-job-design-patterns-best-practices-part-4/

http://bekwam.blogspot.fr/2011/07/complex-joins-out-of-tmap-in-talend.html

 

Also this topic https://community.talend.com/t5/Design-and-Development/tMap-compare-integer-before-UPDATE-in-express...


TRF
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Thanks for your reply.

I have 80,000,000 rows in my table, so when I do an inner join on tMSSqlInput to capture the new rows I run out of memory (it consumes almost 90% of physical memory) and my job crashes!

Nine Stars TRF

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

The solution described here, in the section "tMap Lookups", may help.

It gives you the key to avoid loading all 80,000,000 records at once.


TRF
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

I applied this solution, but it takes a lot of time because it compares each file row against the result of my query in the tMSSqlInput component!

For this reason I want to drop this solution and use a comparison by timestamp instead!

Nine Stars TRF

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Can you share your job?


TRF
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Hi,

Here is an example of my job.
Usually tMySqlInput lets me read the rows of the database to do the lookup, but since there are so many rows (80,000,000) the job crashes. So I need another solution for incremental loading!

 

image.PNG

Nine Stars TRF

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

You should refer to this discussion.

The idea is to split the large table into smaller pieces and then, using a master job, run the child job as many times as necessary, depending on the number of files generated when the table content is split.

For example, if each file contains 10,000,000 records, you need to run it 8 times.


TRF
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

The solution you recommended makes it possible to run the job in threads, but I need a solution other than the inner join and lookup, one that lets me insert only the new rows into my database!

 

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)


INESBK wrote:


So I thought of doing the CDC by date comparison (date of the last run versus the current date).
Unfortunately I don't know how to do it.
Can someone help me, please?

 


In this case, why not read the MAX date from the database and then keep only the new data with a later date?

It could also be an ID or another incrementing property.

If you do not store the timestamp in the database but use it for the lookup, you can store it in an additional table (last_insert), then read it and use it.

If the lookup is more complicated, try INSERT IGNORE or insert/update.

It would be good if you explained your job logic a little more.
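A minimal sketch of the idea above in plain Java, outside Talend: take the newest timestamp already stored in the target, then keep only the incoming rows that are strictly newer. The `Reading` class, its field names, and the `filterNewer` helper are hypothetical; they only illustrate the condition one would put in a tFilter or tMap expression.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** One incoming row; the field names are illustrative assumptions. */
final class Reading {
    final String name;
    final long timestampMillis;
    final double value;

    Reading(String name, long timestampMillis, double value) {
        this.name = name;
        this.timestampMillis = timestampMillis;
        this.value = value;
    }
}

final class HighWaterMark {
    /** Keeps only rows strictly newer than the newest timestamp already loaded. */
    static List<Reading> filterNewer(List<Reading> incoming, long lastLoadedMillis) {
        List<Reading> fresh = new ArrayList<>();
        for (Reading r : incoming) {
            if (r.timestampMillis > lastLoadedMillis) {
                fresh.add(r);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        // In a real job this value would come from SELECT MAX(...) on the target table.
        long lastLoadedMillis = 2000L;
        List<Reading> incoming = Arrays.asList(
                new Reading("s1", 1000L, 1.0),
                new Reading("s1", 2000L, 2.0),
                new Reading("s1", 3000L, 3.0));
        System.out.println(filterNewer(incoming, lastLoadedMillis).size());
    }
}
```

The comparison is strict (`>`), so a row carrying exactly the stored maximum is treated as already loaded. Whether that is right depends on whether several rows can share one timestamp.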

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

In fact this is what I am looking for. In my database I have a timestamp, so I want to compare the max date with the new incoming dates in the files...
Should this comparison go in the query of tMSSqlInput or in the tMap filter?

And should I delete the inner join?

 

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

I do this in 2 steps:

 

step 1:

Input from database, something like this (AWS Redshift):

"SELECT dateadd(day, 1, date(Max(\"whendt\"))) as last_date from "+context.AWS_RS_schema+"."+"tbldatatransactions"
or

"SELECT NVL(max(TransactionID), 0)  as maxid  FROM "+context.AWS_RS_schema+".tbltransactions"

then tFlowToIterate to save this value to a variable

 

then, in the next steps, use the variable for:

- a filter in the SQL queries of Input components

"SELECT 
     something
FROM `tblDataTransactions`
WHERE `When` >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'"  + " AND `When` < "  
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'" 

- a filter in tFilter or tMap, if the source files are CSV or another format
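For readers without the Talend routines at hand, here is a rough plain-Java equivalent of the query expression above. It builds the same half-open window, from the stored last date up to (but not including) today. The `WindowQuery` class and its `build` method are hypothetical names, and the table and column arguments are assumed to be trusted constants, not user input.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

final class WindowQuery {
    // Same shape as the "yyyy-MM-dd 00:00:00" pattern used in the Talend expression.
    private static final DateTimeFormatter DAY_START =
            DateTimeFormatter.ofPattern("yyyy-MM-dd '00:00:00'");

    /** Builds a query selecting rows with lastDate <= column < today. */
    static String build(String table, String column, LocalDate lastDate, LocalDate today) {
        return "SELECT * FROM " + table
                + " WHERE " + column + " >= '" + lastDate.format(DAY_START) + "'"
                + " AND " + column + " < '" + today.format(DAY_START) + "'";
    }

    public static void main(String[] args) {
        // lastDate stands in for the value previously saved by tFlowToIterate.
        LocalDate lastDate = LocalDate.of(2017, 5, 27);
        System.out.println(build("tblDataTransactions", "whendt", lastDate, LocalDate.now()));
    }
}
```

The upper bound excludes today so that a day still receiving data is not loaded half-finished; the next run picks it up in full.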

 

 Screen Shot 2017-05-28 at 2.52.08 AM.png

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

I tried to apply your method.

Here is a screenshot: a.PNG

So in tMSSqlInput_1 I used this query: "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

then tFlowToIterate to save this value to the variable "v_last_date".

After that, in tMSSqlInput_2 I used this query:

"SELECT
Trends2.chrono,Trends2.name,Trends2.value,Trends2.quality
FROM Trends2
WHERE S.TS >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'" + " AND S.TS < "
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'"

S.TS is a timestamp retrieved from tMap_2.

But when I click Guess schema I get this error:

b.PNG

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Of course, Guess schema will not work with a null value, and at this moment the variable is null.

If you want to use Guess schema, temporarily change the query to use a fixed period.

But (!!!) you are doing something completely wrong.

 

First:

"SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

returns an INT,

but then you pass that INT to the date formatter (TalendDate.formatDate).
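To illustrate the type mismatch, here is a small standalone sketch. It uses a plain `HashMap` as a stand-in for Talend's globalMap and a hypothetical `isUsableDate` helper. An integer such as the result of a DATEDIFF query cannot be cast to `java.util.Date`, and a missing key yields null, which is not usable either.

```java
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

final class CastCheck {
    /** Returns true only if the stored value really is a java.util.Date. */
    static boolean isUsableDate(Map<String, Object> globalMap, String key) {
        // instanceof is false for null, so a missing key is rejected too.
        return globalMap.get(key) instanceof Date;
    }

    public static void main(String[] args) {
        Map<String, Object> globalMap = new HashMap<>(); // stand-in for Talend's globalMap
        globalMap.put("v_last_date", 5); // a difference in days, stored as an Integer

        System.out.println(isUsableDate(globalMap, "v_last_date"));
        try {
            Date d = (Date) globalMap.get("v_last_date"); // fails at runtime
            System.out.println(d);
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: the stored value is not a Date");
        }
    }
}
```

Selecting the date itself (for example `MAX(TS)`) instead of a day difference gives a value that can be formatted and compared.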

 

Second:

In my example this variable is used to filter the main input, in your case Lecture_DBF.

In my example I take the MAX from AWS Redshift and use it to filter data from MySQL; you are trying to use it on the same table.

If you want to use it to filter the lookup data, you must change the logic. There is no reason to request the MAX value from the lookup table and then try to use it to filter that same lookup table: it will always return an empty result.

 

"SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

"SELECT
Trends2.chrono,Trends2.name,Trends2.value,Trends2.quality
FROM Trends2
WHERE S.TS >= " + "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", (Date)globalMap.get("v_last_date")) + "'" + " AND S.TS < "
+ "'" + TalendDate.formatDate("yyyy-MM-dd 00:00:00", TalendDate.getCurrentDate()) + "'"


Double-check what you request and what you want to filter.

 

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

For the first point:

When I write this query "SELECT datediff(day, date(Max(Trends2.TS)), date(getdate())) as last_load FROM Trends2"

I get this error:

c.PNG

For this reason I changed my query to: "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

Second:

I can't do the filter in Lecture_DBF because this component is used to execute a Python script that reads the file.dbf.

For this reason I use S.TS from tMap to retrieve the date coming from the file.

 

My job reads .dbf files and inserts them into a SQL Server table.
Every time there are new values to insert into the database (since this is sensor data), so I need to insert only the new rows.
I used the inner join at the beginning, but it doesn't work because of memory problems, so I have to use a comparison with dates.

The idea is to compare date_max in the table (tMSSqlInput) with date_in_file.
If date_in_file > date_in_base I insert the data; otherwise I ignore it.

Excuse me if I misunderstood you...

 

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Let's go step by step.

1) To close the SQL problem, regardless of which form you use:

"SELECT datediff(day, date(Max(Trends2.TS)), date(getdate())) as last_load FROM Trends2" or "SELECT datediff(day, Max(Trends2.TS), getdate()) as last_load FROM Trends2"

 

With this SQL query you are requesting the difference in days, an INTEGER value, like 5 days,

so this cannot help you in later filters!!!

 

 

2) Even if you request just SELECT date(Max(Trends2.TS)), it returns the MAX date already in the database. So what is the reason to use this value for a SELECT from the same table? It will always return an empty result.

3) To resolve the problem properly, we need to understand what you are trying to do:

- Is it a constant flow from sensors? In that case, why worry about the date? New data is always newer.

- Is it a batch process, where you parse logs and want to write only the new entries? In that case, why not sort the files by date and then keep everything newer than the requested MAX(date)?

- If these are sensors, that normally means the date is not the only identifying parameter, but the sensor ID as well?

- We know the size of your table. What about the new data? What size (number of new rows) per iteration?

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Thanks for your reply.

1) The flow from the sensors is not constant. The number of files representing the sensors is constant (about 3000 files), but each file contains a different number of rows.

I agree with you that new data is always newer, but in case of problems or an interruption of the execution (since I will launch the job every night) I need incremental loading to insert only the new rows.

2) Sorry, but I didn't understand your question about the batch process!

3) The ID of the sensor is composed of 4 fields: value of the sensor, quality, name, chrono (number of milliseconds since 1970).

4) The number of rows differs from one file to another (it can range from 2000 to 30 0000).

And since I will run the job every night to insert new sensor values, the table size will increase!

I hope that is a little clearer.

Thanks !

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Look, this problem is not related to Talend or to databases; it is just logic and process organisation.

Example:

You have 3000 files from different sensors.

You can have many files for 1 sensor, or you can have many sensors inside a single file. These are the variants for the logic.

Variant 1 - many files for the same sensor, but inside a single file only 1 sensor is present

In this case you just need to:

1) request the MAX loading time from the database for this sensor

2) process the files related to this sensor 1 by 1, starting from the oldest, and keep only the data where the date is later than the one saved from the database

If time is not distributed by files, you must first merge all files for this sensor and then do the same.

Variant 2 - many sensors in the same file

1) select a file, again oldest first

2) check which sensors are inside: tUniqRow by sensor name

3) request from the database the MAX date for each sensor inside this file

4) filter the data by sensor name and date

In the case of variant 2 you can:

- select all sensors from the file

- run a query like SELECT MAX(date) as last_date, sensor_name FROM table GROUP BY sensor_name. It will return 1 row per sensor. You can save it with tHashOutput

- then use tMap with the main flow from the file and the lookup from tHashInput, with a filter on sensor name and on a date later than the saved one

As you can see, in both variants you filter the input flow, not the database.

In both variants you must process the files sorted by date, or first merge the files and sort them by date, then filter.

I use both of these variants in real life, so I can confirm that it all works as expected, but it must always be adjusted to the real structure.
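As a rough illustration of variant 2 outside Talend, the sketch below keeps one "last loaded" timestamp per sensor in a map (playing the role of the cached lookup) and filters the incoming rows against it. `SensorRow`, `PerSensorFilter`, and `keepNew` are hypothetical names, and treating sensors that are missing from the map as entirely new is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** One row from a file; the field names are illustrative assumptions. */
final class SensorRow {
    final String sensorName;
    final long timestampMillis;

    SensorRow(String sensorName, long timestampMillis) {
        this.sensorName = sensorName;
        this.timestampMillis = timestampMillis;
    }
}

final class PerSensorFilter {
    /**
     * Keeps rows newer than the stored maximum for their sensor.
     * Sensors absent from lastBySensor are assumed to be new, so all their rows pass.
     */
    static List<SensorRow> keepNew(List<SensorRow> rows, Map<String, Long> lastBySensor) {
        List<SensorRow> out = new ArrayList<>();
        for (SensorRow row : rows) {
            Long last = lastBySensor.get(row.sensorName);
            if (last == null || row.timestampMillis > last) {
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Would be filled from a query returning one MAX(date) per sensor.
        Map<String, Long> lastBySensor = new HashMap<>();
        lastBySensor.put("sensorA", 2000L);

        List<SensorRow> rows = Arrays.asList(
                new SensorRow("sensorA", 1500L),
                new SensorRow("sensorA", 2500L),
                new SensorRow("sensorB", 100L));
        System.out.println(keepNew(rows, lastBySensor).size());
    }
}
```

The map holds one entry per sensor rather than one per stored row, which is what keeps memory use small compared with joining against the full table.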

 

 

 

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

For me it is variant 1: each file represents a sensor!

Could you explain the logic of the first proposal a little more?
Does this filter have to be in tMap, connected to a tMSSqlInput that gets max(date) from the table?

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Keep in mind that I do not have your data, your structure, etc., so I am more or less trying to adapt my experience ... but I am doing this blind.

If you want a full answer, always publish all the information.

 

Example:

sensor1_20170528_1300-1400.csv

sensor1_20170528_1400-1500.csv

sensor1_20170528_1500-1600.csv

etc

 

So we know:

- sensor1 is part of the file name

- we have 24 files per day

- the file name includes a date-time pattern

- sensors could be new, not all of them already in the database (if not, we change the logic)

 

We do:

1) tFileList - prepare the list of files, extract the sensor names from the file names or from the first row inside each file, and make the list unique

Iterate over the array of sensor names:

2) tMSSQLRow - SELECT COALESCE(MAX(date), yesterday) as last_date from table WHERE sensor_name = variable_sensor_name, and save it to a variable

3) tFileList with the pattern:

variable_sensor_name+pattern_for_yesterday+.csv

- the pattern could be different, even for the past hour, etc.

We sort the files by time; we can assume that the earliest file name pattern has the earliest creation time (if not, we change the algorithm).

4) tFileInputDelimited -> tFilter (or tMap) where we use last_date as the filter condition -> tMSSQLOutput

Alternative for 4): tMSSQLInput with SELECT sensor_name, date FROM table WHERE sensor_name = variable_sensor_name AND date > yesterday

and use it as the lookup, as in your original job, for INNER JOIN rejects
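The file-handling part of the steps above could be sketched in plain Java as follows. It assumes the naming scheme from the example (`sensor1_20170528_1300-1400.csv`), where the sensor name is everything before the first underscore and the fixed-width date-time part makes alphabetical order equal to chronological order. `SensorFiles`, `sensorOf`, and `groupAndSort` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class SensorFiles {
    /** Assumption: the sensor name is everything before the first underscore. */
    static String sensorOf(String fileName) {
        int idx = fileName.indexOf('_');
        return idx < 0 ? fileName : fileName.substring(0, idx);
    }

    /** Groups file names by sensor and sorts each group oldest-first by name. */
    static Map<String, List<String>> groupAndSort(List<String> fileNames) {
        Map<String, List<String>> bySensor = new TreeMap<>();
        for (String f : fileNames) {
            bySensor.computeIfAbsent(sensorOf(f), k -> new ArrayList<>()).add(f);
        }
        for (List<String> files : bySensor.values()) {
            Collections.sort(files); // valid only while the date-time part is fixed width
        }
        return bySensor;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList(
                "sensor1_20170528_1400-1500.csv",
                "sensor2_20170528_1300-1400.csv",
                "sensor1_20170528_1300-1400.csv");
        for (Map.Entry<String, List<String>> e : groupAndSort(names).entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

Each group can then be processed in order: fetch the last stored date for that sensor once, and filter every file of the group against it.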

 

This is only 1 of many variants:

- You may already have all sensor names in the database

- You may have no time pattern in the file names

- You may have more than 1 message for the same sensor with the same time (for this case it is good to have (sensor_name, date) as a primary or unique key)

- etc

For any combination, we draw a process diagram and look at how we can achieve the result.

 

 

 

 

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

My structure is like this:

I have a folder and subfolder structure where each subfolder contains a file.dbf (the name represents the sensor).

This file contains thousands of rows that represent the values of this sensor at given points in time.

Here is an example of a sensor name: "c:\folder\subfolder\128.dbf"

My requirements force me to change the structure of this name to "folder.subfolder.MES".

So the form of the input file name (in tFileList) is different from the form of the file name stored in the database.

For this reason I used tFileList to browse all the files and pass each file name to tSystem, which reads it and changes the name structure with a Python script. Then I make other transformations with tMap and finally insert into the table.

So, if I understood correctly, we must follow alternative 4.

 

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)


INESBK wrote:

 

So , If I understood correctly, we must follow alternative 4

 


If you ask my opinion: no, you do not need to go exactly this way :-), and you do not need Python to change the name structure either.

But you can go any way you like, of course.

 

 

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

No, I have to use Python to read the structure of the file.dbf and add a column that contains the file name in this form.

[If You ask my opinion - no, You do not need exactly go by this way :-)]

And if I do not use alternative 4, then which solution can solve my problem?

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

 

If you want a full solution, you must provide full information.

There are a lot of working ideas above (from processes that really work),

but I cannot guess how it will work in your case if you do not provide:

- the structure, with a full description: what we have in the files, what we have after the Python script, how the data is organised (sorted, unsorted, etc.)

- the structure of the database: columns, indexes, etc.

- what exceptions are possible

Based on the information already presented, you just need to:

1) request from SQL Server the last time for the selected sensor (taken by Python from the file name)

2) read all files for this sensor and filter the input data using tFilter or tMap on the time column, keeping values later than the one taken from the database

That is all.

It is like Google: the more precise the question, the more relevant the search result.

You can search for "Green" or for "Green Pub London"; the results will be a little different :-)

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Thank you very much for your advice and your time.

I tried this:

fi.PNG f2.PNG

I got this error:

ere.PNG

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

It could be because tMSSQLInput does not run before tMap.

In your job these two parts are independent of each other ... somewhere in a parallel world.

You can do it like this:

Screen Shot 2017-05-28 at 11.40.22 PM.png

It is just an example, but be careful about the order: first read the value, then use it.

-----------
Six Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Ah, finally! Thank you once again, and sorry if I did not explain things well; I am a beginner with Talend.

Thanks :-)

Nine Stars

Re: [CDC]Insert only data that has changed since last run (Tos_DI)

Welcome to the community! :-)

-----------