2014年10月10日 星期五

[JAVA] Big Data(4) Hadoop Multiple Input

Multiple Input

在map-reduce時,若不同的資料來源要塞給不同的mapper,最後再一起塞進reducer運算,就需要使用Multiple Input 的功能。如下圖,有三個不同的資料來源,先分別進入不同的mapper,然後最後要進到同一個reducer。



首先要先import需要的的類別
org.apache.hadoop.mapreduce.lib.input.MultipleInputs

然後在主程式(main)中寫入下面這一行,告訴電腦你要把哪一筆資料送進哪一個Mapper class的map函數

MultipleInputs.addInputPath(Job名稱, 輸入資料的位址, 格式Mapper class的名字);

接下來看一個簡單到近乎無腦的例子
假設手上有三筆資料,都包含學號、科目和成績,但是長相就是不太一樣,現在我們要計算各科的平均分數

第一筆
$HADOOP/hadoop fs -cat input_1 | head -3
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67
第二筆
$HADOOP/hadoop fs -cat input_2 | head -3
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}
第三筆
$HADOOP/hadoop fs -cat input_3 | head -3
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]
以上三種長相的資料要分別送給三種不同的Mapper中處理,產生(subject, score)的pair然後統一送進一個Reducer做平均數的計算,所以要準備三種Mapper
public static class Map1 extends Mapper
{
      public void map(LongWritable key, Text value, Context con) 
        throws IOException, InterruptedException
      {
              // get the student number
              String stNum = value.toString().split(";")[1];

              // get score
              int score = Integer.parseInt(value.toString().split(";")[2]);
              con.write(new Text(stNum), new IntWritable(score));
      }
}
public static class Map2 extends Mapper
{
      public void map(LongWritable key, Text value, Context con) 
        throws IOException, InterruptedException
      {       
              // "shave" the input value
              String line = value.toString().replaceAll("}", "");

              if(line.contains(",")){
                      // get the student number
                      String stNum = line.split(",")[1].split(":")[1];

                      // get score
                      int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                      con.write(new Text(stNum), new IntWritable(score));
              }
      }
}
public static class Map3 extends Mapper
{
      public void map(LongWritable key, Text value, Context con) 
        throws IOException, InterruptedException
      {
              // "shave" the input value
              String line=value.toString().replaceAll("[]\\[]", "");


              if(line.contains(";")){
                // get the student number
                String stNum = line.split(";")[1].split("=>")[1];

                // get score
                int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                con.write(new Text(stNum), new IntWritable(score));
              }
      }
}
Reducer其實就只需要一個就可以了
public static class Red extends Reducer
{
     public void reduce(Text stNum, Iterable scores, Context con)
      throws IOException , InterruptedException
      {
              int numerator = 0;
              int denominator = 0;
              for (IntWritable v : scores){
                  numerator += v.get();
                  denominator ++;
              }
              int avg = numerator/denominator;
              con.write(stNum, new IntWritable(avg));
      }
}
然後是比較麻煩的主程式
public static void main(String[] args) throws Exception
{
      Configuration conf=new Configuration();
      String[] files=new GenericOptionsParser(conf,args).getRemainingArgs();
      Path inPath1=new Path(files[0]);
      Path inPath2=new Path(files[1]);
      Path inPath3=new Path(files[2]);
      Path outPath=new Path(files[3]);
      FileSystem hdfs = outPath.getFileSystem(conf);
      if (hdfs.exists(outPath)){
        hdfs.delete(outPath, true);
      };

      Job exampleJob = new Job(conf,"example");
      exampleJob.setJarByClass(MpInputExp.class);
      exampleJob.setMapperClass(Map1.class);
      exampleJob.setMapperClass(Map2.class);
      exampleJob.setMapperClass(Map3.class);
      exampleJob.setReducerClass(Red.class);
      exampleJob.setOutputKeyClass(Text.class);
      exampleJob.setOutputValueClass(IntWritable.class);

      MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
      MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
      MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map3.class);
      
      FileOutputFormat.setOutputPath(exampleJob, outPath);
      System.exit(exampleJob.waitForCompletion(true) ? 0:1);
}
要注意MultipleInputs.addInputPath有沒有把Input和Mapper配對好

最後來看結果(打包部分省略,可以參考這裡)
$HADOOP/hadoop fs -getmerge output_exp output_exp
cat output_exp

