2013年10月1日 星期二

[JAVA] Big Data(2) 終於寫出小兒科等級的mapreduce!!!

先來個最近當紅的半澤直樹



在Big Data的時代,很多工具都可以做mapreduce的計算,甚至不用知道mapreduce的原理
像是Hive, Pig
又或者是更有彈性的R的rmr2套件

說穿了以上的解決方案底層的邏輯都還是把較簡單的程式碼轉換成java code再執行
因此,為了炫耀或是其他更重要目的,用java寫mapreduce還是有其必要性的(肯定貌)

用java寫mapreduce的架構主要有三個部分:
1.Map class
2.Reduce class
3.主程式

在建立好Map和Reduce兩個class以後
再於主程式內用job方法呼叫
另外job方法也用來設定運行時的指令和資料輸入

說了一大堆,實際開始寫code才發現太複雜的mapreduce我也hadle不了啦(逃~~)
只好從小怪開始打起
wordcount的範例在網路上實在太多了有點蘚
所以就來做個平均數計意思意思一下

首先data長這樣,已經丟到hadoop上了是用逗點分隔的
我希望計算Tom和Mary等人三科的平均分數

$hadoop fs -cat chinese.txt
Tom,90
Mary,95
Lisa,94
Luke,87
Marx,76
Teresa,89

$hadoop fs -cat math.txt:
Tom,67
Mary,68
Lisa,75
Luke,82
Marx,66
Teresa,74

$hadoop fs -cat science.txt:
Tom,56
Mary,75
Lisa,85
Luke,96
Marx,74
Teresa,69


java code在這:

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.StringUtils;

public class Avg{

 //map步驟
 public static class Map extends Mapper {
 
  //撰寫map方法
  public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      String line = value.toString();//數據先轉成字串(String)
      StringTokenizer script = new StringTokenizer(line, "\n");//分割數據
      while (script.hasMoreElements()) {
       StringTokenizer scriptLine = new StringTokenizer(script.nextToken());
           //將Key和Value用逗號(",")分開
              Text Name = new Text(scriptLine.nextToken(","));
              int Score = Integer.parseInt(scriptLine.nextToken(","));
              context.write(Name, new IntWritable(Score));
      }
  }
 }
 
 //reduce 步驟
 public static class Reduce extends Reducer {
         //撰寫reduce方法
   public void reduce(Text key, Iterable value, Context context) throws IOException, InterruptedException{
          int numerator = 0;//初始化分子
          int denominator = 0;//初始化分母
          for (IntWritable v : value) {
                  numerator += v.get();//分子累加
                  denominator ++;//分母每次+1
          }
          int avg = numerator/denominator;//相除
             context.write(key,new  IntWritable(avg));
         }
 }
 
 //主程式
 public static void main(String[] args) throws Exception {
 
   Configuration conf = new Configuration();
   String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
         Path dst_path = new Path (otherArgs[1]);
         FileSystem hdfs = dst_path.getFileSystem(conf);
 
         //檢查OUTPUT的路徑是否存在,有的話就宰了他!
         if (hdfs.exists(dst_path)){
          hdfs.delete(dst_path, true);
         };
 
         Job job = new Job(conf, "Avg");
            job.setJarByClass(Avg.class);
            job.setMapperClass(Map.class);
            job.setCombinerClass(Reduce.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

            System.exit(job.waitForCompletion(true) ? 0 : 1);
         }
 }


寫完之後還沒結束
得先用commend line把code做成一個job

mkdir test_classes javac -classpath /{hadoop_home}/hadoop-core-0.20.2-cdh3u1.jar
:/{hadoop_home}/commons-cli-1.2.jar : -d test_classes Avg.java
jar -cvf ~/Avg.jar -C test_classes/ .

用javac編譯的時候記得要用-classpath指令把參考的library指給他(很重要)
之後再把它打包成.jar的檔案
之後就可以用hadoop jar指令執行了!
$hadoop jar ~/Avg.jar org.test.Avg {input} {output}

執行完來看看結果
$hadoop fs -cat ~/part-m-0000

Luke    88
Marx    72
Mary    79
Teresa  77
Tom     71



------ 後記:

其實這樣的東西不是很夠用 T_T 稍微進階版的Multiple input可以參考另一篇

6 則留言:

  1. 在執行最後一步,$hadoop jar ~/Avg.jar org.test.Avg {input} {output}
    會失敗且出現,java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.io.Text, recieved org.apache.hadoop.io.LongWritable
    請問板主有遇到這個問題嗎?該如何解決?謝謝

    回覆刪除
    回覆
    1. 檢查class path底下的jar有沒有漏掉

      刪除
  2. 請問我執行javac時會發生錯誤,是哪裡出了問題嗎? 謝謝
    Avg.java:46: error: incompatible types
    for (IntWritable v : value) {
    ^
    required: IntWritable
    found: Object
    Note: Avg.java uses unchecked or unsafe operations.
    Note: Recompile with -Xlint:unchecked for details.
    1 error

    回覆刪除
    回覆
    1. 再compile一次加上-Xlint:unchecked選項試試看

      刪除
  3. 作者已經移除這則留言。

    回覆刪除