Multiple Inputs
In MapReduce, when different data sources need to be fed to different mappers and the results then combined in a single reducer, you need the Multiple Inputs feature. The setup looks like this: three different data sources, each first going through its own mapper, with all of the mapper output ending up in the same reducer.
First, import the required class:
org.apache.hadoop.mapreduce.lib.input.MultipleInputs
Then, in the main program (main), add a line like the following for each input, telling the framework which data source goes to which Mapper class's map function:
MultipleInputs.addInputPath(job, input path, input format, Mapper class);
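For reference, the method on org.apache.hadoop.mapreduce.lib.input.MultipleInputs has roughly the following shape; the path and mapper name in the example call below are made-up placeholders, not part of the example that follows:

// signature: (job, input path, input format, mapper class)
public static void addInputPath(Job job, Path path,
        Class<? extends InputFormat> inputFormatClass,
        Class<? extends Mapper> mapperClass)

// hypothetical call: everything under /data/source_a goes through SourceAMapper
MultipleInputs.addInputPath(job, new Path("/data/source_a"),
        TextInputFormat.class, SourceAMapper.class);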
Next, let's look at an example so simple it's almost brainless.
Suppose we have three data sets on hand, all containing a student number, a subject, and a score, but each formatted differently. We want to compute the average score for each subject.
The first data set:
$HADOOP/hadoop fs -cat input_1 | head -3
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67
The second data set:
$HADOOP/hadoop fs -cat input_2 | head -3
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}
The third data set:
$HADOOP/hadoop fs -cat input_3 | head -3
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]
Each of these three formats is handled by its own Mapper, which emits (subject, score) pairs; all of the pairs are then sent to a single Reducer that computes the averages. So we need to prepare three Mappers:
public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context con)
            throws IOException, InterruptedException
    {
        // fields are separated by ";" : student number, subject, score
        String[] parts = value.toString().split(";");
        String subject = parts[1];
        int score = Integer.parseInt(parts[2]);
        con.write(new Text(subject), new IntWritable(score));
    }
}
public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context con)
            throws IOException, InterruptedException
    {
        // strip the curly braces, leaving "Number:..., Subject:..., Score:..."
        String line = value.toString().replaceAll("[{}]", "");
        if (line.contains(",")) {
            // the second field is "Subject:xxx", the third is "Score:nn"
            String subject = line.split(",")[1].split(":")[1];
            int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }
}
public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable>
{
    @Override
    public void map(LongWritable key, Text value, Context con)
            throws IOException, InterruptedException
    {
        // strip the square brackets, leaving "Number=>...; Subject=>...; Score=>..."
        String line = value.toString().replaceAll("[\\[\\]]", "");
        if (line.contains(";")) {
            // the second field is "Subject=>xxx", the third is "Score=>nn"
            String subject = line.split(";")[1].split("=>")[1];
            int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }
}
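The only real difference between the three mappers is the string surgery, so it can be worth checking the three parsing rules outside of Hadoop first. The following stand-alone sketch (the class name ParseCheck is made up for this illustration) applies the same kind of splitting to one sample line from each source and prints the subject and score it extracts:

public class ParseCheck
{
    public static void main(String[] args)
    {
        // one sample line from each of the three sources shown above
        String s1 = "WUJ-360100;math;56";
        String s2 = "{Number:FJB-004150, Subject:math, Score:96}";
        String s3 = "[Number=>ITM-501806; Subject=>science; Score=>82]";

        // source 1: plain ";"-separated fields
        System.out.println(s1.split(";")[1] + " " + Integer.parseInt(s1.split(";")[2]));

        // source 2: strip the braces, then split on "," and ":"
        String l2 = s2.replaceAll("[{}]", "");
        System.out.println(l2.split(",")[1].split(":")[1] + " "
                + Integer.parseInt(l2.split(",")[2].split(":")[1]));

        // source 3: strip the brackets, then split on ";" and "=>"
        String l3 = s3.replaceAll("[\\[\\]]", "");
        System.out.println(l3.split(";")[1].split("=>")[1] + " "
                + Integer.parseInt(l3.split(";")[2].split("=>")[1]));
    }
}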
As for the Reducer, a single one is all that is needed:
public static class Red extends Reducer<Text, IntWritable, Text, IntWritable>
{
    @Override
    public void reduce(Text subject, Iterable<IntWritable> scores, Context con)
            throws IOException, InterruptedException
    {
        // sum the scores and count them to get the average
        int sum = 0;
        int count = 0;
        for (IntWritable v : scores) {
            sum += v.get();
            count++;
        }
        // integer division, so the fractional part is dropped
        int avg = sum / count;
        con.write(subject, new IntWritable(avg));
    }
}
Then comes the somewhat more troublesome main program:
public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
    Path inPath1 = new Path(files[0]);
    Path inPath2 = new Path(files[1]);
    Path inPath3 = new Path(files[2]);
    Path outPath = new Path(files[3]);
    // remove the output directory if it already exists
    FileSystem hdfs = outPath.getFileSystem(conf);
    if (hdfs.exists(outPath)) {
        hdfs.delete(outPath, true);
    }
    Job exampleJob = new Job(conf, "example");
    exampleJob.setJarByClass(MpInputExp.class);
    exampleJob.setReducerClass(Red.class);
    exampleJob.setOutputKeyClass(Text.class);
    exampleJob.setOutputValueClass(IntWritable.class);
    // pair each input path with its own mapper; no setMapperClass call is needed,
    // because MultipleInputs records a mapper for each path
    MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
    MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
    MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);
    FileOutputFormat.setOutputPath(exampleJob, outPath);
    System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
}
Be careful that each MultipleInputs.addInputPath call actually pairs the right input with the right Mapper.
Finally, let's look at the results (the packaging step is omitted; see here).
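Assuming the classes were packaged into a jar named MpInputExp.jar (the jar name here is just a placeholder), the job would be launched with the three input paths and the output path, roughly like this:
$HADOOP/hadoop jar MpInputExp.jar MpInputExp input_1 input_2 input_3 output_exp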
$HADOOP/hadoop fs -getmerge output_exp output_exp
cat output_exp
science 68
chinese 70
math 68
Finally, here is all of the Java code, to bring this annoying business to a close:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class MpInputExp
{
    public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        @Override
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException
        {
            // fields are separated by ";" : student number, subject, score
            String[] parts = value.toString().split(";");
            String subject = parts[1];
            int score = Integer.parseInt(parts[2]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }
    public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        @Override
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException
        {
            // strip the curly braces, leaving "Number:..., Subject:..., Score:..."
            String line = value.toString().replaceAll("[{}]", "");
            if (line.contains(",")) {
                // the second field is "Subject:xxx", the third is "Score:nn"
                String subject = line.split(",")[1].split(":")[1];
                int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                con.write(new Text(subject), new IntWritable(score));
            }
        }
    }
    public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable>
    {
        @Override
        public void map(LongWritable key, Text value, Context con)
                throws IOException, InterruptedException
        {
            // strip the square brackets, leaving "Number=>...; Subject=>...; Score=>..."
            String line = value.toString().replaceAll("[\\[\\]]", "");
            if (line.contains(";")) {
                // the second field is "Subject=>xxx", the third is "Score=>nn"
                String subject = line.split(";")[1].split("=>")[1];
                int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                con.write(new Text(subject), new IntWritable(score));
            }
        }
    }
    public static class Red extends Reducer<Text, IntWritable, Text, IntWritable>
    {
        @Override
        public void reduce(Text subject, Iterable<IntWritable> scores, Context con)
                throws IOException, InterruptedException
        {
            // sum the scores and count them to get the average
            int sum = 0;
            int count = 0;
            for (IntWritable v : scores) {
                sum += v.get();
                count++;
            }
            // integer division, so the fractional part is dropped
            int avg = sum / count;
            con.write(subject, new IntWritable(avg));
        }
    }
    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path inPath1 = new Path(files[0]);
        Path inPath2 = new Path(files[1]);
        Path inPath3 = new Path(files[2]);
        Path outPath = new Path(files[3]);
        // remove the output directory if it already exists
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)) {
            hdfs.delete(outPath, true);
        }
        Job exampleJob = new Job(conf, "example");
        exampleJob.setJarByClass(MpInputExp.class);
        exampleJob.setReducerClass(Red.class);
        exampleJob.setOutputKeyClass(Text.class);
        exampleJob.setOutputValueClass(IntWritable.class);
        // pair each input path with its own mapper; no setMapperClass call is needed,
        // because MultipleInputs records a mapper for each path
        MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
        MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);
        FileOutputFormat.setOutputPath(exampleJob, outPath);
        System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
    }
}