science 68
chinese 70
math    68
送上所有JAVA code結束這惱人的一切
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class MpInputExp
{
  public static class Map1 extends Mapper
  {
        public void map(LongWritable key, Text value, Context con) 
          throws IOException, InterruptedException
        {
                // get the student number
                String stNum = value.toString().split(";")[1];

                // get score
                int score = Integer.parseInt(value.toString().split(";")[2]);
                con.write(new Text(stNum), new IntWritable(score));
        }
  }
  public static class Map2 extends Mapper
  {
        public void map(LongWritable key, Text value, Context con) 
          throws IOException, InterruptedException
        {       
                // "shave" the input value
                String line = value.toString().replaceAll("}", "");

                if(line.contains(",")){
                        // get the student number
                        String stNum = line.split(",")[1].split(":")[1];

                        // get score
                        int score = Integer.parseInt(line.split(",")[2].split(":")[1]);
                        con.write(new Text(stNum), new IntWritable(score));
                }
        }
  }
  public static class Map3 extends Mapper
  {
        public void map(LongWritable key, Text value, Context con) 
          throws IOException, InterruptedException
        {
                // "shave" the input value
                String line=value.toString().replaceAll("[]\\[]", "");


                if(line.contains(";")){
                  // get the student number
                  String stNum = line.split(";")[1].split("=>")[1];

                  // get score
                  int score = Integer.parseInt(line.split(";")[2].split("=>")[1]);
                  con.write(new Text(stNum), new IntWritable(score));
                }
        }
  }
  public static class Red extends Reducer
  {
       public void reduce(Text stNum, Iterable scores, Context con)
        throws IOException , InterruptedException
        {
                int numerator = 0;
                int denominator = 0;
                for (IntWritable v : scores){
                    numerator += v.get();
                    denominator ++;
                }
                int avg = numerator/denominator;
                con.write(stNum, new IntWritable(avg));
        }
   }
  public static void main(String[] args) throws Exception
  {
        Configuration conf=new Configuration();
        String[] files=new GenericOptionsParser(conf,args).getRemainingArgs();
        Path inPath1=new Path(files[0]);
        Path inPath2=new Path(files[1]);
        Path inPath3=new Path(files[2]);
        Path outPath=new Path(files[3]);
        FileSystem hdfs = outPath.getFileSystem(conf);
        if (hdfs.exists(outPath)){
          hdfs.delete(outPath, true);
        };

        Job exampleJob = new Job(conf,"example");
        exampleJob.setJarByClass(MpInputExp.class);
        exampleJob.setMapperClass(Map1.class);
        exampleJob.setMapperClass(Map2.class);
        exampleJob.setMapperClass(Map3.class);
        exampleJob.setReducerClass(Red.class);
        exampleJob.setOutputKeyClass(Text.class);
        exampleJob.setOutputValueClass(IntWritable.class);

        MultipleInputs.addInputPath(exampleJob, inPath1, TextInputFormat.class, Map1.class);
        MultipleInputs.addInputPath(exampleJob, inPath2, TextInputFormat.class, Map2.class);
        MultipleInputs.addInputPath(exampleJob, inPath3, TextInputFormat.class, Map2.class);
        
        FileOutputFormat.setOutputPath(exampleJob, outPath);
        System.exit(exampleJob.waitForCompletion(true) ? 0:1);
  }
}

2014年9月23日 星期二

[機器學習] Paper digest(1) On Multilabel Classification and Ranking with Bandit Feedback

這篇是在Journal of machine learning research上看到的
先附上原文位址

這篇文章提出一個機器學習方法在資訊不充足的情況下做出決策

所謂資訊不足指的是,我們所收集到的資料是未經排序的,但是我們要給的決策建議是卻是排序的。舉例而言,收集到的資料只有某些人的某些特質和他們各自看了哪些電影(partial feedback),卻不知道這些人對這些電影的喜好程度,我們要如何給出一個排序過的推薦清單?


Loss function for partial information

這篇文章的作者發展的loss function:

$l_{a,c} \big(Y_{t}, \widehat{Y_{t}} \big) =a |Y_{t} \backslash \widehat{Y_{t}}|+\big(1-a\big) \sum_{i \in\widehat{Y_{t}} \setminus Y_{t}} c \big(j_{i}, |\widehat{Y_{t}}| \big) $

其中$Y_{t}$是收集到的資料,$\widehat{Y_{t}}$是建議的決策,因此$Y_{t}$是沒有排序的,而$\widehat{Y_{t}}$是有排序的

$|Y_{t}, \widehat{Y_{t}}|$是指有多少$Y_{t}$裡的element沒有出現在$\widehat{Y_{t}}$裡面,概念上有點像是Hamming loss function;而$ \sum_{i \in\widehat{Y_{t}} \setminus Y_{t}} c \big(j_{i}, |\widehat{Y_{t}}| \big)$則是考量了排序,$\widehat{Y_{t}}$中,沒有出現在$Y_{t}$的元素要是排得越前面,計分就越重

