Spark大数据分析与实战(第二版) 课件 项目3 Spark RDD分析交通违章记录_第1页
Spark大数据分析与实战(第二版) 课件 项目3 Spark RDD分析交通违章记录_第2页
Spark大数据分析与实战(第二版) 课件 项目3 Spark RDD分析交通违章记录_第3页
Spark大数据分析与实战(第二版) 课件 项目3 Spark RDD分析交通违章记录_第4页
Spark大数据分析与实战(第二版) 课件 项目3 Spark RDD分析交通违章记录_第5页
已阅读5页,还剩51页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据分析与实战项目3SparkRDD分析交通违章记录要求使用SparkRDD技术,完成交通违章数据的分析,为相关部门提供各类信息支持。为加强交通管理、减少交通违章行为,某地部署了数百组交通监控设备,用于采集辖区内各类交通违法行为;经数据抽取与整理,得到3张数据表格:违章行为记录表、车主信息表、违章代码表。情境导入Spark项目分解Spark序号任务任务说明1根据交通违章数据创建RDD将3个交通违章数据文件(txt格式)上传到HDFS特定目录;读取文件,创建弹性分布式数据集RDD。2找出扣分最多的交通违章条目根据违章代码表(violation.txt),找出其中扣分最多的违章条目(Top3)。3查找某车辆的违章记录根据本地违章行为记录表(record.txt)及邻市违章行为记录表(recordCityB.txt),找出某车辆在两地区的所有违章记录。4找出违章3次以上车辆统计各车辆的违章次数,找出违章次数大于3次的车牌号,并打印相关信息。5打印累积扣12分以上车辆信息根据违章数据文件,找出交通违章扣12分以上的车牌号;进而结合车主信息表,找出对应的车主姓名、手机号等信息,并模拟发短息提醒。6将处理结果写入文件整合违章数据,将“违章日期、车牌号、扣分数、罚款金额、违章内容”等5项信息写入到TSV文件中。掌握RDD元素查看及常见的转换、排序、过滤、去重等操作。了解RDD原理,熟悉RDD的创建方法。能否根据需要将RDD计算的结果输出到文件中。123学习目标Spark项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析SparkRDD是SparkCore的核心数据抽象,是进行Spark学习的基础。而使用SparkRDD进行数据分析,首先面临的问题是如何创建RDD。本任务要求读取HDFS分布式文件系统中的交通违章数据文件,生成RDD并输出相关信息。认识RDDSparkRDD(弹性分布式数据集)就是一个分布在集群多节点中存放数据的集合;物理上一个数据集可能分散于集群多个节点,但逻辑上仍然是一个整体(即RDD),数据处理人员只需对这个整体进行处理,而无需关注底层逻辑与实现方法。RDD可以看做是Spark对具体数据的抽象(封装),本质上是一个只读的、分区的记录集合;每个分区都是一个数据集片段,可由一个任务来执行。认识RDDSparkSparkRDD的计算过程可以简单抽象为:创建RDD(makeRDD)、转换(Transformation)和行动(Action)3个阶段。由内存数据创建RDDSpark针对内存中的数据(List、Array、Tuple等),Spark提供了两个操作:parallelize和makeRDD,它们创建一个可并行计算的分布式数据集RDD。scala>valnums=List(1,2,3,4,5)//包含5个整数的列表scala>valnumsRDD=sc.parallelize(nums)//创建一个RDDscala>valpeople=List("李白","王之焕","韦应物","杜牧","元慎")scala>valpeopleRDD=sc.makeRDD(people,3)//创建RDD,含3个分区scala>peopleRDD.partitions.size

