case class ApacheAccessLog(
ipAddress: String, // IP address of the client
clientId: String, // unique client identifier
userId: String, // unique user identifier
serverTime: String, // server time of the request
method: String, // HTTP request method
endpoint: String, // requested resource
protocol: String, // protocol name and version of the request
responseCode: Int, // response code, e.g. 200 or 401
contentSize: Long // size of the returned content
)
object ApacheAccessLog {
// Sample log line matched by the regex below:
// 64.242.88.10 - - [07/Mar/2004:16:05:49 -0800]
// "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1"
// 401 12846
val PATTERN =
"""^(\S+) (\S+) (\S+) \[([\w:/]+\s[+-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r
def isValidLogLine(log: String): Boolean = {
PATTERN.findFirstMatchIn(log).isDefined
}
def parseLogLine(log: String): ApacheAccessLog = {
val res = PATTERN.findFirstMatchIn(log)
if (res.isEmpty) {
throw new RuntimeException("Cannot parse log line: " + log)
}
val m = res.get
ApacheAccessLog(
m.group(1),
m.group(2),
m.group(3),
m.group(4),
m.group(5),
m.group(6),
m.group(7),
m.group(8).toInt,
m.group(9).toLong
)
}
}
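As a quick sanity check, the parser can be run against the sample line quoted in the comment above. This is a minimal sketch that is not part of the original post; the ParserCheck object name is illustrative, and the values in the trailing comments are what the regex yields for that sample line.

object ParserCheck extends App {
  val sample =
    """64.242.88.10 - - [07/Mar/2004:16:05:49 -0800] "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1" 401 12846"""
  println(ApacheAccessLog.isValidLogLine(sample)) // true
  val entry = ApacheAccessLog.parseLogLine(sample)
  println(entry.method + " " + entry.responseCode + " " + entry.contentSize) // GET 401 12846
}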
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}
/**
* Created by Administrator on 2017/4/25.
*/
object LogAnalysis {
def main(args: Array[String]): Unit = {
// create the SparkContext and SQLContext
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("log-analysis-sparksql")
val sc = SparkContext.getOrCreate(conf)
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._ // without this import the toDF() conversion below does not compile
// transform: read the log file, keep the valid lines and parse them into a DataFrame
val path = "E:\\newcode\\MyFirstProject\\data\\test.log"
val rdd = sc.textFile(path)
val apacheAccessDataFrame = rdd
.filter(line => ApacheAccessLog.isValidLogLine(line))
.map(line => ApacheAccessLog.parseLogLine(line))
.cache()
.toDF() // convert the parsed RDD to a DataFrame
// register the DataFrame as a temporary table for SQL queries
apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
sqlContext.sql("select * from log_analysis_temp_table limit 1").show()
// Requirement 1: compute the average, maximum and minimum of contentSize
val resultDataFrame1 = sqlContext.sql(
"""
|SELECT
|AVG(contentSize) as avg_contentSize,
|MAX(contentSize) as max_contentSize,
|MIN(contentSize) as min_contentSize
|FROM log_analysis_temp_table
""".stripMargin
)
resultDataFrame1.show()
// save the result as text (a local path here; an HDFS path works the same way)
val resultRdd = resultDataFrame1.map(row => {
val avgSize = row.getAs[Double]("avg_contentSize")
val minSize = row.getAs[Long]("min_contentSize")
val maxSize = row.getAs[Long]("max_contentSize")
(avgSize, minSize, maxSize)
})
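// Assuming Spark 2.x: map on a DataFrame yields a Dataset of tuples (the tuple encoder comes from
// sqlContext.implicits._), and .rdd below exposes the underlying RDD so saveAsTextFile can be used.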
resultRdd.rdd.saveAsTextFile(s"E:/newcode/MyFirstProject/data/output/sql_${System.currentTimeMillis()}")
// Requirement 2: count the number of records for each response code
val resultDataFrame2 = sqlContext.sql(
"""
|SELECT
|responseCode AS code,
|COUNT(1) AS count
|FROM log_analysis_temp_table
|GROUP BY responseCode
""".stripMargin
)
resultDataFrame2.show()
resultDataFrame2.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\responseCode")
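// Note: the "com.databricks.spark.csv" format comes from the external spark-csv package, which must be
// on the classpath; on Spark 2.x the built-in "csv" data source can be used instead.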
// Requirement 3: find IP addresses with more than N requests, excluding a blacklist
val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
val N = 10
val resultDataFrame3 = sqlContext.sql(
s"""
|SELECT
|ipAddress AS ip,
|COUNT(1) AS count
|FROM log_analysis_temp_table
|WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
|GROUP BY ipAddress
|HAVING count>${N}
""".stripMargin)
resultDataFrame3.show()
// Requirement 4: find the top k endpoints by number of requests
val k = 50
val resultDataFrame4 = sqlContext.sql(
s"""
|SELECT
| t.endpoint,
| t.count
|FROM(
|SELECT
| endpoint,
| COUNT(1) AS count
|FROM log_analysis_temp_table
|GROUP BY endpoint) t
|ORDER BY t.count DESC
|LIMIT ${k}
""".stripMargin)
resultDataFrame4.show()
resultDataFrame4.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\maxendpoint")
}
}
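The program above uses the Spark 1.x-style SQLContext entry point and the deprecated registerTempTable API. For reference, here is a minimal sketch of the same setup on Spark 2.x (an assumption about the available Spark version; the object name LogAnalysisSpark2 is illustrative), which replaces them with SparkSession and createOrReplaceTempView:

import org.apache.spark.sql.SparkSession

object LogAnalysisSpark2 {
  def main(args: Array[String]): Unit = {
    // SparkSession replaces SQLContext/SparkContext as the single entry point in Spark 2.x
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("log-analysis-sparksql")
      .getOrCreate()
    import spark.implicits._

    val logDF = spark.sparkContext
      .textFile("E:\\newcode\\MyFirstProject\\data\\test.log")
      .filter(ApacheAccessLog.isValidLogLine)
      .map(ApacheAccessLog.parseLogLine)
      .toDF()
      .cache()

    // createOrReplaceTempView is the Spark 2.x replacement for registerTempTable
    logDF.createOrReplaceTempView("log_analysis_temp_table")
    spark.sql("select * from log_analysis_temp_table limit 1").show()

    spark.stop()
  }
}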
Original post: https://blog.csdn.net/linweidong/article/details/85060575