舉例來說,如果

$Y_{t}=\left \{ 1,3,8 \right \}$

$\widehat{Y_{t}} = \big(4,3,6\big)$



$|Y_{t}, \widehat{Y_{t}}| = 2$ ($Y_{t}$中的$1$, $3$不屬於$\widehat{Y_{t}}$)

$\sum_{i \in\widehat{Y_{t}} \setminus Y_{t}} c \big(j_{i}, |\widehat{Y_{t}}| \big) = 3/3 + 1/3$  ($4$ 和 $6$的權重分別為$3/3$和$1/3$)


Multilabel classification

利用線性模型

$P_{t}(y_{1,t}, y_{2,t}, ...y_{k,t})=P_{t}(y_{1,t}, y_{2,t}, ...y_{k,t}|x_{t})$

$P_{t}(y_{i,t}=1) = \frac{g(-u_{i}^{T})}{g(u_{i}^{T}) + g(-u_{i}^{T})}$

最佳化上述的loss function找最好的決策,上述的loss function也可以寫成

$a \sum_{i=1}^{K}y_{i,t}+\big(1-a\big) \sum_{i \in\widehat{Y_{t}}}\big( c (j_{i}, |\widehat{Y_{t}}| ) - \big(\frac{a}{1-a} + c (j_{i}, |\widehat{Y_{t}}| ) \big)y_{i,t}\big)$

由於$\sum_{i=1}^{K}y_{i,t}$和$\widehat{Y_{t}}$基本上是無關的,所以在計算中只要最佳化

$E\left [l_{a,c} \big(Y_{t}, \widehat{Y_{t}} \big)   \right ]=\big(1-a\big) \sum_{i \in\widehat{Y_{t}}}\big( c (j_{i}, |\widehat{Y_{t}}| ) - \big(\frac{a}{1-a} + c (j_{i}, |\widehat{Y_{t}}| ) \big)p_{i,t}\big)$

就可以了


Others

作者也提出了另一個loss function:

$l_{p-rank, t}(Y, f)=\sum_{i,j\in \widehat{Y}_{t}:y_{i} <  y_{j}}\big(\left \{f_{i}(x_{t} ) < f_{j}(x_{t} )  \right \} + \frac{1}{2}\left \{ f_{i}(x_{t} ) = f_{j}(x_{t} ) \right \} \big)+ S_{t}|Y_{t}\backslash \widehat{Y}_{t}|$

$f$是ranking function而$ \left \{... \right \}$是indicator function,$S_{t}$則是$\widehat{Y}_{t}$的長度

作為評估排序表現的依據

2014年2月16日 星期日

[JAVA] JDBC 連接 SQL database

JDBC是一個蠻好用的Package,可以從這裡下載

這裡記錄一些最近摸索的基本的用法,以MS-SQL為例

在使用前記得要(搖一搖?)

import java.sql.*;

0. Connection

String conUrl = "jdbc:sqlserver://portNumber:XX;serverName=XX;databaseName=XX;user=XX;password=*****;";
Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver");
Connection con = DriverManager.getConnection(conUrl);

1. Create , table, truncate table

try{
        String query="XXX";//可以是create, drop或是truncate table的sql語句
 
        Statement stmt = con.createStatement();
        stmt.executeUpdate(query);
        stmt.close();
        con.close();

        }catch (Exception e) {
            e.printStackTrace();
        }
    }

2. Insert data

如果只是要插入一筆資料的話,可以把上面的
String query="XXX";
換成
String query="insert into xxx...";
也可以參考prepare statement的寫法,好處是可以不用重複寫sql語句,只要把問號(?)的地方取代成想要插入的值
try{
        String sql="INSERT INTO XXX (column1,column2) VALUES (?,?)";//prepare statement
 
        PreparedStatement pstmt = con.prepareStatement(sql);
        pstmt.setString(1, "插入值1");//第一個?要插入的值
        pstmt.setString(2, "插入值2");//第二個?要插入的值
        pstmt.executeUpdate();
        
        pstmt.close();
        con.close();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
如果要insert的資料有很多筆,其實一筆一筆塞是有點沒效率的,所以如果要塞入很多筆資料的話,可以先寫入Batch再一次執行
try{
        String sql="INSERT INTO XXX (column1,column2) VALUES (?,?)";//prepare statement
 
        PreparedStatement pstmt = con.prepareStatement(sql);

        for(String s : Data){
            pstmt.setString(1, "要插入的資料");//第一個?要插入的值
            pstmt.setString(2, "要插入的資料");//第二個?要插入的值
            pstmt.addBatch();//寫入Batch
        }

        pstmt.executeBatch();//執行Batch

        pstmt.close();
        con.close();
        }catch (Exception e) {
            e.printStackTrace();
        }
    }