//查看peoplesRDD的分区数量res3:Int=3由外部存储生成RDDSpark在生产环境中,通常根据外部存储的数据文件生成RDD。Spark提供了textFile()方法,它可以读取外部文件中的数据来创建RDD。scala>valfileRDD=sc.textFile("file:///home/hadoop/data/guide.txt")scala>fileRDD.count()//使用count方法查看RDD的元素数量res14:Long=4scala>valhdfsFileRDD=sc.textFile("hdfs://localhost:9000/user/hadoop/data/guide.txt")相关知识小结SparkRDD是分布式数据集,是分布在多节点数据的抽象;内存数据创建RDD的方法:parallelize和makeRDD;Spark提供了textFile()方法,它可以读取外部文件中的数据来创建RDD。读取交通违章数据文件,生成RDD,并查看RDD分区数、元素数量。任务实施项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析Spark现有一个文件violation.txt(违章条目对照表),内含违章代码、违章内容、扣分、罚款、附件处理等,数据之间用tab分割(\t)。将利用违章记录文件产生RDD,利用多种算子(方法),找出罚款金额最高、扣分最多的交通违章类型。查看RDD的元素Spark为了便于掌控计算过程、及时发现问题,可以使用collect操作查看RDD内元素的值;collect操作会将RDD的所有元素组成一个数组并返回给Driver端。scala>valnums=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根据列表nums,创建RDDscala>numsRDD.collect()//查看RDD的元素值res5:Array[Int]=Array(1,2,3,4,5)查看RDD的元素Spark可以使用take方法查看RDD的前N个元素,first操作查看RDD的第一个元素值。scala>numsRDD.take(3)//获取前3个元素,并返回一个数组Arrayres3:Array[Int]=Array(1,2,3)scala>numsRDD.first()//获取RDD的第一个元素值res6:Int=1map与flatMap操作Sparkmap操作接收一个函数作为参数,进而将RDD中的每个元素作为参数传入某个函数,函数处理完后的返回值组成一个新的RDD。scala>valdata=List(1,2,3,4,5,6)scala>valdataRDD=sc.parallelize(data)scala>valnewDataRDD=dataRDD.map(x=>x*2)//dataRDD元素乘以2scala>newDataRDD.collect()res10:Array[Int]=Array(2,4,6,8,10,12)map与flatMap操作Spark示例:用RDD中存储学生的姓名、年龄信息,使用map操作将其年龄加1,并为每个学生设置一个邮箱(姓名@)scala>valstudents=List(("Tom",20),("Jerry",18))//列表中嵌套元组scala>valstudentRDD=sc.makeRDD(students)//根据列表students创建RDDscala>valstudentRDD2=studentRDD.map(x=>(x._1,x._2+1,x._1+"@"))scala>studentRDD2.collect()res2:Array[(String,Int,String)]=Array((Tom,21,Tom@),(Jerry,19,Jerry@))map与flatMap操作Sparkflatmap是将函数应用于RDD中的每个元素,而后展平结果(去掉嵌套),最终得到一个新的RDD;scala>valtext=List("IlikeSpark","HelikesSpark","ShelikesSparkandHadoop")scala>valtextRDD=sc.makeRDD(text)scala>valrdd1=textRDD.map(x=>x.split(""))scala>rdd1.collect()res8:Array[Array[String]]=Array(Array(I,like,Spark),Array(He,likes,Spark),Array(She,likes,Spark,and,Hadoop))scala>valrdd2=textRDD.flatMap(x=>x.split(""))scala>rdd2.collect()res9:Array[String]=Array(I,like,Spark,He,likes,Spark,She,likes,Spark,and,Hadoop)sortBy排序操作SparksortBy操作可以对RDD元素进行排序,并返回排好序的新RDD。scala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))scala>valnewNumsRDD=numsRDD.sortBy(x=>x,false)scala>newNumsRDD.collect()res3:Array[Int]=Array(10,9,8,7,6,5,4,3,2,1)sortBy排序操作Sparkscala>valstudents=List(("Tom",20),("Jerry",19),("Bob",22),("Ken",21))//列表students的元素为元组scala>valstudentsRDD=sc.makeRDD(students)scala>valnewStudentsRDD=studentsRDD.sortBy(x=>x._2,true)//根据元素(元组)的第2个值升序排列scala>newStudentsRDD.collect()res4:Array[(String,Int)]=Array((Jerry,19),(Tom,20),(Ken,21),(Bob,22))数值型RDD的统计操作Sparkscala>valdata=sc.makeRDD(List(8,10,7,4,1,9,6,3,5,2))scala>data.max()//返回RDD中的最大值res9:Int=10scala>data.min()//返回RDD中的最小值res10:Int=1scala>data.mean()//返回RDD元素的平均值res11:Double=5.5查看RDD元素:collect、take、first等方法;map、flatMap方法可以改变RDD的元素值,产生新的RDD;RDD元素排序:sortBy方法Spark提供了max、min、sum等若干统计算子。Spark综合利用本任务中的知识储备,根据违章条目对照表,找出罚款金额最高、扣分最多的交通违章类型。任务实施相关知识小结Spark项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析Sparkrecords.txt文件记录了本市车辆违章信息(包括:日期、监控设备编号、车牌号、违章类型代码),recordsCityB.txt记录相邻的B城市车辆违章信息。根据有关部门要求,需要查找车辆MU0066在本地及临市B的交通违章记录。filter操作过滤RDD的元素Sparkfilter是一个转换操作,可用于筛选出满足特定条件元素,返回一个新的RDDscala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))scala>valrdd1=numsRDD.filter(x=>x%2==0)//过滤出偶数元素,组成一个新RDD并返回scala>rdd1.collect()res5:Array[Int]=Array(2,10,8,4,6)计算两个RDD的并集、交集与差集Spark使用union方法,合并两个RDD的元素,得到一个新的RDD。scala>valrdd1=sc.makeRDD(List(1,2,3))scala>valrdd2=sc.makeRDD(List(3,4,5))scala>valrdd3=rdd1.union(rdd2)//合并两个RDDscala>rdd3.collect()//合并后的RDD有重复元素“3”res8:Array[Int]=Array(1,2,3,3,4,5)计算两个RDD的并集、交集与差集Sparkintersection可以求两个RDD的交集,即两个RDD的相同元素。scala>valrdd1=sc.makeRDD(List(1,2,3,4,5))scala>valrdd2=sc.makeRDD(List(4,5,6,7,8))scala>valrdd3=rdd1.intersection(rdd2)scala>rdd3.collect()res9:Array[Int]=Array(4,5)计算两个RDD的并集、交集与差集Spark类似于数学中集合的差集运算,可以使用subtract操作求两个RDD的差集。scala>valrdd1=sc.makeRDD(List(1,2,3,4,5))scala>valrdd2=sc.makeRDD(List(4,5,6,7,8))scala>valrdd3=rdd1.subtract(rdd2)//在rdd1中、但不在rdd2中的元素scala>rdd3.collect()res12:Array[Int]=Array(1,2,3)filter方法过滤符合条件的RDD元素;distinct方法去除RDD中的重复元素;两个RDD可以执行交集、并集、差集等运算。Spark综合利用本任务中的知识储备,读取两地的车辆违章信息文件,需要查找车辆MU0066在本地及临市B的交通违章记录。任务实施相关知识小结Spark项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析Spark根据交通安全检查工作需要,查找本市违章记录数据(records.txt)中,违章次数3次以上车辆予以重点关注。键值对RDDSpark所谓键值对RDD(PairRDD)是指每个RDD元素都是(Key,Value)键值类型(即二元组)。scala>valscores=List(("张小帅",84),("孙田",80),("马莉",92))scala>valscoresRDD=sc.parallelize(scores)//生成键值对RDDscala>scoresRDD.collect()//scoresRDD的元素为二元组res2:Array[(String,Int)]=Array((张小帅,84),(孙田,80),(马莉,92))键值对RDDSpark普通RDD转为键值对RDDscala>valrdd1=sc.makeRDD(List("apple","grape","banana","watermelon"))scala>valpairRDD1=rdd1.map(x=>(x,x.length()))scala>pairRDD1.collect()res3:Array[(String,Int)]=Array((apple,5),(grape,5),(banana,6),(watermelon,10))keys、values操作得到一个新RDDSpark键值对RDD的元素为(key,value)形式的二元组,keys操作可以获取键值对RDD中所有的key,组成一个新的RDD并返回;values操作会把键值对RDD中的所有value返回,形成一个新的RDDscala>valdata=List(("Spark",1),("Hadoop",2),("Flink",3),("kafka",4))scala>valpairRDD=sc.makeRDD(data)scala>valkeysRDD=pairRDD.keys//获取所有的key,组成新RDDscala>keysRDD.collect()res6:Array[String]=Array(Spark,Hadoop,Flink,kafka)ByKey相关的操作Spark对于键值对RDD,Spark提供了groupByKey、sortByKey、reduceByKey等若干ByKey相关操作;scala>valfruits=List((apple,5.5),(orange,3.0),(apple,8.2),(banana,2.7),(orange,4.2))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valgroupedRDD=fruitsRDD.groupByKey()//按照Key,对value进行分组scala>groupedRDD.collect()res22:Array[(String,Iterable[Double])]=Array((banana,CompactBuffer(2.7)),(orange,CompactBuffer(3.0,4.2)),(apple,CompactBuffer(5.5,8.2)))ByKey相关的操作SparksortByKey是根据key进行排序,即返回一个根据键排序的RDDscala>valpeoples=List((20,"Tom"),(18,"Jerry"),(21,"Bob"),(17,"Ben"))scala>valpeoplesRDD=sc.makeRDD(peoples)scala>valsortedRDD=peoplesRDD.sortByKey()//按照key进行排序scala>sortedRDD.collect()res23:Array[(Int,String)]=Array((17,Ben),(18,Jerry),(20,Tom),(21,Bob))ByKey相关的操作SparkreduceByKey(func)是根据key进行分组,使用func函数聚合同组内的value值,返回一个新RDDscala>valfruits=List(("apple",5.5),("orange",3.0),("apple",8.2),("banana",2.7),("orange",4.2))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valreducedRDD=fruitsRDD.reduceByKey((a,b)=>a+b)scala>reducedRDD.collect()//reducedRDD元素为二元组res21:Array[(String,Double)]=Array((banana,2.7),(orange,7.2),(apple,13.7))mapValues对value进行处理SparkmapValues(func)功能是将RDD元组中的value交给函数func处理,而key不变scala>valfruits=List(("apple",6.5),("banana",3.8),("blueberry",19.9))scala>valfruitsRDD=sc.makeRDD(fruits)scala>valfruitsRDD2=fruitsRDD.mapValues(x=>x+5)//所有的value+5scala>fruitsRDD2.collect()res3:Array[(String,Double)]=Array((apple,11.5),(banana,8.8),(blueberry,24.9))键值对RDD的元素包括key、value两部分;键值对RDD支持:keys、values、groupByKey、sortByKey、reduceByKey、mapValues等操作。Spark综合利用本任务中的知识储备,根据本市违章记录数据(records.txt)中,找出违章3次以上车辆。任务实施相关知识小结Spark项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析Spark从本市违章记录数据文件中,找出违章扣分12分以上的车辆;进而根据车辆所有人预留电话,模拟发一条短信(打印一句话),提醒其到交管部门协助调查。join操作连接两个RDDSparkjoin是对于给定的两个键值对RDD(数据类型为(K,V1)和(K,V2)),只有两个RDD中都存在的Key才会被输出,最终得到一个(K,(V1,V2))类型的RDD。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("ben",2),("jerry",6)))scala>rdd1.join(rdd2).collect()//rdd1、rdd2中有相同的Key:tom、jerryres5:Array[(String,(Int,Int))]=Array((tom,(1,5)),(jerry,(2,6)))rightOuterJoin右连接SparkrightOuterJoin类似于SQL中的右外关联rightouterjoin,根据两个RDD的Key进行右连接,返回结果以右边(第二个)的RDD为主,关联不上的记录为空。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd4=rdd1.rightOuterJoin(rdd2)//两个RDD右连接scala>rdd4.collect()res8:Array[(String,(Option[Int],Int))]=Array((tom,(Some(1),5)),(apple,(None,2)),(jerry,(Some(2),6)))rightOuterJoin右连接SparkleftOuterJoin类似于SQL中的左外关联leftouterjoin,可以根据两个RDD的Key进行左连接,返回结果以左边(第一个)的RDD为主,关联不上的记录为空。scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd5=rdd1.leftOuterJoin(rdd2)//两个RDD左连接scala>rdd5.collect()res11:Array[(String,(Int,Option[Int]))]=Array((tom,(1,Some(5))),(jerry,(2,Some(6))),(petter,(3,None)))rightOuterJoin右连接SparkfullOuterJoin是全连接,会保留两个RDD的所有Key的连接结果scala>valrdd1=sc.makeRDD(List(("tom",1),("jerry",2),("petter",3)))scala>valrdd2=sc.makeRDD(List(("tom",5),("apple",2),("jerry",6)))scala>valrdd6=rdd1.fullOuterJoin(rdd2)//两个RDD全连接scala>rdd6.collect()res13:Array[(String,(Option[Int],Option[Int]))]=Array((tom,(Some(1),Some(5))),(apple,(None,Some(2))),(jerry,(Some(2),Some(6))),(petter,(Some(3),None)))对于两个RDD,Spark提供了类似于数据库表的连接操作;支持的连接包括:join、rightOuterJoin、leftOuterJoin、fullOuterJoin等Spark综合利用本任务中的知识储备,从违章记录中找到扣分超过12分的车牌,然后根据车牌查找车主信息,并模拟发出提示短信。任务实施相关知识小结Spark项目3

SparkRDD分析车辆违章记录Spark任务1根据交通违章数据创建RDD找出扣分最高的违章条目查找某车辆的违章记录任务2任务3查找违章3次以上的车辆任务4找出累计扣12分以上的车辆任务5将处理结果存储到外部文件中任务6任务分析Spark将records.txt、violation.txt中信息整合后,抽取违章日期、车牌号、扣分数、罚款金额、违章项目名称等5项信息,保存为TSV格式文件。读写文本文件Spark由文本文件创建RDD是现实中常见的需求,可以使用textFile(“文件位置”)方法读取文件的内容、生成RDDscala>valpath="file:///home/hadoop/data/myfile.txt"scala>valfileRDD=sc.textFile(path,2)//创建RDD,指定分区数量为2scala>fileRDD.saveAsTextFile("file:///home/hadoop/data/output")//fileRDD数据保存到本地文件读写文本文件Spark在文本文件中,还有两种常见的格式:CSV(commase

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论