Five Stars

Custom Spark code through tjava

Hi,

 

Is there a WordCount-style example showing how to integrate our existing custom Spark code into Talend? I checked the knowledge base and saw that tJava for Spark supports this, but I can't find an example that explains the syntax described in the component's comments.

Any help will be much appreciated.

 

Best Regards,

Ojasvi Gambhir

4 REPLIES
Six Stars

Re: Custom Spark code through tjava

Hi Ojasvi

 

You can use the code below as a reference and customize it to your requirements:

 

package PackageDemo;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

// Word count written against the Hadoop MapReduce API.
public class WordCount {

    public static void main(String[] args) throws Exception {
        Configuration c = new Configuration();
        // The remaining arguments are the input path and the output path.
        String[] files = new GenericOptionsParser(c, args).getRemainingArgs();
        Path input = new Path(files[0]);
        Path output = new Path(files[1]);

        // Job.getInstance replaces the deprecated Job constructor.
        Job j = Job.getInstance(c, "wordcount");
        j.setJarByClass(WordCount.class);
        j.setMapperClass(MapForWordCount.class);
        j.setReducerClass(ReduceForWordCount.class);
        j.setOutputKeyClass(Text.class);
        j.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(j, input);
        FileOutputFormat.setOutputPath(j, output);
        System.exit(j.waitForCompletion(true) ? 0 : 1);
    }

    // Emits (WORD, 1) for every comma-separated token of each input line.
    public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable> {

        @Override
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split(",");
            for (String word : words) {
                Text outputKey = new Text(word.toUpperCase().trim());
                IntWritable outputValue = new IntWritable(1);
                con.write(outputKey, outputValue);
            }
        }
    }

    // Sums the counts emitted for each word.
    public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable> {

        @Override
        public void reduce(Text word, Iterable<IntWritable> values, Context con) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable value : values) {
                sum += value.get();
            }
            con.write(word, new IntWritable(sum));
        }
    }
}
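To check the counting logic locally without a cluster, here is a minimal plain-Java sketch of what the mapper and reducer above compute: split each line on commas, upper-case and trim each token, then sum the occurrences per key. The class name WordCountSketch and the sample lines are made up for illustration, and nothing here uses the Hadoop, Spark, or Talend APIs.

```java
import java.util.Arrays;
import java.util.Map;
import java.util.TreeMap;

public class WordCountSketch {

    // Mirrors the mapper (split on ",", upper-case, trim) and the
    // reducer (sum one count per occurrence of each key).
    static Map<String, Integer> count(Iterable<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                String key = word.toUpperCase().trim();
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical sample input, two comma-separated lines.
        Map<String, Integer> result =
                count(Arrays.asList("apple, Banana,apple", "banana ,cherry"));
        System.out.println(result); // prints {APPLE=2, BANANA=2, CHERRY=1}
    }
}
```

A TreeMap is used only so that the printed order is stable; the MapReduce job itself writes its results to the output path instead.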
Five Stars

Re: Custom Spark code through tjava

Thank you, fuzzyedy, for your quick response.

Will this code work in the tJava component of a Spark job?

What I need is to know how to write custom Spark code in the tJava component using the Talend-specific syntax shown in the comments of the tJava component for Spark. I already have WordCount code in Java Spark, but I can't adapt it to the tJava naming conventions.

 

Thanks and regards,

Ojasvi Gambhir

Six Stars

Re: Custom Spark code through tjava

Yes, this should work. Go ahead and give it a try!

 

Good luck!

Employee

Re: Custom Spark code through tjava

Hi,

Here is example code for tJava in a Spark job. The code sample shown in the component is wrong (Talend 6.4.1).

 

In the Basic settings:

 

outputrdd_tJava_1 = rdd_tJava_1.map(new mapInToOut(job));

In the Advanced settings, in the Java class field:

	public static class mapInToOut
			implements
			org.apache.spark.api.java.function.Function<inputStruct, RecordOut_tJava_1> {

		private ContextProperties context = null;
		private java.util.List<org.apache.avro.Schema.Field> fieldsList;

		public mapInToOut(JobConf job) {
			this.context = new ContextProperties(job);
		}

		@Override
		public RecordOut_tJava_1 call(inputStruct origStruct) {

			// Load the input schema's field list once, on the first call.
			if (fieldsList == null) {
				this.fieldsList = (new inputStruct()).getSchema()
						.getFields();
			}

			RecordOut_tJava_1 value = new RecordOut_tJava_1();

			// Copy every field from the input record to the output record by position.
			for (org.apache.avro.Schema.Field field : fieldsList) {
				value.put(field.pos(), origStruct.get(field.pos()));
			}

			return value;
		}
	}
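
To try the same pattern outside Talend, here is a dependency-free sketch: a named class with a call method that copies fields by position, handed to a map step. Everything in it is a stand-in and an assumption, not Talend or Spark code: MapFunction imitates org.apache.spark.api.java.function.Function (whose call method also declares throws Exception), the static map helper imitates rdd.map, and records are plain Object arrays instead of the generated inputStruct and RecordOut_tJava_1 classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MapFunctionSketch {

    // Hypothetical stand-in for Spark's Function interface.
    interface MapFunction<T, R> {
        R call(T input);
    }

    // Same shape as mapInToOut: copy every field to the output by position.
    static class CopyByPosition implements MapFunction<Object[], Object[]> {
        @Override
        public Object[] call(Object[] in) {
            Object[] out = new Object[in.length];
            for (int pos = 0; pos < in.length; pos++) {
                out[pos] = in[pos];
            }
            return out;
        }
    }

    // Hypothetical stand-in for rdd.map(fn): applies fn to each record in order.
    static <T, R> List<R> map(List<T> records, MapFunction<T, R> fn) {
        List<R> result = new ArrayList<>();
        for (T record : records) {
            result.add(fn.call(record));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Object[]> input = new ArrayList<>();
        input.add(new Object[] {"a", 1});
        input.add(new Object[] {"b", 2});

        List<Object[]> output = map(input, new CopyByPosition());
        for (Object[] record : output) {
            System.out.println(Arrays.toString(record));
        }
    }
}
```

To do something other than a straight copy, change the body of call; the line in the Basic settings stays the same as long as the class name and constructor do.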