3. 讀料取資

讀取SQL database的資料算是蠻常用的
try{
        String sql="SELECT XXX ...";//Query語句
     Statement stmt = con.createStatement();
 
     ResultSet rs = stmt.executeQuery(sql);//Query結果存在這裡
        ResultSetMetaData rsmd = rs.getMetaData(); //取得Query資料

        int numColumns = rsmd.getColumnCount();

        while (rs.next()){//while loop 一筆一筆iterate
           for(int i = 1; i < numColumns+1; i ++){
           System.out.println(rs.getString(rsmd.getColumnName(i)));//印出資料
        }
        }

        stmt.close();
        con.close();
        
        }catch (Exception e) {
            e.printStackTrace();
        }
    }
以上是用print當作示範,當然也可以把資料讀下來做其搭更複雜的計算,比如說塞進array裡面bla bla bla..

2014年1月13日 星期一

[Python] 小用法備忘

最近很少跟他打交道,有些小東西雖然基本,但是久不用就會忘記阿阿阿阿阿

lambda

有別於function的寫法,lamda用過即丟,用來建立簡單的function
>>> test = lambda x,y,z: x*y*z
>>> test(1,2,3)
6

filter

用filter函數把不符條件的element濾掉
>>> a = [31,545,21,2,0,231,56]
>>> func = lambda x: x>2
>>> filter(func,a)
[31, 545, 21, 231, 56]

map

用map函數計算每個member的結果
>>> a = [31,545,21,2,0,231,56]
>>> func = lambda x: x+2
>>> map(func, a)
[33, 547, 23, 4, 2, 233, 58]

reduce

用reduce計算$f(f...f(f(x_{0},x_{1}),x_{2})..,x_{n})$
>>> a = [1,2,3,4,5]
>>> func = lambda x,y: x*y
>>> reduce(func, a)
120

range & xrange

