纵有疾风起
人生不言弃

使用Spark SQL来分析网络日志

case class ApacheAccessLog(
                            ipAddress: String, // IP地址
                              clientId: String, // 客户端唯一标识符
                              userId: String, // 用户唯一标识符
                              serverTime: String, // 服务器时间
                              method: String, // 请求类型/方式
                              endpoint: String, // 请求的资源
                              protocol: String, // 请求的协议名称
                              responseCode: Int, // 请求返回值:比如:200、401
                              contentSize: Long // 返回的结果数据大小
                          ) {

}

object ApacheAccessLog {
  // regex
  // 64.242.88.10 - - [   07/Mar/2004:16:05:49 -0800       ]
  // "GET /twiki/bin/edit/Main/Double_bounce_sender?topicparent=Main.ConfigurationVariables HTTP/1.1"
  // 401 12846
  val PARTTERN =
  """^(\S+) (\S+) (\S+) \[([\w:/]+\s[+|-]\d{4})\] "(\S+) (\S+) (\S+)" (\d{3}) (\d+)""".r

  def isValidatelogLine(log: String): Boolean = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      false
    } else {
      true
    }

  }

  def parseLogLine(log: String): ApacheAccessLog = {
    val res = PARTTERN.findFirstMatchIn(log)
    if (res.isEmpty) {
      throw new RuntimeException("Cannot parse log line: " + log)
    }
    val m = res.get

    ApacheAccessLog(
      m.group(1),
      m.group(2),
      m.group(3),
      m.group(4),
      m.group(5),
      m.group(6),
      m.group(7),
      m.group(8).toInt,
      m.group(9).toLong
    )
  }
}
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkContext, SparkConf}

/**
  * Created by Administrator on 2017/4/25.
  */
object LogAnalysis {
  def main(args: Array[String]): Unit = {
    //sqlContext
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("log-analysis-sparksql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._ //如果不写,下面的转换不成功

    //transform
    val path = "E:\\newcode\\MyFirstProject\\data\\test.log"
    val rdd = sc.textFile(path)
    val apacheAccessDataFrame = rdd
      .filter(line => ApacheAccessLog.isValidatelogLine(line))
      .map(line => {
        ApacheAccessLog.parseLogLine(line)
      }).cache().toDF() //rdd转换为DataFrame

    //register temptable
    apacheAccessDataFrame.registerTempTable("log_analysis_temp_table")
      sqlContext.sql("select * from log_analysis_temp_table limit 1").show()

      //需求一:求contentSize的平均值,最大值以及最小值
      val resultDataFrame1 = sqlContext.sql(

      """
        |SELECT
        |AVG(contentSize) as avg_contentSize,
        |MAX(contentSize) as max_contentSize,
        |MIN(contentSize) as min_contentSize
        |FROM log_analysis_temp_table
      """.stripMargin
      )
      resultDataFrame1.show()

      //save                                         //save as HDFS
      val resultRdd = resultDataFrame1.map(row => {
        val avgSize = row.getAs[Double]("avg_contentSize")
        val minSize = row.getAs[Long]("min_contentSize")
        val maxSize = row.getAs[Long]("max_contentSize")
        (avgSize, minSize, maxSize)
      })
      resultRdd.rdd.saveAsTextFile(s"E:/newcode/MyFirstProject/data/output/sql_${System.currentTimeMillis()}")


    //需求二:求各个返回值出现的数据个数
    val resultDataFrame2 = sqlContext.sql(
      """
        |SELECT
        |responseCode AS code,
        |COUNT(1) AS count
        |FROM log_analysis_temp_table
        |GROUP BY responseCode
      """.stripMargin
    )
    resultDataFrame2.show()
    resultDataFrame2.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\responseCode")


    //需求三:求访问次数大于N的IP地址,并对黑名单进行限制
    val blackIP = Array("200-55-104-193.ds1.prima.net.ar", "10.0.0.153", "208-38-57-205.ip.cal.radiant.net")
    val N = 10
    val resultDataFrame3 = sqlContext.sql(
      s"""
         |SELECT
         |ipAddress AS ip,
         |COUNT(1) AS count
         |FROM log_analysis_temp_table
         |WHERE not(ipAddress in(${blackIP.map(ip => s"'${ip}'").mkString(",")}))
         |GROUP BY ipAddress
         |HAVING count>${N}
     """.stripMargin)
    resultDataFrame3.show()


    //需求四:求访问次数最多的前k个endpoint的值
    val k = 50
    val resultDataFrame4 = sqlContext.sql(
      s"""
         |SELECT
         |  t.endpoint,
         |  t.count
         |FROM(
         |SELECT
         |  endpoint,
         |  COUNT(1) AS count
         |FROM log_analysis_temp_table
         |GROUP BY endpoint) t
         |ORDER BY t.count DESC
         |limit ${k}
      """.stripMargin)
    resultDataFrame4.show()
    resultDataFrame4.repartition(1).write.format("com.databricks.spark.csv").save("E:\\newcode\\MyFirstProject\\data\\output\\maxendpoint")

  }
}

更多spark实例请看:Spark实例精选

原文链接:https://blog.csdn.net/linweidong/article/details/85060575

本站声明:网站内容来源于网络,如有侵权,请联系我们,我们将及时处理。

未经允许不得转载:起风网 » 使用Spark SQL来分析网络日志
分享到: 生成海报

评论 抢沙发

评论前必须登录!

立即登录