spark数据分析练习
新数据集中需 城市名称、城市总订单、城市酒店平均用户评分、城市酒店总评论数。分别计算以下三个字段的最大值和最小值(城市总订单、城市酒店平均用户评分、城市酒店总评论数)打印输出
·
如何进行Spark大数据分析,很多新手对此不是很清楚,可以做一下这个练习。为什么要学习Spark呢,因为我们性能测试的需求要造10亿级甚至更多的数据。普通的方式肯定不行了,得用到spark提交到yarn上运行才跑的动。所以现在我们来学习一下大数据方面的东西。同时大数据也是人工智能的基础,现在搞搞大数据的东西,也为以后讨论人工智能方面的测试做做铺垫吧。
万事开头难,我刚接触大数据的那会是天天的一脸懵逼。因为以前只跟数据库打过交道,对于hadoop生态圈完全是没听过的状态。看资料的时候也根本看不懂,多做一些题目就好了。
spark数据分析练习
- 新数据集中需 城市名称、城市总订单、城市酒店平均用户评分、城市酒店总评论数
- 分别计算以下三个字段的最大值和最小值(城市总订单、城市酒店平均用户评分、城市酒店总评论数)打印输出
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ArrayBuffer
object Demo04 {
def main(args: Array[String]): Unit = {
System.setProperty("HADOOP_USER_NAME", "root")
val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("4")
val sc = new SparkContext(conf)
val hdfsUrl = "hdfs://192.168.226.129:9000"
val filePath: String = hdfsUrl + "/file3_1/hotel_data.csv"
val data: RDD[Array[String]] = sc.textFile(filePath).map(_.split(",")).cache()
val total: Long = data.count()
val dataDrop: RDD[(String, Double, Double)] = data.map { arr: Array[String] =>
Array(arr(4), arr(10), arr(11)) // 城市,评分,评论数
}.filter {
arr: Array[String] =>
try {
arr(1).toDouble
arr(2).toDouble
true
} catch {
case e: Exception => false
}
}.map { arr: Array[String] =>
(arr(0), (arr(0), arr(1).toDouble, arr(2).toDouble))
}.reduceByKey { (tup1: (String, Double, Double), tup2: (String, Double, Double)) =>
(tup1._1, tup1._2 + tup2._2, tup1._3 + tup2._3)
}.map(_._2)
val dpCount: Long = dataDrop.count()
val data1: RDD[(String, Double, Double, Double)] = dataDrop.map {
tup: (String, Double, Double) =>
(tup._1, dpCount.toDouble, tup._2 / dpCount, tup._3.toDouble)
}
val city = Array("广州", "北京", "上海", "阿拉善盟")
data1.filter { arr: (String, Double, Double, Double) =>
city.contains(arr._1)
}
//城市总订单,平均分,总评论数
data1.map { tup: (String, Double, Double, Double) =>
val bf: ArrayBuffer[String] = ArrayBuffer[String]()
tup.productIterator.foreach { e: Any =>
bf += e.toString
}
bf.toArray
}.map(_.mkString(","))
.saveAsTextFile(hdfsUrl + "/hotelsparktask4_1")
val arrMM = Array(Array(data1.map(_._2).max(), data1.map(_._2).min),
Array(data1.map(_._3).max(), data1.map(_._3).min()),
Array(data1.map(_._4).max(), data1.map(_._4).min())
)
println("城市总订单(最大,最小):" + arrMM(0).mkString(","))
println("城市酒店平均用户评分(最大,最小):" + arrMM(1).mkString(","))
println("城市酒店总评论数(最大,最小):" + arrMM(2).mkString(","))
sc.parallelize(arrMM).map(_.mkString(","))
.saveAsTextFile(hdfsUrl + "/hotelsparktask4_2")
//城市,总订单,平均分,总评论数
val rdd1 = sc.textFile(hdfsUrl + "/hotelsparktask4_1")
.map(_.split(","))
.map { arr => arr(0) -> Array(arr(1), arr(2), arr(3)).map(_.toDouble) }
.map { tup =>
val diff = arrMM.map(_.reduce(_ - _)).map { e =>
if (e == 0) {
1
} else {
e
}
}
val dd = (tup._2(0) - arrMM(0)(1)) / diff(0)
val pf = (tup._2(1) - arrMM(1)(1)) / diff(1)
val pl = (tup._2(2) - arrMM(2)(1)) / diff(2)
Array(tup._1,dd,pf,pl)
}
val reMap = rdd1.groupBy(_(0)).collectAsMap()
println(reMap.getOrElse("广州","NaN"))
println(reMap.getOrElse("上海","NaN"))
println(reMap.getOrElse("北京","NaN"))
rdd1.map(_.mkString(","))
.saveAsTextFile(hdfsUrl+"/hotelsparktask4_3")
sc.stop()
}
}
hotel_data.csv
下载数据:https://download.csdn.net/download/weixin_44018458/87437211
spark数据清洗练习:https://blog.csdn.net/weixin_44018458/article/details/128980802
更多推荐
所有评论(0)