用range 函數來產生數列
>>> range(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

xrange函數用在迴圈中加強效能
>>> for i in xrange(10):
 print(i)

 
0
1
2
3
4
5
6
7
8
9

iterate寫法

iterate寫法是python特有的解決方案,和for loop類似
[i*5 for i in xrange(4)]
[0, 5, 10, 15]

2013年10月10日 星期四

[R] OOP(1) class的寫法

OOP就是 object-oriented programming的縮寫

中文比較常見的翻譯是「物件導向」

我其實還蠻討厭把「物件導向」不斷掛在嘴上的人,因為我覺得這四個字非常難以顧名思義

尤有甚者,一大堆屬性啦、方法啦之類完全莫名其妙的名詞就是要嚇退那些剛剛接觸程式設計的人阿!

好啦曾經的我就是被嚇得很慘的人之一(愧)




言歸正傳,說到OOP,R裡語言中當然也是有class寫法的

R語言裡主要有兩種形態的class:S3和S4

S3 Class

在R中最原始的class就是S3 class

我們可以把任何東西指定一個S3 class,比如說一個list開始

假設我今天把a這個list指定給一個叫做BodyIndex的類別
> a<- list(name="John", weight=65, height=166)
> class(a)<- "BodyIndex"
> a
$name
[1] "John"

$weight
[1] 65

$height
[1] 166

attr(,"class")
[1] "BodyIndex"

我們現在就可以幫這個特定的class寫一個他的print
print.BodyIndex<- function(l){
    BMI<- l$weight/((l$height/100)^2)
    cat(l$name,"\n")    
    cat("BMI is ", BMI)
}

當我們再請系統把它print出來的時候
> a
John 
BMI is  23.58833

S3 class當然是可以繼承的
> b<- list(name="Tom", weight=75, height=172, sex="Male")
> class(b)<- c("profile","BodyIndex")
> b
Tom 
BMI is  25.35154

隨著需求,你可以改寫更多某個類別專屬的內建函數(generic function)

S4 Class

R語言裡也提供比較安全的S4類別,但是寫法就比較複雜了

首先要用setClass()定義class

然後用new()把特定的東西指向class
> setClass("BodyIndex",
+     representation(
+         name = "character",
+         weight = "numeric",
+         height = "numeric"
+     )
+ )
> a<- new("BodyIndex",name="John", weight=65, height=166)

在S4 class裡的member我們都叫做slot

我們可以用@把他們叫出來
> a@name
[1] "John"
> a@weight
[1] 65
> a@height
[1] 166

然後不同於S3 class,S4 class是用show()讓我們看到它裡面的slot
> show(a)
An object of class "BodyIndex"
Slot "name":
[1] "John"

Slot "weight":
[1] 65

Slot "height":
[1] 166

當然我們也可以修改它,只是要用setMethod()
> setMethod("show", "BodyIndex",
+     function(obj){
+         BMI<- obj@weight/((obj@height/100)^2)
+         cat(obj@name,"\n")    
+         cat("BMI is ", BMI)
+     }
+ )

> a
John 
BMI is  23.58833

S4 class的架構下可以用setGeneric()設定自己的generic function
> setGeneric("Warnning",
+     function(obj){
+         BMI<- obj@weight/((obj@height/100)^2)
+         if(BMI>35){
+             cat("You are over-weight")
+         }else{cat("You are normal")}
+     } 
+ )

> Warnning(a)
You are normal

當然也可以用setMethod()客制化

> setMethod("Warnning", "BodyIndex",
+     function(obj){
+         BMI<- obj@weight/((obj@height/100)^2)
+         if(BMI>35){
+             cat("You are over-weight")
+         }else{cat("Good for you~~")}
+     } 
+ )

> Warnning(a)
Good for you~~

2013年10月7日 星期一

[Mahout] Big Data(3) 天殺的馬浩如何跑kmeans??

最近在研究用Mahout跑Kmeans clustering

個人認為眉眉角角很多

網路上找得到的幾乎都是文檔分群的範例

讓在嘗試的時候非常吃力


請看步驟:




1. 轉檔(.csv => sequence file)

個人認為這裡很關鍵

mahout裡似乎是沒有可以直接用的commend line

必須要自己寫JAVA轉檔

這裡附上我的JAVA code

獻醜了

import java.io.*;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.apache.mahout.math.*;
 
 
public class Convert2Seq {
 
  public static void main(String args[]) throws Exception{
  
        //轉好的檔案命名為point-vector
        String output = "~/point-vector";
        FileSystem fs = null;
        SequenceFile.Writer writer;
        Configuration conf = new Configuration();
        fs = FileSystem.get(conf);
        Path path = new Path(output);
        writer = new SequenceFile.Writer(fs, conf, path, Text.class, VectorWritable.class);
        
        //所有的csv檔都在factory路徑底下
        File folder = new File("~/factory");
        File[] listOfFiles = folder.listFiles();
            
        //分批把factor路徑底下的csv轉成name-vector格式之後寫進point-vector裡面
        for (int i=0; i<listOfFiles.length; i++){
        
            String input = listOfFiles[i].toString();
        
            VectorWritable vec = new VectorWritable();
            try {
                FileReader fr = new FileReader(input);
                BufferedReader br = new BufferedReader(fr);
                String s = null;
                while((s=br.readLine())!=null){    
                    String spl[] = s.split(",");
                    String key = spl[0];
                    Integer val = 0;
                    double[] colvalues = new double[1000];
                    for(int k=1;k<spl.length;k++){
                                colvalues[val] = Double.parseDouble(spl[k]);
                                val++;
                    }
                    NamedVector nmv = new NamedVector(new DenseVector(colvalues),key);
                    vec.set(nmv);
                    writer.append(new Text(nmv.getName()), vec);
                }
            } catch (Exception e) {
                System.out.println("ERROR: "+e);}
       }writer.close();
    }    
}

接下來在compile的時候要記得指定classpath

javac -classpath/{hadoop_home}/hadoop-core-0.20.2-cdh3u1.jar
    :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5.jar
        :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5-job.jar
        :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5-sources.jar
        :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5.jar
        :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5-sources.jar
        :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5-tests.jar Convert2Seq.java
 
 
java -Djava.library.path=/{hadoop_home}/lib/native/Linux-amd64-64 
     -cp .:/usr/local/hadoop-0.20.2-cdh3u1/hadoop-core-0.20.2-cdh3u1.jar
     :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5.jar
     :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5-job.jar
     :/{mahout_home}/{mahout_version}/core/target/mahout-core-0.5-sources.jar
     :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5.jar
     :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5-sources.jar
     :/{mahout_home}/{mahout_version}/math/target/mahout-math-0.5-tests.jar Convert2Seq

基本上這樣就可以了

忍不住再怨一下,轉這個超煩!

2. Canopy clustering

在做kmeans的時候,mahout會要求要有initial cluster

mahout裡的canopy cluster就可以幫我們生出來


$mahout canopy 
-i point-vector 
-o center-vector #結果存在center-vector裡 
-dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure 
-t1 500 -t2 250 
-ow 
-cl

然後就可以用他的結果(就是center-vector啦!)做kmeans clustering了~

t1、t2參數的設定和相關原理請參考線上mahout文件


3. Kmeans clustering

input是轉好的point-vector data
center是canopy生出來的initial clusters
output就取一個自己喜歡的名字囉~

$mahout kmeans 
-i point-vector 
-c center-vector 
-o clusters 
-dm org.apache.mahout.common.distance.SquaredEuclideanDistanceMeasure 
-x 20 
-cl 
-k 15 
-ow


4. dumping clustering result

跑完之後

結果是存在~clusters/clusterPoints裡面

因為是sequence file格式,得動用到seqdumper把他轉成看得懂的格式

檔案通常不止一個,用for-loop打他們通通打回原形~~~

typeset -i i
let i=1
for file in `$hadoop fs -ls clusters/clusteredPoints | grep 'part' | awk '{print $8}'`
do
 
 $mahout seqdumper -s $file -o ~/$i.txt
 let i++
done

2013年10月2日 星期三

[無關程式] AIC和統計建模


這篇是我最近在讀這本書的一些些筆記,主題是AIC,除非你這個人夠無聊又極富耐性,不然請慎入(是說這個網誌除了我以外大概也沒有其他人會來吧吧吧吧)

沒耐性的就早早洗洗睡了吧

AIC就是Akaike Information Criterion的縮寫,在使用各種常見的統計方法進行建模時,常常都會用它來作為評估模型好壞的指標

顧名思義,這個指標是由一個叫做Akaike的人提出的,基本上不要試著用英文去唸這個字,因為他是日本人。這個人的名字應該要唸成『阿-咖-咿-擠』

噢扯遠了

言歸正傳,AIC基本上長這樣:

$-2log(L)+2p $

其中$log(L)$就是的log-likelihood然後$p$是參數的數目


先來看看看最大概似估計(maximum likelihood estimator)吧:

假設我們現在有一筆資料$x_n$,我們假設有一個模型$f(x|\theta)$是這筆資料最好的描述,我們必須解出$\theta$!

maximum likelihood estimator幫我們找到的是這樣的解 $\widehat{\theta}_n$:

$\widehat{\theta}_n = \operatorname*{arg\,max}_{\theta  \in  \Theta } log\big(f(x_n|\theta)\big)$

然而,夠大的log-likelihood就行了嗎?其實不然,在maximum likelihood estimate的架構下,一味追求把$log\big(f(x_n|\theta)\big)$變大其實會帶來非常不合理的結果

舉個非常簡單的例子,線性迴歸模型(linear regression model):

$Y=\bf{X^T} \bf{\beta}+\epsilon$       $\epsilon\sim N(0,\sigma^2)$

likelihood function長這樣:


$f(y_i|\bf{x_i};\beta) = \frac{1}{2\pi\sigma^2} exp\big\{ -\frac{1}{2\sigma^2}(y_i-\bf{x^T_i} \bf{\beta})^2\big\}$


log-likelihood如下:


$l(\widehat{\beta}) = \sum_{i=1}^n logf(y_i|x_i;\beta) $

                                        $= - \frac{n}{2} 2log(\pi \sigma^2)  -\frac{1}{2\sigma^2} \sum_{i=1}^n(y_i-\bf{x^T_i}\beta)^2 $ 


如果我們的目的只是要讓$l(\widehat{\beta})$變小,我們就只要無止盡的增加$\bf{x^T_i}\beta $的維度,就可以讓每個$y_i-(x^T_{i1} \beta_1+x^T_{i2}\beta_2+x^T_{i3}\beta_3+...)$變得超小。但是這樣反而會使model裡有太多沒用的預測子,造成over-fitting的問題


因此,單靠夠大的log-likelihood是不夠的!!!


接下來插播Kullback-Leibler Divergence ($KL_{D}$):

$KL_{D}$是一個表示兩個pdf距離的量,定義是:


$KL_{D} =\int g(z)\frac{g(z)}{f(z)} dx$

其中$g(z)$是true probability 
$f(z)$則是model

其實也就G(Z)是真實pdf的情況下,一個log-likehood的概念

$E_{G}  \left[log \frac{G(Z)}{F(Z)} \right]$


啊這到底和統計建模有筍磨關係?


$KL_{D}$、統計建模和log-likelihood

其實,統計建模的精神就是,我們永遠不知道真實的分配$G(Z)$長怎樣,所以我們就去創造一個叫做$F(Z)$的model來模擬$G(Z)$。換句話說,$F(Z)$和$G(Z)$越接近,我們對$F(Z)$就越有把握

所以我們希望$KL_{D}\big(G,F\big)$很小,最好小到不能再小,這樣就表示我們的$F(Z)$非常完美

$KL_{D}\big(G,F\big)= E_{G}\left[\textrm{log} \frac{G(Z)}{F(Z)} \right]$

$=E_{G}\left[\textrm{log} g(Z)\right]-E_{G}\left[\textrm{log}f(Z)\right]$

$E_{G}\left[\textrm{log} g(Z)\right]$是一個常數,基本上我們對他無能為力。我們只能盡量讓$E_{G}\left[\textrm{log}f(Z)\right]$變大,只要$E_{G}\left[\textrm{log}f(Z)\right]$夠大,就可以讓$KL_{D}\big(G,F\big)$就夠小,我們的model就越完美!

所以,請記得,越好的model的可以使得

$E_{G}\left[\textrm{log}f(Z)\right]$夠大!



log_likelihood的誤差估計

我們回到熟悉的log-likelihood:

$l(\widehat{\theta}) = \sum_i^n \textrm{log}f(x_i|\widehat{\theta})$

$\frac{1}{n} \sum_i^n \textrm{log}f(x_i|\widehat{\theta}) \rightarrow \int \textrm{log}f(z|\widehat{\theta})d \widehat{G}(z) = E_\widehat{G}\left[{log}f(Z|\widehat{\theta})\right]$

所以照理來說,$l(\widehat{\theta})$應該是$nE_\widehat{G}[{log}f(Z|\widehat{\theta})]$)的估計值,但是因為使用了同一筆資料兩次(先估計$\theta$再用來估計$nE_\widehat{G}[{log}f(Z|\widehat{\theta})]$)使得這樣的估計出現了偏誤(bias)

