Firstly you need to understand the concept of mapreduce. It can be understood very easily by the following images
Traditional Way:
MapReduce Way:
Program:
package name_of_your_package
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static void main(String [] args) throws Exception
{
Configuration conf=new Configuration();
String[] files=new GenericOptionsParser(conf,args).getRemainingArgs();
Path input=new Path(files[0]);
Path output=new Path(files[1]);
Job a=new Job(conf,"wordcount");
a.setJarByClass(WordCount.class);
a.setMapperClass(MapForWordCount.class);
a.setReducerClass(ReduceForWordCount.class);
a.setOutputKeyClass(Text.class);
a.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(a, input);
FileOutputFormat.setOutputPath(a, output);
System.exit(a.waitForCompletion(true)?0:1);
}
public static class MapForWordCount extends Mapper<LongWritable, Text, Text, IntWritable>{
public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException
{
String line = value.toString();
String[] words=line.split(",");
for(String word: words )
{
Text outputKey = new Text(word.toUpperCase().trim());
IntWritable outputValue = new IntWritable(1);
con.write(outputKey, outputValue);
}
}
}
public static class ReduceForWordCount extends Reducer<Text, IntWritable, Text, IntWritable>
{
public void reduce(Text word, Iterable<IntWritable> values, Context con) throws IOException, InterruptedException
{
int sum = 0;
for(IntWritable value : values)
{
sum += value.get();
}
con.write(word, new IntWritable(sum));
}
}
}