Spark大数据分析与实战(第二版) 项目3 教案_第1页
Spark大数据分析与实战(第二版) 项目3 教案_第2页
Spark大数据分析与实战(第二版) 项目3 教案_第3页
Spark大数据分析与实战(第二版) 项目3 教案_第4页
Spark大数据分析与实战(第二版) 项目3 教案_第5页
已阅读5页,还剩6页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

课程名称Spark大数据分析选用教材Spark大数据分析与实战(第2版)出版社西安电子科技大学出版社章节项目3SparkRDD分析交通违章记录教学内容借助成熟的SparkRDD技术,分析交通违章记录文件中的数据。授课学时授课班级****专业*****班授课日期授课地点教学目标了解RDD的特性及运算的原理,了解RDD的执行流程;熟悉各种数据源创建RDD的算子,多种方法查看RDD的元素(2)熟练使用算子完成RDD的转换、排序、过滤、去重等操作;(3)能够完成键值对RDD的生成、转换等操作;(4)根据业务需求,能将RDD中数据输出到文件系统中。重点难点RDD的生成(内存数据、文件等生成)RDD的map、filter、sortBy等常用算子;键值对RDD的key、value相关操作,键值对RDD排序等;两个RDD的相关操作:join、union、zip等。教学方法R讲授£讨论或座谈£问题导向学习£分组合作学习£案例教学£任务驱动R项目教学£情景教学£演示汇报£实践教学£参观访问£引导文教学£其他(--)教学准备(教师)教材:《Spark大数据分析与实战(第2版)》硬件设备:内存8G(或以上)的计算机(2)教学资源:课件PPT、教学日历、相关软件等教学准备(学生)教材:《Spark大数据分析与实战(第2版)》硬件设备:内存8G(或以上)的计算机(3)教学资源:课件PPT、相关软件等教学环节教学内容与过程(教学内容、教学方法、组织形式、教学手段)课前组织教师通过课程教学平台或班级群发布学习预习任务及课程资源;学生提前预习相关内容,并完成课前自测等。课程内容描述任务3.1根据交通违章数据创建RDD认识RDDRDD就是一个分布在集群多节点中存放数据的集合;虽然一个数据集分散于集群多个节点,但逻辑上仍然是一个整体(即RDD),数据处理人员只需对这个整体进行处理,而无需关注底层逻辑与实现方法,从而极大降低了大数据编程的难度。其计算流程如下:内存数据创建RDD针对程序中已有的数据集合(List、Array、Tuple等),Spark提供了两个方法:parallelize和makeRDD,它们均可复制数据集合的元素后,创建一个可并行计算的分布式数据集RDD。parallelize方式适用于做简单的Spark程序测试、Spark学习;下面演示根据列表数据创建RDD:scala>valnums=List(1,2,3,4,5)//包含5个整数的列表nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根据列表nums,创建一个RDD(numsRDD)numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[1]atparallelizeat<console>:26scala>valcars=Array("比亚迪","长安","奇瑞","广汽")cars:Array[String]=Array(比亚迪,长安,奇瑞,广汽)scala>valcarsRDD=sc.parallelize(cars)//根据数组cars,创建一个RDD(carsRDD)carsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atparallelizeat<console>:26外部文件创建RDD由文件创建RDD,采用sc.textFile(“文件路径”)方式,路径前面需要加入“file://”以表示本地文件(Spark-shell环境下,要求所有节点的相同位置均保存该文件)。现有本地文件“/home/hadoop/data/guide.txt”,借助textFile()方法,可以生成RDD,演示代码如下:scala>valfileRDD=sc.textFile("file:///home/hadoop/data/guide.txt")//注意路径的写法fileRDD:org.apache.spark.rdd.RDD[String]=file:///home/hadoop/data/guide.txtMapPartitionsRDD[11]attextFileat<console>:25scala>fileRDD.count()//使用count方法查看RDD的元素数量,即guide.txt文件的行数。res14:Long=4任务3.2找出扣分最高的交通违法条目查看RDD的元素在学习或测试代码时,为了便于掌控计算过程、及时发现问题,可以使用collect操作查看RDD内元素的值;collect操作会将RDD的所有元素组成一个数组并返回给Driver端;其用法示例如下:scala>valnums=List(1,2,3,4,5)nums:List[Int]=List(1,2,3,4,5)scala>valnumsRDD=sc.parallelize(nums)//根据列表nums,创建RDDnumsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4]atparallelizeat<console>:26scala>numsRDD.collect()//查看RDD的元素值res5:Array[Int]=Array(1,2,3,4,5)Map操作map操作是最常用的转换操作,该操作接收一个函数作为参数,进而将RDD中的每个元素作为参数传入某个函数,函数处理完后的返回值组成一个新的RDD;其目的是根据现有的RDD,经过函数处理,最终得到一个新的RDD。用法示例如下:scala>valdata=List(1,2,3,4,5,6)data:List[Int]=List(1,2,3,4,5,6)scala>valdataRDD=sc.parallelize(data)dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[10]atparallelizeat<console>:26scala>valnewDataRDD=dataRDD.map(x=>x*2)newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[11]atmapat<console>:25scala>newDataRDD.collect()res10:Array[Int]=Array(2,4,6,8,10,12)scala>valpeoples=List("tom","jerry","petter","ken")peoples:List[String]=List(tom,jerry,petter,ken)scala>valpeoplesRDD=sc.makeRDD(peoples)peoplesRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[7]atmakeRDDat<console>:26scala>valnewPeoplesRDD=peoplesRDD.map(x=>x.toUpperCase())newPeoplesRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[8]atmapat<console>:25scala>newPeoplesRDD.collect()res8:Array[String]=Array(TOM,JERRY,PETTER,KEN)RDD的排序sortBy操作可以对RDD元素进行排序,并返回排好序的新RDD;sortBy有3个参数,其用法说明如下;defsortBy[K](f:(T)⇒