我們把這個bias寫成

$b(G) = E_G( \bf{x_n})\left[\textrm{log}f(\bf{X_n}|\widehat{\theta}(\bf{X_n}))-nE_G(z) \left[\textrm{log} f(Z|\widehat{\theta}(\bf{X_n}))\right]\right]$

假設當$n\rightarrow\infty$時$\theta_n$會逼近真實的值$\theta_0$

我們把剛剛的$b(G)$改寫一下,改成$D1$、$D2$和$D3$的相加


$b(G) = E_G( \bf{x_n} )\left[\textrm{log}f(\bf{X_n}|\widehat{\theta}(\bf{X_n}))-nE_G(z) \left[\textrm{log}f(Z|\widehat{\theta}(\bf{X_n}))\right]\right]$

$=E_G( \bf{x_n})\left[\textrm{log}f(\bf{X_n}| \widehat{\theta}( \bf{X_n} ))-\textrm{log}f(\bf{X_n}|\theta_0)\right]...D1$

$+E_G( \bf{x_n})\left[\textrm{log}f(\bf{X_n}|\theta_0)-nE_G(z) [\textrm{log}f(Z|\theta_0)]\right]...D2$

$E_G( \bf{x_n})\left[nE_G(z) \left[\textrm{log}f(Z|\theta_0)\right]-nE_G(z) [\textrm{log}f(Z|\widehat{\theta}(\bf{X_n}))]\right]...D3$

