纵有疾风起
人生不言弃

Spark案例练习-UV的统计

Spark案例练习-UV的统计

2020-05-08 23:43
来源:分享电脑学习

原标题:Spark案例练习-UV的统计

关注公众号:分享电脑学习

回复”百度云盘” 可以免费获取所有学习文档的代码(不定期更新)

云盘目录说明:

tools目录是安装包

res 目录是每一个课件对应的代码和资源等

doc 目录是一些第三方的文档工具

承接上一篇文档《Spark案例练习-PV的统计》

参数说明:

Spark案例练习-UV的统计插图

继续上面的PV代码编写即可

思路:UV的计算

1.数据进行过滤清洗,获取两个字段(时间、guid)

2.guid非空,时间非空,时间字符串的长度必须大于10

3.将同一天的数据放在一起,根据guid去重,统计去重的结果

代码:

val rdd2 = rdd.map(line => line.split(“\t”))

.filter(arr => {

//保留正常数据

arr.length >=3 && arr(2).trim.nonEmpty && arr(0).trim.length > 10

})

.map(arr => {

val date = arr(0).trim.substring(0,10)

val guid = arr(2).trim

(date,guid) // (date,url)

})

继续编写代码

Spark案例练习-UV的统计插图1

有两种方式:

1. 基于groupByKey进行UV的统计

2. 基于reduceByKey实现UV的统计

先看基于groupByKey进行UV的统计

val uvRdd = rdd2.groupByKey()

.map(t => {

val date = t._1

val guids = t._2

val uv = guids.toSet.size

(date,uv)

})

println(“uv——————” + uvRdd.collect().mkString(“;”))

Spark案例练习-UV的统计插图2

Spark案例练习-UV的统计插图3

再看基于reduceByKey实现UV的统计

rdd2.map(t => {

((t._1,t._2),1)

})

.reduceByKey(_+_)

.map(_._1)

val uvRDD: RDD[(String, Int)] = rdd2.distinct()

.map(t => (t._1, 1))

.reduceByKey(_+_)

println(“uv——————” + uvRDD.collect().mkString(“;”))

Spark案例练习-UV的统计插图4

Spark案例练习-UV的统计插图5

最终指标的合并

val pvuvRdd = pvRdd.fullOuterJoin(uvRdd)

.map(t => {

val date = t._1

val pv = t._2._1.getOrElse(0) //如果有值则返回对应的值,如果无值则返回0

val uv = t._2._2.getOrElse(0)

//返回结果

(date,pv,uv)

})

Spark案例练习-UV的统计插图6

打印一下,可以看到合并的数据

Spark案例练习-UV的统计插图7

数据输出(Driver、保存HDFS上,保存到RDBMS中)

数据返回给Driver

val result = pvuvRdd.collect()

Spark案例练习-UV的统计插图8

保存到HDFS上

pvuvRdd.saveAsTextFile(s”hdfs://master:9000/data/pv_uv/${System.currentTimeMillis()}”)

Spark案例练习-UV的统计插图9

端口注意下,如果想用域名(master)就要确保在本地hosts文件配置了(win环境下)

运行一下,可以看到hdfs上有了这个文件

Spark案例练习-UV的统计插图10

保存到RDBMS中、保存到非关系型数据库中

建库建表

CREATE DATABASE spark_test;

USE spark_test;

CREATE TABLE pvuv(

`date` DATE NOT NULL,

`pv` INT(11) NOT NULL,

`uv` INT(11) NOT NULL

)ENGINE=MYISAM DEFAULT CHARSET=utf8;

编写代码

其中val conn = DriverManager.getConnection(“”,””,””)这句话是url、user和password

代码

pvuvRdd.foreachPartition(iter => {

//1. 创建数据库连接对象

//2. 创建数据输出prepareStatement对象

val conn = DriverManager.getConnection(“jdbc:mysql://localhost:3306/spark_test”,”root”,”root”)

val pstmt = conn.prepareStatement(“insert into pvuv(date,pv,uv) values(?,?,?);”)

//3. 数据迭代输出

iter.foreach(t => {

val date = t._1

val pv = t._2

val uv = t._3

pstmt.setString(1,date)

pstmt.setInt(2,pv)

pstmt.setInt(3,uv)

pstmt.executeUpdate()

})

//4. 关闭连接

conn.close()

pstmt.close()

})

Spark案例练习-UV的统计插图11

运行代码,查看数据库

Spark案例练习-UV的统计插图12返回搜狐,查看更多

责任编辑:

声明:该文观点仅代表作者本人,搜狐号系信息发布平台,搜狐仅提供信息存储空间服务。
阅读 ()

未经允许不得转载:起风网 » Spark案例练习-UV的统计
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录