How do you actually do big data analysis with Spark? A lot of newcomers are unclear on this, so this exercise is a good way to get started. Why learn Spark at all? Because our performance-testing work needs to generate a billion or more rows of data. Ordinary approaches simply cannot handle that volume; the job has to be submitted to YARN as a Spark application before it will even run. So let's learn some big data basics. Big data is also the foundation of AI, so working with it now lays the groundwork for later discussions about testing AI systems.
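
Since the stated goal is generating billion-row test data on a cluster, here is a minimal sketch of what such a generator might look like. The record layout, partition count, and output path are made-up placeholders; the point is only the pattern of parallelizing a range of ids and writing the generated rows straight to HDFS.

import org.apache.spark.{SparkConf, SparkContext}

object GenTestData {
  def main(args: Array[String]): Unit = {
    // Master and resources are supplied by spark-submit when running on the cluster.
    val conf = new SparkConf().setAppName("GenTestData")
    val sc = new SparkContext(conf)

    val rows = 1000000000L // target row count (1 billion)
    val partitions = 2000  // spread the generation across the cluster

    sc.range(0L, rows, 1L, partitions)
      .map(i => s"$i,user_$i,${i % 100},2023-01-01") // hypothetical id,name,score,date columns
      .saveAsTextFile("hdfs:///tmp/perf_test_data")  // hypothetical output path
    sc.stop()
  }
}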

Every beginning is hard. When I first got into big data I was completely lost every day: I had only ever dealt with databases, and the Hadoop ecosystem was something I had never even heard of. The learning materials made no sense to me at first; working through hands-on exercises like this one is what made things click.

Spark data analysis exercise

  1. The new dataset must contain: city name, total orders per city, average hotel user rating per city, and total hotel review count per city.
  2. Compute the maximum and minimum of each of the following three fields (total orders per city, average hotel user rating per city, total hotel review count per city) and print them.
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.mutable.ArrayBuffer

object Demo04 {
  def main(args: Array[String]): Unit = {
    System.setProperty("HADOOP_USER_NAME", "root")
    val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("4")
    val sc = new SparkContext(conf)
    val hdfsUrl = "hdfs://192.168.226.129:9000"
    val filePath: String = hdfsUrl + "/file3_1/hotel_data.csv"
    val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
    // Aggregate per city: (order count, rating sum, review-count sum).
    // Each valid CSV row is treated as one order; column 4 = city, 10 = rating, 11 = review count.
    val dataDrop: RDD[(String, (Long, Double, Double))] = data.map { arr: Array[String] =>
      Array(arr(4), arr(10), arr(11))
    }.filter { arr: Array[String] =>
      // Drop rows whose rating or review count is not numeric (header line, dirty data).
      try {
        arr(1).toDouble
        arr(2).toDouble
        true
      } catch {
        case _: Exception => false
      }
    }.map { arr: Array[String] =>
      (arr(0), (1L, arr(1).toDouble, arr(2).toDouble))
    }.reduceByKey { (a, b) =>
      (a._1 + b._1, a._2 + b._2, a._3 + b._3)
    }

    // (city, total orders, average rating, total review count); cached because it is reused below.
    val data1: RDD[(String, Double, Double, Double)] = dataDrop.map {
      case (cityName, (orders, ratingSum, reviewSum)) =>
        (cityName, orders.toDouble, ratingSum / orders, reviewSum)
    }.cache()

    // Spot-check a few cities from the aggregated result.
    val city = Array("广州", "北京", "上海", "阿拉善盟")
    data1.filter { tup: (String, Double, Double, Double) =>
      city.contains(tup._1)
    }.collect().foreach(println)

    // Save as CSV lines: city, total orders, average rating, total review count.
    data1.map(_.productIterator.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask4_1")
    // Max and min of each numeric field; every max()/min() call triggers a separate Spark job.
    val arrMM = Array(
      Array(data1.map(_._2).max(), data1.map(_._2).min()),
      Array(data1.map(_._3).max(), data1.map(_._3).min()),
      Array(data1.map(_._4).max(), data1.map(_._4).min())
    )

    println("城市总订单(最大,最小):" + arrMM(0).mkString(","))
    println("城市酒店平均用户评分(最大,最小):" + arrMM(1).mkString(","))
    println("城市酒店总评论数(最大,最小):" + arrMM(2).mkString(","))

    sc.parallelize(arrMM).map(_.mkString(","))
      .saveAsTextFile(hdfsUrl + "/hotelsparktask4_2")


    // Min-max normalize the three numeric fields to [0, 1]: (value - min) / (max - min).
    // A zero range is replaced by 1 to avoid division by zero.
    val diff: Array[Double] = arrMM.map(mm => mm(0) - mm(1)).map(e => if (e == 0) 1.0 else e)
    val rdd1 = sc.textFile(hdfsUrl + "/hotelsparktask4_1")
      .map(_.split(","))
      .map { arr => arr(0) -> Array(arr(1), arr(2), arr(3)).map(_.toDouble) }
      .map { tup =>
        val dd = (tup._2(0) - arrMM(0)(1)) / diff(0) // normalized total orders
        val pf = (tup._2(1) - arrMM(1)(1)) / diff(1) // normalized average rating
        val pl = (tup._2(2) - arrMM(2)(1)) / diff(2) // normalized total review count
        Array(tup._1, dd, pf, pl)
      }

    // Collect the normalized rows keyed by city so individual cities can be printed readably.
    val reMap = rdd1.map(arr => arr(0).toString -> arr.mkString(",")).collectAsMap()
    println(reMap.getOrElse("广州", "NaN"))
    println(reMap.getOrElse("上海", "NaN"))
    println(reMap.getOrElse("北京", "NaN"))

    rdd1.map(_.mkString(","))
      .saveAsTextFile(hdfsUrl+"/hotelsparktask4_3")
    sc.stop()
  }
}
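
Demo04 hard-codes setMaster("local[1]"), which is fine for stepping through the logic but keeps everything on a single core. For the billion-row workloads mentioned at the top, the master should come from spark-submit (for example spark-submit --master yarn --deploy-mode cluster --class Demo04 your-jar.jar, where the jar name is a placeholder) rather than from code. A minimal sketch of that setup, using setIfMissing so the same jar still runs locally by default:

import org.apache.spark.{SparkConf, SparkContext}

object Demo04Yarn {
  def main(args: Array[String]): Unit = {
    // Fall back to local[*] only when no master was passed in, so the same jar works
    // for local debugging and for spark-submit --master yarn without code changes.
    val conf = new SparkConf()
      .setAppName("Demo04Yarn")
      .setIfMissing("spark.master", "local[*]")
    val sc = new SparkContext(conf)

    // ... same analysis logic as Demo04 above ...

    sc.stop()
  }
}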

hotel_data.csv

Download the data: https://download.csdn.net/download/weixin_44018458/87437211

Spark data cleaning exercise: https://blog.csdn.net/weixin_44018458/article/details/128980802
