We use the MultipleInputs class, which supports MapReduce jobs that read from multiple input paths, with a different InputFormat and Mapper for each path. To make the concept concrete, consider a case where a user wants to take input from two files with a similar structure: both have two columns, the first holding "Name" and the second holding "Age". We simply want to combine the data and sort it by "Name". What do we need to do? Just two things:
- Write two mapper classes, one for each input file.
- Register each mapper class with MultipleInputs in the run/main method, as sketched below.
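For orientation, here is a minimal sketch of that registration step, assuming two hypothetical mapper classes FirstFileMapper and SecondFileMapper (the complete, runnable driver appears in the listing below):
MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, FirstFileMapper.class);
MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, SecondFileMapper.class);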
File 1:
Aman 19
Tom 20
Tony 15
John 18
Johnny 19
Hugh 17

File 2:
Ash 12
James 21
Punk 21
Frank 20
Here is the code. Notice that there are two mapper classes with the same logic and only a single reducer.
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class multiInputFile extends Configured implements Tool
{
  // Mapper for the first input file: emits (name, age) pairs.
  public static class CounterMapper extends Mapper<LongWritable, Text, Text, Text>
  {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
      // Each input line is expected to be tab-separated: name<TAB>age.
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }

  // Mapper for the second input file: same logic, registered separately.
  public static class CountertwoMapper extends Mapper<LongWritable, Text, Text, Text>
  {
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
      String[] line = value.toString().split("\t");
      context.write(new Text(line[0]), new Text(line[1]));
    }
  }
  // Reducer: each name is unique in the sample data, so the single age value
  // seen for a key is written out alongside it.
  public static class CounterReducer extends Reducer<Text, Text, Text, Text>
  {
    @Override
    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException
    {
      String line = null;
      for (Text value : values)
      {
        line = value.toString();
      }
      context.write(key, new Text(line));
    }
  }
  public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    Job job = Job.getInstance(conf, "aggprog");
    job.setJarByClass(multiInputFile.class);
    // One mapper per input path; both feed the same reducer.
    MultipleInputs.addInputPath(job, new Path(args[0]), TextInputFormat.class, CounterMapper.class);
    MultipleInputs.addInputPath(job, new Path(args[1]), TextInputFormat.class, CountertwoMapper.class);
    FileOutputFormat.setOutputPath(job, new Path(args[2]));
    job.setReducerClass(CounterReducer.class);
    job.setNumReduceTasks(1);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    return (job.waitForCompletion(true) ? 0 : 1);
  }
  public static void main(String[] args) throws Exception {
    int ecode = ToolRunner.run(new multiInputFile(), args);
    System.exit(ecode);
  }
}
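Assuming the program is packaged into a jar (the jar name and HDFS paths below are only illustrative), the job takes the two input files and an output directory as its three arguments:
hadoop jar multiinputfile.jar multiInputFile /user/hadoop/file1 /user/hadoop/file2 /user/hadoop/out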
Here is the output. Because both mappers emit the name as the key, MapReduce sorts the combined records by name before they reach the reducer:
Aman 19
Ash 12
Frank 20
Hugh 17
James 21
John 18
Johnny 19
Punk 21
Tom 20
Tony 15