計算$D1$:

先把$logf(\bf{X_n}|\theta_0)$寫成$l(\theta_0)$

然後搬出泰勒展開式

$l(\theta_0) \sim l(\widehat{\theta}) + (\theta_0-\widehat{\theta})^T\frac{\partial l(\widehat{\theta})}{\partial \theta} + \frac{1}{2}(\theta_0-\widehat{\theta})^T\frac{\partial^2 l(\widehat{\theta})}{\partial \theta \partial \theta^T} (\theta_0-\widehat{\theta})$

因為maximum likelihood estimator基本上就是在找可以使$\frac{\partial l(\widehat{\theta})}{\partial \theta}$為0的$\theta$,所以

$(\theta_0-\widehat{\theta})^T\frac{\partial l(\widehat{\theta})}{\partial \theta}=0$

maximum likelihood estimator在大數法則下:

$\frac{\partial^2 l(\widehat{\theta})}{\partial \theta\partial \theta^T} \rightarrow nJ(\theta_0)$

然後$D1$就可以寫成

$E_G(x_n)\left[l(\widehat{\theta}) - l(\theta)\right]$

$=\frac{n}{2}E_G(x_n)\left[(\theta_0-\widehat{\theta})J(\theta_0)(\theta_0-\widehat{\theta})^T\right]$

