冰島,隨便一拍都可以當啤酒廣告。沒錯,我在炫耀。 |
劇情設定是這樣的,假設我有三個檔案在hdfs上,分別是temp/input_1.txt, temp/input_2.txt和 temp/input_3.txt,各自格式都不一樣,然後我想要計算各科的平均分數。
$ hdfs dfs -cat temp/input_1.txt
WUJ-360100;math;56
WPY-802007;math;98
FKT-670008;science;67
$ hdfs dfs -cat temp/input_2.txt
{Number:FJB-004150, Subject:math, Score:96}
{Number:QDG-300700, Subject:chinese, Score:90}
{Number:JVY-030140, Subject:chinese, Score:71}
$ hdfs dfs -cat temp/input_3.txt
[Number=>ITM-501806; Subject=>science; Score=>82]
[Number=>QBE-003981; Subject=>math; Score=>85]
[Number=>EUJ-017009; Subject=>chinese; Score=>63]
相較於Hadoop的JAVA file總是要寫得的叨叨噓噓,Spark明顯親民多了,根本就不用什麼Multipleinputs啊啊啊很棒!所有的程式碼只有這樣:
import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.SparkConf import org.apache.hadoop.fs.FileSystem object MultiInput { def main(args: Array[String]) { val conf = new SparkConf().setAppName("Multiple Input Example") val hadoopConf = new org.apache.hadoop.conf.Configuration() val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf) val inPath1 = new org.apache.hadoop.fs.Path(args(0)) val inPath2 = new org.apache.hadoop.fs.Path(args(1)) val inPath3 = new org.apache.hadoop.fs.Path(args(2)) val outPath = new org.apache.hadoop.fs.Path(args(3)) if(fs.exists(outPath)){ fs.delete(outPath) } val sc = new SparkContext(conf) val file1 = sc.textFile(inPath1.toString) var rdd1 = file1.map(line => (line.split(";")(1), line.split(";")(2))) val file2 = sc.textFile(inPath2.toString) var rdd2 = file2.filter(_.contains(",")).map(s => ( s.split(",")(1).split(":")(1), s.split(",")(2).split(":")(1).replace("}", ""))) val file3 = sc.textFile(inPath3.toString) var rdd3 = file3.map(s => ( s.split(";")(1).split("=>")(1), s.split(";")(2).split("=>")(1).replace("]", ""))) val rdd = rdd1.union(rdd2).union(rdd3) val res = rdd.combineByKey( (x: String) => (x.toInt, 1), (acc:(Int, Int), x) => (acc._1.toInt + x.toInt, acc._2.toInt + 1), (acc1:(Int, Int), acc2:(Int, Int)) => ( acc1._1.toInt + acc2._1.toInt, acc1._2.toInt + acc2._2.toInt) ).map(k => (k._1, k._2._1.toDouble / k._2._2.toDouble)) res.saveAsTextFile(outPath.toString) } }
然後把以上程式碼存成一個叫做MultiInput.scala的檔案。
之後進入打包流程,就比較麻煩了。
首先先建立一個打包需要的資料夾,這裡就姑且叫做multiInput吧!
$ mkdir ./multiInput
$ mkdir -p ./multiInput/src/main/scala
然後把剛剛的寫好的那的scala檔案丟到/src/main/scala目錄下$ mv MultiInput.scala ./multiInput/src/main/scala/MultiInput.scala
創建一個叫做multiInput.sbt的檔案(極重要!)
$ vim ./multiInput/simple.sbt
內容如下:
$ name := "Multiple Input"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
然後進到資料夾multiInput進行一個打包的動作(要先安裝好sbt喔)
$ cd multiInput
$ sbt package
成功打包以後,會在target/scala-2.10底下搞到一個打包好的.jar檔案
$ ls target/scala-2.10/
classes multiple-input_2.10-1.0.jar
submit spark job的方法如下
$ spark-submit --class "MultiInput" target/scala-2.10/multiple-input_2.10-1.0.jar \
temp/input_1.txt temp/input_2.txt temp/input_3.txt temp/output_mp
路徑記得要打對!路徑記得要打對!路徑記得要打對!很重要!跑完之後就可以到hdfs上看結果了
$ hdfs dfs -getmerge temp/output_mp sp_test
$ cat sp_test
(math,83.75)
(chinese,74.66666666666667)
(science,74.5)
沒有留言:
張貼留言