K,

ascending:

Boolean

=

true,

numPartitions:

Int

=

this.partitions.length):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnthisRDDsortedbythegivenkeyfunction参数1:f:(T)⇒K,左边为要排序的RDD的每一个元素,右边返回要进行排序的值。参数2:ascending(可选项),升序或降序排列标识,默认为true、升序排列,若要降序排列则需写false。参数3:numPartitions(可选项),排序后新RDD的分区数量,默认分区数量与原RDD相同。针对某个RDD,将RDD的元素数据交给“f:(T)⇒K”函数进行处理;而后按照函数运算后的返回值进行排序,默认为升序排列。数值型RDD的统计对于数值元素组成的RDD,Spark提供了max、min、sum等若干统计算子,可以完成简单的统计分析;相关示例如下:scala>valdata=sc.makeRDD(List(8,10,7,4,1,9,6,3,5,2))data:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>data.max()//返回RDD中的最大值res9:Int=10scala>data.min()//返回RDD中的最小值res10:Int=1任务3.3查找某车辆的违章记录1.filter操作filter是一个转换操作,可用于筛选出满足特定条件元素,返回一个新的RDD;其用法说明如下:deffilter(f:(T)⇒

Boolean):

\o"org.apache.spark.rdd.RDD"RDD[T]ReturnanewRDDcontainingonlytheelementsthatsatisfyapredicate.其应用示例如下:scala>valnumsRDD=sc.makeRDD(List(3,1,2,9,10,5,8,4,7,6))numsRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[27]atmakeRDDat<console>:25scala>valrdd1=numsRDD.filter(x=>x%2==0)//过滤出偶数元素,组成一个新RDD并返回rdd1:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[28]atfilterat<console>:25scala>rdd1.collect()res5:Array[Int]=Array(2,10,8,4,6)scala>valtextsRDD=sc.makeRDD(List("IlikeSpark","HelikeHadoop","ShelikeSpark"))textsRDD:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[29]atmakeRDDat<console>:26scala>valrdd2=textsRDD.filter(x=>x.contains("Spark"))//过滤出含有字符串“Spark”的元素rdd2:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[30]atfilterat<console>:25scala>rdd2.collect()res6:Array[String]=Array(IlikeSpark,ShelikeSpark)2.distinct操作RDD的元素可能存在重复情况,当我们需要去掉重复元素时,可以使用distinct方法。scala>valdataRDD=sc.makeRDD(List(3,5,7,3,4,8,5))//dataRDD内有重复元素3、5dataRDD:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[0]atmakeRDDat<console>:25scala>valnewDataRDD=dataRDD.distinct()//去除重复元素newDataRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[3]atdistinctat<console>:25scala>newDataRDD.collect()//检查是否成功去重res2:Array[Int]=Array(4,8,5,3,7)3.union等操作union方法可将两个RDD的元素合并为一个新RDD,即得到两个RDD的并集intersection可以求两个RDD的交集,即两个RDD的相同元素类似于数学中集合的差集运算,可以使用subtract来求两个RDD的差集cartesian用于求两个RDD的笛卡尔积,将两个集合元素组合成一个新的RDD任务3.4查找违章次数3次以上车辆键值对RDD键值对RDD(PairRDD)是指每个RDD元素都是(Key,Value)键值类型(即二元组);普通RDD里面存储的数据类型是Int、String等,而“键值对RDD”里面存储的数据类型是“键值对”。下面代码中,我们首先定义一个列表scores,scores的每个元素为二元组,记录学生的姓名及考试成绩;接下来,使用parallelize方法生成键值对RDD(scoresRDD),scoresRDD元素的类型为二元组。scala>valscores=List(("张小帅",84),("孙田",80),("马莉",92))//scores的元素为二元组,例如("张小帅",84)scores:List[(String,Int)]=List((张小帅,84),(孙田,80),(马莉,92))scala>valscoresRDD=sc.parallelize(scores)//scoresRDD即为键值对RDDscoresRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[0]atparallelizeat<console>:26scala>scoresRDD.collect()//scoresRDD的元素为二元组res2:Array[(String,Int)]=Array((张小帅,84),(孙田,80),(马莉,92))Lookup查找value键值对RDD的元素为(key,value)形式的二元组,keys操作可以获取键值对RDD中所有的key,组成一个新的RDD并返回;values操作会把键值对RDD中的所有value返回,形成一个新的RDD;两个操作的用法示例如下:scala>valdata=List(("Spark",1),("Hadoop",2),("Flink",3),("kafka",4))data:List[(String,Int)]=List((Spark,1),(Hadoop,2),(Flink,3),(kafka,4))scala>valpairRDD=sc.makeRDD(data)pairRDD:org.apache.spark.rdd.RDD[(String,Int)]=ParallelCollectionRDD[9]atmakeRDDat<console>:26scala>valkeysRDD=pairRDD.keys//获取所有的key,组成新RDDkeysRDD:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[10]atkeysat<console>:25scala>keysRDD.collect()res6:Array[String]=Array(Spark,Hadoop,Flink,kafka)scala>valvaluesRDD=pairRDD.values//获取所有的value,组成新的RDDvaluesRDD:org.apache.spark.rdd.RDD[Int]=MapPartitionsRDD[12]atvaluesat<console>:25scala>valuesRDD.collect()res7:Array[Int]=Array(1,2,3,4)ByKey相关操作对于键值对RDD,Spark提供了groupByKey、sortByKey、reduceByKey等若干ByKey相关操作;其中,groupByKey是根据key值,对value进行分组;用法演示如下:fruits:List[(String,Double)]=List((apple,5.5),(orange,3.0),(apple,8.2),(banana,2.7),(orange,4.2))scala>valfruitsRDD=sc.makeRDD(fruits)fruitsRDD:org.apache.spark.rdd.RDD[(String,Double)]=ParallelCollectionRDD[23]atmakeRDDat<console>:26scala>valgroupedRDD=fruitsRDD.groupByKey()//按照Key,对value进行分组groupedRDD:org.apache.spark.rdd.RDD[(String,Iterable[Double])]=ShuffledRDD[25]atgroupByKeyat<console>:25scala>groupedRDD.collect()res22:Array[(String,Iterable[Double])]=Array((banana,CompactBuffer(2.7)),(orange,CompactBuffer(3.0,4.2)),(apple,CompactBuffer(5.5,8.2)))mapValue操作实际业务中,可能遇到只对键值对RDD的value部分进行处理,而保持value不变的需求;这时,可以使用mapValues(func),它的功能是将RDD元组中的value交给函数func处理。任务3.5查找累计扣12分以上车辆信息1.zip操作将两个RDD组合成键值对RDD除了使用makeRDD等方式创建键值对RDD,还可以使用zip操作(亦称为“拉链操作”)将两个元素数量相同、分区数相同的普通RDD组合成一个键值对RDD。下面代码中,rdd1由3个元素(分区数量默认),rdd2也有3个元素;代码rdd1.zip(rdd2)将前述两个RDD组合成一个键值对新的RDD。scala>valrdd1=sc.makeRDD(List("东岳","西岳","南岳","北岳","中岳"))rdd1:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[1]atmakeRDDat<console>:25scala>valrdd2=sc.makeRDD(List("泰山","华山","衡山","恒山","嵩山"))rdd2:org.apache.spark.rdd.RDD[String]=ParallelCollectionRDD[2]atmakeRDDat<console>:25scala>rdd1.zip(rdd2).collect()//rdd1、rdd2组成一个键值对RDD,并输出其元素res3:Array[(String,String)]=Array((东岳,泰山),(西岳,华山),(南岳,衡山),(北岳,恒山),(中岳,嵩山))2join连接两个RDDjoin概念来自于关系数据库领域,SparkRDD中的join的类型也包括内连接(join)、左外连接(leftOuterJoin)、右外连接(rightOuterJoin)等。其中,join是对于给定的两个键值对RDD(数据类型为(K,V1)和(K,V2)),只有两个RDD中都存在的Key才会被输出,最终得到一个(K,(V1,V2))类型的RDD;其用法示例如下:scala>rdd1.join(rdd2).collect()res5:Array[(String,(Int,Int))]=Array((tom,(1,5)),(jerry,(2,6)))scala>valrdd3=rdd1.join(rdd2)//rdd1、rdd2中有相同的Key:tom、jerryrdd3:org.apache.spark.rdd.RDD[(String,(Int,Int))]=MapPartitionsRDD[13]atjoinat<console>:263其他连接rightOuterJoin类似于SQL中的右外关联rightouterjoin,根据两个RDD的Key进行右连接,返回结果以右边(第二个)的RDD为主,关联不上的记录为空(None值)。leftOuterJoin类似于SQL中的左外关联leftouterjoin,可以根据两个RDD的Key进行左连接,返回结果以左边(第一个)的RDD为主,关联不上的记录为空(None值);其用法示例如下:scala>valrdd4=rdd1.rightOuterJoin(rdd2)//两个RDD左连接rdd4:org.apache.spark.rdd.RDD[(String,(Option[Int],Int))]=MapPa

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论