2017年3月3日 星期五

[Scala] Big Data(6) submit spark job

這篇是這篇(默默改版了)的Spark版,就這樣而已ㄏㄏ。

冰島,隨便一拍都可以當啤酒廣告。沒錯,我在炫耀。


劇情設定是這樣的,假設我有三個檔案在hdfs上,分別是temp/input_1.txt, temp/input_2.txt和 temp/input_3.txt,各自格式都不一樣,然後我想要計算各科的平均分數。
$ hdfs dfs -cat temp/input_1.txt
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67
$ hdfs dfs -cat temp/input_2.txt
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}
$ hdfs dfs -cat temp/input_3.txt
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]

相較於Hadoop的JAVA file總是要寫得的叨叨噓噓,Spark明顯親民多了,根本就不用什麼Multipleinputs啊啊啊很棒!所有的程式碼只有這樣:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.hadoop.fs.FileSystem

object MultiInput {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Multiple Input Example")
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)

    val inPath1 = new org.apache.hadoop.fs.Path(args(0))
    val inPath2 = new org.apache.hadoop.fs.Path(args(1))
    val inPath3 = new org.apache.hadoop.fs.Path(args(2))
    val outPath = new org.apache.hadoop.fs.Path(args(3))

    if(fs.exists(outPath)){
        fs.delete(outPath)
    }

    val sc = new SparkContext(conf)
    val file1 = sc.textFile(inPath1.toString)
    var rdd1 = file1.map(line => (line.split(";")(1), line.split(";")(2)))

    val file2 = sc.textFile(inPath2.toString)
    var rdd2 = file2.filter(_.contains(",")).map(s => (
        s.split(",")(1).split(":")(1), 
        s.split(",")(2).split(":")(1).replace("}", "")))

    val file3 = sc.textFile(inPath3.toString)
    var rdd3 = file3.map(s => (
        s.split(";")(1).split("=>")(1), 
        s.split(";")(2).split("=>")(1).replace("]", "")))

    val rdd = rdd1.union(rdd2).union(rdd3)

    val res = rdd.combineByKey(
        (x: String) => (x.toInt, 1),
        (acc:(Int, Int), x) => (acc._1.toInt + x.toInt, acc._2.toInt + 1),
        (acc1:(Int, Int), acc2:(Int, Int)) => (
            acc1._1.toInt + acc2._1.toInt, 
            acc1._2.toInt + acc2._2.toInt)
        ).map(k => (k._1, k._2._1.toDouble / k._2._2.toDouble))

    res.saveAsTextFile(outPath.toString)
    
    }
}

然後把以上程式碼存成一個叫做MultiInput.scala的檔案。

之後進入打包流程,就比較麻煩了。
首先先建立一個打包需要的資料夾,這裡就姑且叫做multiInput吧!
$ mkdir ./multiInput
$ mkdir -p ./multiInput/src/main/scala
然後把剛剛的寫好的那的scala檔案丟到/src/main/scala目錄下
$ mv MultiInput.scala ./multiInput/src/main/scala/MultiInput.scala
創建一個叫做multiInput.sbt的檔案(極重要!)
$ vim ./multiInput/simple.sbt
內容如下:
$ name := "Multiple Input"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
然後進到資料夾multiInput進行一個打包的動作(要先安裝好sbt喔)
$ cd multiInput
$ sbt package
成功打包以後,會在target/scala-2.10底下搞到一個打包好的.jar檔案
$ ls target/scala-2.10/
classes  multiple-input_2.10-1.0.jar
submit spark job的方法如下
$ spark-submit --class "MultiInput" target/scala-2.10/multiple-input_2.10-1.0.jar \ 
temp/input_1.txt temp/input_2.txt temp/input_3.txt temp/output_mp
路徑記得要打對!路徑記得要打對!路徑記得要打對!很重要!

跑完之後就可以到hdfs上看結果了
$ hdfs dfs -getmerge temp/output_mp sp_test
$ cat sp_test
(math,83.75)
(chinese,74.66666666666667)
(science,74.5)