Multiple Input
In MapReduce, when different data sources have to be fed to different mappers, with all of the mapper outputs then combined in a single reducer, you need the MultipleInputs feature. Picture three different data sources: each one first goes through its own mapper, and the outputs all end up in the same reducer.
First, import the required class:
org.apache.hadoop.mapreduce.lib.input.MultipleInputs
Then, in the main program (main), add the following line to tell Hadoop which input should be sent to which Mapper class's map function:
MultipleInputs.addInputPath(job, path of the input data, input format, Mapper class);
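For example, wiring two inputs to two different mappers looks roughly like this (jobA, the paths, and MapperA/MapperB are hypothetical names used only to show the argument order; TextInputFormat is the usual format for plain text lines):

// hypothetical fragment: each input path is paired with its own Mapper class
MultipleInputs.addInputPath(jobA, new Path("/user/demo/input_a"), TextInputFormat.class, MapperA.class);
MultipleInputs.addInputPath(jobA, new Path("/user/demo/input_b"), TextInputFormat.class, MapperB.class);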
Next, let's look at an almost trivially simple example.
Suppose we have three datasets, each containing student IDs, subjects, and scores, but each in a different format. We want to compute the average score for every subject.
The first dataset:
$HADOOP/hadoop fs -cat input_1 | head -3
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67

The second dataset:

$HADOOP/hadoop fs -cat input_2 | head -3
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}

The third dataset:

$HADOOP/hadoop fs -cat input_3 | head -3
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]

These three record formats are handled by three different Mappers; each Mapper emits (subject, score) pairs, and all of the pairs go to a single Reducer that computes the averages. So we need three Mapper classes:
public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        // fields are separated by ";": studentNumber;subject;score
        String subject = value.toString().split(";")[1];
        // get the score
        int score = Integer.parseInt(value.toString().split(";")[2]);
        con.write(new Text(subject), new IntWritable(score));
    }
}

public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        // "shave" the record: drop the closing brace
        String line = value.toString().replaceAll("}", "");
        if (line.contains(",")) {
            // get the subject ("Subject:math")
            String subject = line.split(",")[1].split(":")[1];
            // get the score ("Score:96")
            int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }
}

public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable> {
    public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
        // "shave" the record: strip the square brackets
        String line = value.toString().replaceAll("[]\\[]", "");
        if (line.contains(";")) {
            // get the subject ("Subject=>science")
            String subject = line.split(";")[1].split("=>")[1];
            // get the score ("Score=>82")
            int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }
}
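The bracket-stripping call in Map3 is the least obvious line, so here is a small standalone check (plain Java, not part of the job) of what that regex does to one sample record from input_3:

// In java.util.regex a ']' placed first inside a character class is taken literally,
// so "[]\\[]" matches either ']' or '[' and replaceAll removes both brackets.
String sample = "[Number=>ITM-501806; Subject=>science; Score=>82]";
System.out.println(sample.replaceAll("[]\\[]", ""));
// prints: Number=>ITM-501806; Subject=>science; Score=>82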
Only one Reducer is actually needed:

public static class Red extends Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text subject, Iterable<IntWritable> scores, Context con) throws IOException, InterruptedException {
        int numerator = 0;
        int denominator = 0;
        for (IntWritable v : scores) {
            numerator += v.get();
            denominator++;
        }
        // integer division: the average is truncated to a whole number
        int avg = numerator / denominator;
        con.write(subject, new IntWritable(avg));
    }
}
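Note that numerator/denominator is integer division, so each average is truncated to a whole number, which matches the output shown later. If fractional averages were wanted, a variant of the Reducer could emit a DoubleWritable instead; a minimal sketch, assuming org.apache.hadoop.io.DoubleWritable is imported and the driver calls setMapOutputValueClass(IntWritable.class) and setOutputValueClass(DoubleWritable.class):

public static class RedDouble extends Reducer<Text, IntWritable, Text, DoubleWritable> {
    public void reduce(Text subject, Iterable<IntWritable> scores, Context con) throws IOException, InterruptedException {
        int sum = 0;
        int count = 0;
        for (IntWritable v : scores) {
            sum += v.get();
            count++;
        }
        // cast before dividing so the fractional part is kept
        con.write(subject, new DoubleWritable((double) sum / count));
    }
}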
The driver (main) then wires everything together:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
    Path inPath1 = new Path(files[0]);
    Path inPath2 = new Path(files[1]);
    Path inPath3 = new Path(files[2]);
    Path outPath = new Path(files[3]);
    // remove the output directory if it already exists
    FileSystem hdfs = outPath.getFileSystem(conf);
    if (hdfs.exists(outPath)) {
        hdfs.delete(outPath, true);
    }
    Job exampleJob = new Job(conf, "example");
    exampleJob.setJarByClass(MpInputExp.class);
    exampleJob.setReducerClass(Red.class);
    exampleJob.setOutputKeyClass(Text.class);
    exampleJob.setOutputValueClass(IntWritable.class);
    // pair each input path with its own Mapper; no setMapperClass call is needed
    MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
    MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
    MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);
    FileOutputFormat.setOutputPath(exampleJob, outPath);
    System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
}

Make sure each MultipleInputs.addInputPath call pairs the right input with the right Mapper.
Finally, let's look at the results (the packaging step is omitted; you can refer to it here).
$HADOOP/hadoop fs -getmerge output_exp output_exp
cat output_exp
science 68
chinese 70
math 68
The complete program, MpInputExp.java:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MpInputExp {

    // input_1: studentNumber;subject;score
    public static class Map1 extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            // get the subject
            String subject = value.toString().split(";")[1];
            // get the score
            int score = Integer.parseInt(value.toString().split(";")[2]);
            con.write(new Text(subject), new IntWritable(score));
        }
    }

    // input_2: {Number:..., Subject:..., Score:...}
    public static class Map2 extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            // "shave" the record: drop the closing brace
            String line = value.toString().replaceAll("}", "");
            if (line.contains(",")) {
                // get the subject
                String subject = line.split(",")[1].split(":")[1];
                // get the score
                int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                con.write(new Text(subject), new IntWritable(score));
            }
        }
    }

    // input_3: [Number=>...; Subject=>...; Score=>...]
    public static class Map3 extends Mapper<LongWritable, Text, Text, IntWritable> {
        public void map(LongWritable key, Text value, Context con) throws IOException, InterruptedException {
            // "shave" the record: strip the square brackets
            String line = value.toString().replaceAll("[]\\[]", "");
            if (line.contains(";")) {
                // get the subject
                String subject = line.split(";")[1].split("=>")[1];
                // get the score
                int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                con.write(new Text(subject), new IntWritable(score));
            }
        }
    }

    public static class Red extends Reducer<Text, IntWritable, Text, IntWritable> {
        public void reduce(Text subject, Iterable<IntWritable> scores, Context con) throws IOException, InterruptedException {
            int numerator = 0;
            int denominator = 0;
            for (IntWritable v : scores) {
                numerator += v.get();
                denominator++;
            }
            // integer division: the average is truncated to a whole number
            int avg = numerator / denominator;
            con.write(subject, new IntWritable(avg));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] files = new GenericOptionsParser(conf, args).getRemainingArgs();
        Path inPath1 = new Path(files[0]);
        Path inPath2 = new Path(files[1]);
        Path inPath3 = new Path(files[2]);
        Path outPath = new Path(files[3]);
        // remove the output directory if it already exists
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)) {
            hdfs.delete(outPath, true);
        }
        Job exampleJob = new Job(conf, "example");
        exampleJob.setJarByClass(MpInputExp.class);
        exampleJob.setReducerClass(Red.class);
        exampleJob.setOutputKeyClass(Text.class);
        exampleJob.setOutputValueClass(IntWritable.class);
        // pair each input path with its own Mapper; no setMapperClass call is needed
        MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
        MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);
        FileOutputFormat.setOutputPath(exampleJob, outPath);
        System.exit(exampleJob.waitForCompletion(true) ? 0 : 1);
    }
}