Create a new Scala project
Add the Spark JARs to the project
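If you prefer a build tool to adding the jars by hand in the IDE, a minimal build.sbt along these lines would pull in the same dependency (a sketch assuming Spark 1.5.x and Scala 2.10, which were current when this post was written):

name := "SparkDemo002"

version := "1.0"

scalaVersion := "2.10.6"

// Spark core is all that the two demos below need.
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"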
Create a source directory with the layout src/main/scala
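With the two demo objects below, the project tree would look roughly like this (file names are illustrative):

src/
└── main/
    └── scala/
        └── SparkDemo002/
            ├── JoinDemo.scala
            └── SogouQA.scala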
Write the code
package SparkDemo002

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/** Created by liguodong on 12/22/15. */
object JoinDemo {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: JoinDemo <file1> <file2>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("JoinDemo")
    val sc = new SparkContext(conf)
    val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
    case class Register(d: java.util.Date, uuid: String, cust_id: String, lat: Float, lng: Float)
    case class Click(d: java.util.Date, uuid: String, landing_page: Int)
    // e.g. "hdfs://localhost:9000/liguodong/join/reg.tsv"
    val reg = sc.textFile(args(0)).map(_.split("\t")).map(r =>
      (r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))
    // e.g. "hdfs://localhost:9000/liguodong/join/clk.tsv"
    val clk = sc.textFile(args(1)).map(_.split("\t")).map(c =>
      (c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))
    reg.join(clk).take(2).foreach(println)
    sc.stop()
  }
}
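Both RDDs are keyed by the uuid column (field index 1), so reg.join(clk) produces records of the form (uuid, (Register, Click)) for uuids that appear in both files. The same keyed-join pattern on tiny in-memory data (hypothetical values; runnable in spark-shell, where sc already exists):

// Pair RDDs keyed by a shared id; join keeps only keys present in both.
val left  = sc.parallelize(Seq(("u1", "reg-a"), ("u2", "reg-b")))
val right = sc.parallelize(Seq(("u1", "clk-x")))
left.join(right).collect().foreach(println)  // prints (u1,(reg-a,clk-x))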
package SparkDemo002

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._

/** Created by liguodong on 12/22/15. */
object SogouQA {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SogouQA <file1> <file2>")
      System.exit(1)
    }
    val conf = new SparkConf().setAppName("SogouQA")
    val sc = new SparkContext(conf)
    // Keep only rows with at least two fields (a filter on length == 0
    // would drop every valid row), count queries per user id, then sort
    // by count in descending order.
    sc.textFile(args(0)).map(_.split("\t")).filter(_.length >= 2)
      .map(x => (x(1), 1)).reduceByKey(_ + _)
      .map(x => (x._2, x._1)).sortByKey(false)
      .map(x => (x._2, x._1)).saveAsTextFile(args(1))
    sc.stop()
  }
}
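The double swap around sortByKey is the classic sort-by-value idiom. On Spark 1.0 and later, the same descending ranking can be expressed more directly with sortBy, for example:

// Equivalent pipeline using sortBy instead of swapping around sortByKey.
sc.textFile(args(0)).map(_.split("\t")).filter(_.length >= 2)
  .map(x => (x(1), 1)).reduceByKey(_ + _)
  .sortBy(_._2, ascending = false)
  .saveAsTextFile(args(1))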
Configure the Artifacts manually
Build Artifacts
Run
bin/spark-submit --master spark://ubuntu:7077 \
  --class SparkDemo002.JoinDemo --executor-memory 2g \
  Demo.jar hdfs://localhost:9000/liguodong/join/reg.tsv \
  hdfs://localhost:9000/liguodong/join/clk.tsv
----------------------
hadoop2@ubuntu:/liguodong/software/spark$ bin/spark-submit --master spark://ubuntu:7077 \
  --class SparkDemo002.SogouQA --executor-memory 3g \
  Demo.jar hdfs://localhost:9000/liguodong/SogouQ1.txt.tar.gz hdfs://localhost:9000/liguodong/output1
# If the job wrote its result as multiple part files, we can merge them first
hadoop2@ubuntu:/liguodong/software/hadoop$ bin/hdfs dfs -getmerge hdfs://localhost:9000/liguodong/output1 /home/hadoop2/result
hadoop2@ubuntu:/liguodong/software/hadoop$ ll /home/hadoop2/
# View the first 10 records
hadoop2@ubuntu:/liguodong/software/hadoop$ head /home/hadoop2/result
(b3c94c37fb154d46c30a360c7941ff7e,676)
(cc7063efc64510c20bcdd604e12a3b26,613)
(955c6390c02797b3558ba223b8201915,391)
(b1e371de5729cdda9270b7ad09484c4f,337)
(6056710d9eafa569ddc800fe24643051,277)
(637b29b47fed3853e117aa7009a4b621,266)
(c9f4ff7790d0615f6f66b410673e3124,231)
(dca9034de17f6c34cfd56db13ce39f1c,226)
(82e53ddb484e632437039048c5901608,221)
(c72ce1164bcd263ba1f69292abdfdf7c,214)
Original article: https://blog.csdn.net/scgaliguodong123_/article/details/50382331