$=\frac{1}{2}tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}$

其中:

$I(\theta_0)=\int g(x)\frac{\partial \textrm{log}f(x|\theta)}{\partial \theta}\frac{\partial \textrm{log}f(x|\theta)}{\partial \theta^T}dx$

$J(\theta_0)=\int g(x)\frac{\partial^2 \textrm{log}f(x|\theta)}{\partial \theta \partial \theta^T}dx$


計算$D2$:

$E_G( \bf{x_n})\left[\textrm{log}f(\bf{X_n}|\theta_0)-nE_G(z) [\textrm{log}f(Z|\theta_0)]\right]$

$=E_G( \bf{x_n})\left[\sum_{i=1}^n\textrm{log}f(\bf{X_i}|\theta_0)\right]-nE_G(z)\left[ [\textrm{log}f(Z|\theta_0)]\right]$

$=0$


計算$D3$:

先把$nE_G(z) \left[\textrm{log}f(Z|wide_theta{\theta})\right]$寫成$\eta(\widehat{\theta})$

然後再度搬出泰勒展開式:

$\eta(\widehat{\theta}) \sim \eta(\theta_0) + \sum_{i=1}^n(\widehat{\theta}_i-\theta_i^{(0)})\frac{\partial \eta(\theta_0)}{\partial \theta_i} +\frac{1}{2}\sum_{i=1}^n\sum_{j=1}^n(\widehat{\theta}_i-\theta_i^{(0)})(\widehat{\theta}_j-\theta_j^{(0)})\frac{\partial^2 \eta(\theta_0)}{\partial \theta_i \partial \theta_j}$


和在計算$D1$時一樣,

$\frac{\partial \eta(\theta_0)}{\partial \theta_i}=E_G(z)\left[\frac{\partial}{\partial theta_i} \textrm{log} f(Z|\theta)\right|\theta_0] = 0,$   $i=1,2,...n$

因此

$\eta(\widehat{\theta}) \sim \eta(\theta_0) - \frac{1}{2}(\theta_0-\widehat{\theta})J(\theta_0)(\theta_0-\widehat{\theta})^T$

$D3$就可以寫成

$nE_G(z) \left[\eta(\theta_0)-\eta(\widehat{\theta})\right]$

$=nE_G(z) \left[\frac{1}{2}(\theta_0-\widehat{\theta})J(\theta_0)(\theta_0-\widehat{\theta})^T\right]$

$=\frac{n}{2}tr \big\{J(\theta_0)E_G(z) \left[(\theta_0-\widehat{\theta})(\theta_0-\widehat{\theta})^T\right]\big\}$

$=\frac{1}{2}tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}$


現在我們把$D1$、$D2$和$D3$加起來

$b(G)=\frac{1}{2}tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}+0+\frac{1}{2}tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}$

$=tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}$


現在我們回過頭來看看$I(\theta_0)$和$J(\theta_0)$

$-J(\theta_0)=\int g(x)\frac{\partial^2 \textrm{log}f(x|\theta)}{\partial \theta \partial \theta^T}dx$

$=E_G\left[\frac{1}{f(x|\theta)}\frac{\partial^2 }{\partial \theta \partial \theta^T}f(x|\theta)\right]-E_G\left[\frac{\partial}{\partial \theta_i} \textrm{log}f(x|\theta) \frac{\partial}{\partial \theta_j }\textrm{log} f(x|\theta)\right]$

當$g(x)=f(x|\theta_0)$時

$E_G\left[\frac{1}{f(x|\theta)}\frac{\partial^2 }{\partial \theta \partial \theta^T}f(x|\theta)\right]$

$=\int \frac{\partial^2}{\partial \theta_i \partial \theta_j} f(x|\theta_0)dx$

$=\frac{\partial^2}{\partial \theta_i \partial \theta_j}\int  f(x|\theta_0)dx=0$

這個時候$I(\theta_0)$就會等於$J(\theta_0)$

$b(G) =tr\big\{I(\theta_0)J(\theta_0)^{-1}\big\}$

$=tr(I_p) = p$


AIC、log-likelihood和$b(G)$

AIC其實就是

$-2(\textrm{log-likelihood})+2(\textrm{estimator of }b(G))$

因此AIC越小表示model越好

log-likelihood和$b(G)$代入就完成了!

$AIC = -2log(L)+2p$

參考資料:

Konishi, Sadanori, Kitagawa, Genshiro (2007). Information criteria and statistical modeling. Springer