Windowing question

Sixteen Stars

I played around with Apache Beam before I started with Data Streams, and one thing that confuses me about Apache Beam is windowing. I have a pipeline where I slice and dice some Twitter data. Essentially, I am combining hashtags from multiple Tweets within a Window, then adding them to a dict and counting the number of times each hashtag appears. To do this I am using a couple of Aggregate components, Python components and Filters, and am outputting to S3. The problem is that I need to put a Window component before each Aggregate component and before the Data Target component. Logically, one Window at the beginning should suffice. But if I do not place a Window before each Aggregate, I get the following error:

 

 

[2018-08-07 22:15:18,679] [ERROR] o.t.d.s.FullRunJob$ - DATA_STREAMS_ERROR_MSG:Could not instantiate runtime for component Aggregate (id=5b5ba2b5ca8cf5750972a5d4)
[2018-08-07 22:15:18,680] [ERROR] o.t.d.s.FullRunJob$ - Got original Throwable
org.talend.datastreams.beam.compiler.exception.BeamCompilerException: Could not instantiate runtime for component Aggregate (id=5b5ba2b5ca8cf5750972a5d4)
    at org.talend.datastreams.beam.compiler.BeamCompiler.compileComponent(BeamCompiler.java:103)
    at org.talend.datastreams.beam.compiler.runtimeflow.RuntimeFlowBeamCompiler.lambda$compile$0(RuntimeFlowBeamCompiler.java:82)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.talend.datastreams.beam.compiler.runtimeflow.RuntimeFlowBeamCompiler.compile(RuntimeFlowBeamCompiler.java:82)
    at org.talend.datastreams.beam.compiler.runtimeflow.RuntimeFlowBeamCompiler.compile(RuntimeFlowBeamCompiler.java:29)
    at org.talend.datastreams.streamsjob.FullRunJob$.runJob(FullRunJob.scala:105)
    at org.talend.datastreams.streamsjob.FullRunJob$.main(FullRunJob.scala:54)
    at org.talend.datastreams.streamsjob.FullRunJob.main(FullRunJob.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:775)
    at org.apache.spark.deploy.SparkSubmit$$anon$1.run(SparkSubmit.scala:162)
    at org.apache.spark.deploy.SparkSubmit$$anon$1.run(SparkSubmit.scala:160)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:160)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey
 
I don't believe I should need to add a Window before each of these components, so am I misunderstanding something, or is this a bug? The pipeline does write data to S3 if I add a Window before each of the components, but I think the extra Windows are affecting the results, and it just does not feel right. Any ideas?
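
For readers following along, the logic described in the question (combining the hashtags from the tweets in a window and counting them) can be sketched in plain Python. This is only an illustration and is not Data Streams or Beam code: the tweet structure, the `window_start` and `count_hashtags_per_window` helpers and the 60-second window size are all assumptions.

```python
from collections import Counter, defaultdict

WINDOW_SECONDS = 60  # assumed fixed window size, for illustration only


def window_start(timestamp, size=WINDOW_SECONDS):
    """Return the start of the fixed window that contains `timestamp`."""
    return timestamp - (timestamp % size)


def count_hashtags_per_window(tweets, size=WINDOW_SECONDS):
    """Group tweets into fixed windows and count hashtags in each window.

    `tweets` is an iterable of dicts with hypothetical keys
    `timestamp` (seconds) and `hashtags` (list of strings).
    """
    counts = defaultdict(Counter)
    for tweet in tweets:
        start = window_start(tweet["timestamp"], size)
        # Normalise case so "#Beam" and "#beam" count as one hashtag.
        counts[start].update(tag.lower() for tag in tweet["hashtags"])
    return {start: dict(counter) for start, counter in counts.items()}


tweets = [
    {"timestamp": 5, "hashtags": ["#beam", "#talend"]},
    {"timestamp": 42, "hashtags": ["#Beam"]},
    {"timestamp": 75, "hashtags": ["#talend"]},
]
result = count_hashtags_per_window(tweets)
```

Here the window is assigned once, and every later step reuses the window start carried with each record.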

Accepted Solutions
Employee

Re: Windowing question

Hello!  My apologies for the delay in responding...

 

It looks like there was a bug in Beam 2.0.0 (AMI) that might be related: https://issues.apache.org/jira/browse/BEAM-3122  (In this case, they observed a similar problem, even in cases where there were windows before each aggregate).  Unfortunately, this might "muddy" the waters of your testing.  I believe there was a more-relevant fix to your use case in Beam 2.3.0, but I can't seem to find the JIRA.  For info, we track the latest versions of Beam pretty closely so we're

 

In any case, IIRC, it should be OK to have multiple aggregations between windows -- the first window should be used to define which records are related (by time) for the first aggregate and the second aggregate can be used to further process the records in the batch.  I'll check this and get back to you!
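
To make that idea concrete, here is a plain-Python sketch of one window feeding two aggregations in a row. It does not use Beam or Data Streams, and the record layout, the function names and the 60-second window are assumptions for illustration. The first aggregate groups by (window, hashtag); the second regroups the first aggregate's output by window alone, without assigning windows again.

```python
from collections import defaultdict

WINDOW_SECONDS = 60  # assumed window size


def assign_window(records, size=WINDOW_SECONDS):
    """Windowing step: tag each (timestamp, hashtag) record with its window start."""
    return [(ts - ts % size, tag) for ts, tag in records]


def first_aggregate(windowed):
    """Aggregate 1: count occurrences per (window, hashtag) key."""
    counts = defaultdict(int)
    for window, tag in windowed:
        counts[(window, tag)] += 1
    return dict(counts)


def second_aggregate(counts):
    """Aggregate 2: per window, keep the most frequent hashtag.

    Reuses the window carried in the key; there is no second windowing step.
    """
    best = {}
    for (window, tag), n in counts.items():
        # Ties are broken alphabetically so the result is deterministic.
        if window not in best or (-n, tag) < (-best[window][1], best[window][0]):
            best[window] = (tag, n)
    return best


records = [(3, "#beam"), (10, "#beam"), (20, "#spark"), (61, "#spark")]
top_per_window = second_aggregate(first_aggregate(assign_window(records)))
```

In this sketch the grouping key changes between the two aggregates while the window stays the same, which is the behaviour the reply describes as expected.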

 

You might be interested to know that there is *no* generated code for Data Streams. We assemble compiled component libraries instead. The obvious disadvantage is that you can no longer step through a job like in some other Talend products, but overall it's been a net gain for quality and performance!

 


All Replies
Employee

Re: Windowing question

Hello @rhall_2_0,

 

Thanks for trying out so many Talend Data Streams capabilities!

 

Windowing can get very complex very quickly. You should indeed be able to use two Aggregates in a row without having a window for each. It is definitely possible, as you can see in this pipeline:

 

[Image: retail_clothes_alerts.png]

 

When using two Aggregates, you need to be very careful about which key you group your data by. In the error message you got, the important line is:

Caused by: java.lang.IllegalStateException: GroupByKey must have a valid Window merge function. Invalid because: WindowFn has already been consumed by previous GroupByKey

Could we have screenshots of the two Aggregates you have set up in your pipeline? That would make it easier for us to understand where the issue is, or whether there is a bug.

 

Thank you very much!

 

Thibaut

Sixteen Stars

Re: Windowing question

Hi @tgourdel, this was a weird problem (and I have since solved my issue by refining my pipeline to use fewer components), but I certainly do get errors in *some* situations when I do not have a window per aggregate. The two screenshots below show my pipeline. The first one is the pipeline with a window per aggregate. This one works.

 

[Image: Screen Shot 2018-08-08 at 22.45.04.png]

 

If I remove window 2, the pipeline fails with the error seen in this screenshot:

 

[Image: Screen Shot 2018-08-08 at 22.45.47.png]

The aggregate referenced in the error is aggregate 2. I am not expecting you to be able to solve this from a screenshot; I just wanted you to see that it is happening, and that the only change I made was to add a window just before the aggregate, after which it worked.

 

I know this might be hard, but when working with Talend DI and ESB we have access to the generated code. With a bit of Java knowledge, access to the code AND the error stacks, you can often figure out the issue for yourself. Is there a plan to enable debugging like this for Data Streams? It should also be noted that the Python errors are not aligned to the correct lines of code. You can work this out by shifting the lines of Python around when debugging (which helps reveal the actual line of code), but it would be nice, when we are told line 36 has errored, for the error to be on line 36 and not line 29 :-)

Sixteen Stars

Re: Windowing question

Just noticed, it seems to happen when a sliding window is NOT used.
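
For context, the difference between the two window types can be sketched in plain Python: a fixed window places each record in exactly one window, while a sliding window places it in every window that overlaps its timestamp. The sizes and function names below are illustrative assumptions, and the sketch does not explain why the error appears only when a sliding window is not used.

```python
def fixed_windows(timestamp, size):
    """Return the single [start, end) fixed window containing `timestamp`."""
    start = timestamp - timestamp % size
    return [(start, start + size)]


def sliding_windows(timestamp, size, period):
    """Return every [start, end) sliding window containing `timestamp`.

    Windows are `size` long and a new one starts every `period` seconds.
    """
    windows = []
    # Latest window start that is <= timestamp.
    start = timestamp - timestamp % period
    while start > timestamp - size:
        windows.append((start, start + size))
        start -= period
    return sorted(windows)


fixed = fixed_windows(25, size=20)
sliding = sliding_windows(25, size=20, period=10)
```

With these assumed sizes, a record at timestamp 25 falls into one fixed window but into two overlapping sliding windows.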

Sixteen Stars

Re: Windowing question

Thanks for your help @rskraba and @tgourdel. As I said, I have found a workaround which suits my needs in this case. It is great to have you guys monitoring the community :-)