《Spark大数据分析与实战》课件项目五 Spark 编程进阶_第1页
《Spark大数据分析与实战》课件项目五 Spark 编程进阶_第2页
《Spark大数据分析与实战》课件项目五 Spark 编程进阶_第3页
《Spark大数据分析与实战》课件项目五 Spark 编程进阶_第4页
《Spark大数据分析与实战》课件项目五 Spark 编程进阶_第5页
已阅读5页,还剩84页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

BigDataAnalyticswithSpark项目五Spark编程进阶项目概述前期学习中,无论是SparkRDD还是SparkSQL,主要是在SparkShell下完成数据分析处理工作。但是现实业务中,为了完成一个数据分析任务,可能需要编写很多行代码,需要用到很多类,这时SparkShell可能力不从心,因而需要用到IntelliJIDEA等专业开发工具。本项目将介绍主流的IDEA开发工具的安装,并通过Sogou日志分析、疫苗流向数据分析两个项目,介绍如何在IDEA下完成大数据开发工作。项目效果通过本项目实践,可以在IntelliJIDEA环境下开发大数据应用、分析疫苗流向数据,并在IDEA控制台下输出数据分析结果如下所示:目录任务1搭建IntelliJIDEA开发环境IDEA下用SparkRDD分析Sogou日志IDEA下用SparkSQL分析疫苗流向数据使用RDD缓存机制提升效率任务3认识RDD广播变量和累加器任务5理解RDD的依赖关系任务2任务4任务6任务2任务4思维导图搭建IntelliJIDEA

开发环境任务1SparkShell具有交互式特点。集成的开发环境IntelliJIDEA、Eclipse和NightlyBuilds。安装IntelliJIDEA、安装插件,以及创建工程、编写代码等工作。(1)下载安装包在Ubuntu下,使用浏览器进入jetbrains官网/idea/,下载社区版(Community版),该版本是免费的、可满足初学者基本学习需求。1.1下载安装IntelliJIDEA(2)安装IntelliJIDEA下载的完毕后,将IDEA包解压到/usr/local目录下,并启动IDEA;打开Ubuntu终端,输入命令:解压完毕后,进入IDEA安装目录,启动IDEA,命令如下:1.1下载安装IntelliJIDEAcd~/下载sudotar-zxvfideaIC-2019.3.3.tar.gz-C/usr/localcd/usr/local/idea-IC-193.6494.35/cdbin./idea.sh进入启动界面后,IDEA让用户选择UI风格(SetUItheme),选择个人喜好的风格(如light)后,点击“Next:DesktopEntry”。1.1下载安装IntelliJIDEA在CreateDesktopEntry界面中,直接选择“Next:LauncherScript”,这样日后使用过程中可以直接在Ubuntu的应用程序搜索中找到IntelliJIDEA,而不用在Ubuntu终端中输入命令。1.1下载安装IntelliJIDEA接下来,下载插件界面中,在Scala插件介绍下的“Install”,完成Scala插件的安装后点击“StartusingIntelliJIDEA”。1.1下载安装IntelliJIDEA在图5-6中,也可以暂不安装插件,直接点击“StartusingIntelliJIDEA”;进入IDEA欢迎界面后,选择“Plugins”后,进入Plugins市场,搜索“Scala”插件,点击“Install”也可以完成Scala插件的安装。1.1下载安装IntelliJIDEA插件安装成功后,在IDEA欢迎界面,选择“StructureforNewProjects”。1.1下载安装IntelliJIDEA在弹出的“StructureforNewProjects”界面中,依次选择“Project”、“New”后,设置JDK(选择本机安装的JDK目录)。1.1下载安装IntelliJIDEA在选择“GlobalLibraries”,点击+号,选择“ScalaSDK”,在弹出的SDK选择窗口中选择与本机匹配的ScalaSDK版本;如没有合适的版本,可以选择“Download...”下载。1.1下载安装IntelliJIDEA设置好ScalaSDK后,选择刚刚添加的ScalaSDK,右键选择“CopytoProjectLibraries…”,将ScalaSDK添加到工程中。1.1下载安装IntelliJIDEA在IDEA欢迎界面,选择“CreateNewProject”;进入图5-15后,选择“Maven”后,点击“Next”。1.2创建工程在“NewProject”窗口中,输入工程名“mysparkproject”后,点击“Next”。1.2创建工程进入工程界面后,在工程区域,选择“mysparkproject”工程,展开其目录结构;选中“main”目录后,右键选择“New”——“Directory”,在弹出的“NewDirectory”窗口中输入新建的目录名称“scala”。1.2创建工程选择新建的scala目录,右键选择“MarkDirectoryas”——“SourcesRoot”,将其设置为源目录。1.2创建工程接下来,选择原有的“java”目录,右键选择删除该目录。至此,我们完成了在IDEA下Scala工程创建工作。1.2创建工程在编写代码前,需要修改mysparkproject工程的pom.xml文件,以便加载相关依赖包;注意在上述pom.xml文件中,读者要根据自己配置环境(软件版本等)进行适当修改,例如<groupId>org.example</groupId>和<artifactId>mysparkproject</artifactId>标签为创建工程时设置的groupId和artifactId;而<spark.version>2.2.3</spark.version>及<scala.version>2.11</scala.version>,则需要根据自己的Spark、Scala版本填写(作者安装的Spark版本为2.23,Scala版本为2.11);<dependency>*****</dependency>标签用于添加Maven依赖;后续开发中会根据使用的模块,陆续添加其他依赖。1.3修改pom.xml的内容修改完毕pom.xml后,在Mavenprojectstobeimported选项处,选择“EnableAuto-Import”;此过程需联网下载相关依赖包,请耐心等待(根据网络情况,可能需要较长时间);执行完毕后即可开展后续的工作(注意本项工作完成前,将无法新建scala程序)。1.3修改pom.xml的内容接下来,我们编写程序实现单词计数WordCount功能;在工程目录中,选择前面新建的文件夹“scala”,右键选择“New”后,选择“ScalaClass”;在弹出的“CreateNewScalaClass”中,填写scala类的名称为“WordCount”,在Kind类型中选择“Object”后点击“OK”键,完成scala类的创建。1.4新建scala文件、编写程序在新建的WordCount.scala中,写入单词计数的代码。该程序中,首先引入SparkConf、SparkContext类,然后定义object类WordCount(只有oject类才可以才可以访问main方法);在WordCount内部定义main方法,实现单词计数功能。注意,在IDEA环境下写Spark程序与SparkShell下有所不同;SparkShell自带一个SparkContext实例sc,但在IDEA下需要自己创建SparkContext对象(代码7~9行)。1.4新建scala文件、编写程序在WordCount.scala代码窗口内的任意位置,右键点击,在弹出的菜单中选择Run‘WordCount’。运行的结果如下:注意WordCount.scala程序要求,必须存在/usr/local/spark/mycode/wordcount/words.txt这个文件,否则会报错;因此读者需要创建words.txt文件并置于相应目录下。程序运行后,IDEA控制台输出信息较多,可以拖动寻找结果信息。1.5在IDEA中运行程序在实际项目中,需要在IDEA中对程序进行打包,以便提交到Spark集群中运行。下面演示如何打包程序。首先,将鼠标置于在IDEA的左下角灰色方块中,在弹出的菜单中选择“Maven”。1.6工程打包、提交集群运行在IDEA中,出现Mave面板,选择“Lifecycle”,点击“package”选项,开始打包工作。1.6工程打包、提交集群运行打包完成后在工程目录的target文件夹中可以看到打包好的jar包。对于该jar包,右键选择“CopyPath”获得该jar包的路径。复制得到jar包的路径后,在Linux终端中,使用命令“/usr/local/spark/bin/spark-submit--classWordCount复制得到的jar包路径”来执行程序。1.6工程打包、提交集群运行

如果要想将程序放置到集群中执行,需要做如下修改:(1)WordCount.scala程序中setMaster("local[*]"),local[*]需要修改为“spark://集群主机IP:7077”;其中7077位Spark默认的端口号(如已经修改,则根据情况调整)。(2)进入Spark的安装目录后,使用如下命令提交、运行程序:1.6工程打包、提交集群运行bin/spark-submit\--classmysparkproject.WordCount\--masterspark://主机IP:7077\--executor-memory2G\--total-executor-cores4\/****jar所在包目录******/mysparkproject-1.0-SNAPSHOT.jarIDEA下用SparkRDD分析Sogou日志任务2Sogou实验室提供了某段时间搜索引擎记录的网页查询需求及用户点击情况的日志数据;要求在IntelliJIDEA环境下,使用SparkRDD技术进行分析。考虑到学习者电脑配置,选择迷你版数据集(某1天的搜索日志数据),命名为sogouoneday.txt,共计170余万条记录。按照数据集说明,其数据格式为:访问时间\t用户ID\t[查询词]\t该URL在返回结果中的排名\t用户点击的顺序号\t用户点击的URL;其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID。在Linux终端中,使用head命令查看前10行数据。2.1数据说明进一步分析发现该“URL在返回结果中的排名”与“用户点击的顺序号”实际为空格分隔,而非\t;在这里暂时不处理,在程序代码中处理。将数据集sogouoneday.txt置于用户主目录/usr/local/hadoop下,在Linux终端中使用如下命令启动Hadoophdfs服务,并将文件上传到hdfs文件系统中。2.1数据说明cd/usr/local/hadoop/sbin./start-dfs.shcd/usr/local/hadoop/bin./hdfsdfs-mkdirsogou./hdfsdfs-put/home/hadoop/sogouoneday.txtsogou

下面逐条分析需求,找出解决问题的思路:(1)找出用户搜索量最大的3个时段top3;当前数据中的访问时间格式为HH:mm:ss,可以截取出前2个字符为小时(24小时制),以小时为统计单位,找出访问量最大的三个时段;可以考虑将RDD转换为(小时,1)类型的键值对,然后使用reduceByKey方法,计算各小时的访问量;最后使用sortBy方法排序,得到top3。(2)找出搜索次数最多的20个用户top20;处理方法与第1问类似,用户ID为用户的唯一标识,可以考虑将RDD转换为(用户ID,1)类型的键值对,然后使用reduceByKey方法,计算各用户的访问量;最后使用sortBy方法排序,得到top20。(3)求出平均每个用户的搜索量;首先使用count方法,得出总的记录数totalNum;对于用户ID,使用distinct方法得出用户数userNum;最后totalNum除以userNum。(4)用户点击的URL在返回结果中的平均排名。对于所有的“URL在返回结果中排名”加和,得到rankTotal;rankTotal除以totalNum即可。2.2需求分析在IDEA工程中,新建object类文件Sogou.scala,代码如下:importorg.apache.log4j.{Level,Logger}importorg.apache.spark.SparkContextimportorg.apache.spark.SparkConfimportscala.util.Success2.3IDEA下编写程序objectSogou{defmain(args:Array[String]):Unit={Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)//创建sparkConf实例valconf=newSparkConf().setMaster("local[2]").setAppName("SogouDataPro")//创建SparkContext实例valsc=newSparkContext(conf)//sogouoneday.txt文件的路径,可根据实际情况修改valpath="hdfs://localhost:9000/user/hadoop/sogou/sogouoneday.txt"//读取文件生成RDDvalinputRDD=sc.textFile(path)//对于inputRDD中的元素使用split方法切割成单词valsplitRDD=inputRDD.map(x=>x.split("\t"))//取出“时间”字符串中的前两个字符,即为“小时”;组成键值对(小时,1)valhourPairs=splitRDD.map(x=>(x(0).trim.substring(0,2),1))//键值对(小时,1)进行reduceByKey运算,得到各时段的访问量valhourAdd=hourPairs.reduceByKey((a,b)=>a+b)//按照访问量进行排序,降序valsortedHour=hourAdd.sortBy(x=>x._2,false)//打印结果println("访问量最多的时段Top3:")sortedHour.take(3).foreach(println)2.3IDEA下编写程序//将用户ID从splitRDD取出,组成键值对(用户ID,1)valuserPairs=splitRDD.map(x=>(x(1).trim,1))//reduceByKey方法,得到每个用户的访问量;valuserAdd=userPairs.reduceByKey((a,b)=>a+b)//按照用户的访问量降序排列valsortedUser=userAdd.sortBy(x=>x._2,false)//打印结果println("访问次数最多的用户Top10:")sortedUser.take(10).foreach(println)2.3IDEA下编写程序//计算数据集的总行数valtotalNum=splitRDD.count()//取出用户ID组成RDDvalusers=splitRDD.map(x=>x(1))//用户ID去重valdistinctUsers=users.distinct()//得到用户的数量valuserNum=distinctUsers.count()//打印结果println("平均每个用户搜索次数为:"+totalNum/(userNum+0.0))

/*“URL在返回结果中的排名”与“用户点击的顺序号”实际为空格分隔,而非\t;所以初次分割时,并未将二者分开,这里再次分割,取出“URL在返回结果中的排名”*/valsplitRDD2=splitRDD.map(x=>x(3)).map(x=>x.split("")).map(x=>x(0).trim)/*将“URL在返回结果中的排名”转换成整数,转换过程中可能抛出异常,使用case匹配;这里也可以使用模式匹配,但执行效率可能会下降。*/valrank=splitRDD2.map(x=>{scala.util.Try(x.toInt)match{caseSuccess(_)=>x.toIntcase_=>0}})//使用reduce方法,求出所有“URL在返回结果中的排名”之和rankTotal。valrankTotal=rank.reduce((a,b)=>a+b)//打印结果println("URL在返回结果中平均排名:"+(rankTotal+0.0)/totalNum)}}2.3IDEA下编写程序在IDEA中,运行程序Sogou.scala,得到所示结果;访问数量最多的3个时段分别为16时、21时、20时;访问次数最多的用户ID为“6383499980790535”,该用户当日访问385次;平均每个用户访问的次数为3.32次;URL在返回结果中的平均排名为15.67。2.4运行程序IDEA下用SparkSQL分析疫苗流向数据任务3在IDEA环境下使用SparkSQL技术分析疫苗采购数据。帮助人们了解疫苗流向等相关信息。数据文件为vaccine.csv(置于/home/hadoop/目录下),包括药品名name、来源(国产、进口)scr、生产企业company、销售省份(含直辖市)prov、记录的年份year、数量(单位千)quantity。数据样式如下所示:3.1数据说明对于疫苗流向数据用户需求,现逐条分析如下:(1)找出中标次数最多的5家企业;vaccine.csv文件中,每一行为一个中标记录;将数据转为DataFrame后,通过groupBy操作即可得出各医药公司中标次数,然后排名找出top5。(2)找出中标数量最多的5家企业按照医药公司分组,计算各公司的疫苗的销量,排序后即可找出top5。(3)对于狂犬疫苗,分析各公司的市场份额首先筛选出狂犬疫苗数据(狂犬疫苗数据包含多种数据样式,规格、制法等有所不同,均为狂犬疫苗类别,计算总销量、各公司的销量,进而求出各公司市场份额、排名情况。3.2需求分析(4)分析长生生物医药公司2018年各类疫苗的流向首先筛选出长生生物医药公司2018年的相关数据,然后根据疫苗类型分组统计,得出该公司各类疫苗的流向。3.2需求分析运行Vaccine.scala程序,相应输出如下结果:(1)中标单数前5名:(2)中标疫苗数量(订单量)前5名:3.4运行程序(3)狂犬疫苗占有率情况:3.4运行程序(4)长生医药公司所生产的疫苗流向情况:3.4运行程序使用RDD缓存机制

提升效率任务4对反复使用的RDD数据集进行缓存处理。用RDD的缓存机制提升计算效率。缓存是指将多次使用的数据长时间存储在集群各节点的内存(或磁盘等其他介质)中,以达到“随用随取、减少数据的重复计算”的目的,从而节约计算资源和时间,提升后续动作的执行速度。缓存RDD是RDD持久化方案中的一种,对于迭代算法和快速交互式分析是一个很关键的技术。4.1Spark缓存机制概述(1)为什么要缓存RDD?Spark默认情况下,为了充分利用相对有限的内存资源,RDD并不会长期驻留在内存中;如果内存中的RDD过多,当有新的RDD生成时,会按照以LRU(最近经常使用)算法移除最不常用的RDD,以便腾出空间加入新的RDD。缓存RDD目的是让后续的RDD计算速度加快(通常运行速度会加快10倍),是迭代计算和快速交互的重要工具。4.1Spark缓存机制概述(2)缓存RDD的方法开发人员可以使用RDD的persist或者cache方法记录持久化需求(cache方法可以看做是persist方法的简化版);由于RDD具有惰性计算的特点,调用persist或cache方法后,RDD并不会立即缓存起来,而是等到该RDD首次被施加action操作的时候,才会真正的缓存数据。同时,Spark的缓存也具备一定的容错性:如果RDD的任何一个分区丢失了,Spark将自动根据其原来的血统信息重新计算这个分区。Spark能够自动监控各个节点上缓存使用率,并且以LRU的方式将老数据逐出内存。开发人员也可以手动控制,调用RDD.unpersist()方法可以删除无用的缓存。4.1Spark缓存机制概述(3)缓存的级别每个持久化的RDD可以使用不同的存储级别,比如根据业务需要可以把RDD保存在磁盘上,或者以java序列化对象保存到内存里,或者跨节点多副本,或者使用Tachyon存到虚拟机以外的内存里。这些存储级别都可以由persist()的参数StorageLevel对象来控制。cache()方法本身就是一个使用默认存储级别做持久化的快捷方式,默认存储级别是StorageLevel.MEMORY_ONLY。4.1Spark缓存机制概述4.1Spark缓存机制概述缓存级别如右表所示:对于需要重复使用的RDD,建议开发人员调用persist方法缓存数据。比如,本单元任务2分析Sogou搜索日志数据中,splitRDD被多次调用,可以考虑缓存机制以提升效率。另即使用户没有调用persist,Spark也会自动持久化一些Shuffle操作(如reduceByKey)的中间数据;因为Shuffle操作需要消耗较多计算资源,Spark的自动持久化机制可以避免因某节点失败而重新计算。4.1Spark缓存机制概述如何选择存储级别?Spark的存储级别主要可于在内存使用和CPU占用之间做一些权衡。建议根据以下步骤来选择一个合适的存储级别:①如果RDD能使用默认存储级别(MEMORY_ONLY),则尽量使用默认级别。这是CPU效率最高的方式,所有RDD算子都能以最快的速度运行。②如果RDD不适用默认存储级别(MEMORY_ONLY),可以尝试MEMORY_ONLY_SER级别,并选择一个高效的序列化协议(selectingafastserializationlibrary),这将大大节省数据的存储空间,同时速度也还不错。③尽量不要把数据写到磁盘上,除非数据集重新计算的代价很大或者数据集是从一个很大的数据源中过滤得到的结果。④如果需要支持容错,可以考虑使用带副本的存储级别;虽然所有的存储级别都能够以重算丢失数据的方式来提供容错性,但是带副本的存储级别可以让你的应用持续的运行,而不必等待重算丢失的分区。4.1Spark缓存机制概述接下来,通过实例体验RDD缓存与否带来的计算性能差异;现有数据集user_view.txt记载了用户浏览店铺的日志信息,包括用户ID、店铺ID、时间戳,数据字段间用“\t”分割。(1)要实现的功能在Sparkshell中,统计所有店铺的数量(不重复)、所有用户的数量(不重复)以及所有记录数。(2)数据准备假设user_view.txt文件现位于/home/hadoop目录下,打开一个Linux终端,使用如下命令将该文件上传到HDFS文件系统中。4.2

SparkRDD缓存体验cd/usr/local/hadoop/sbin./start-all.sh//启动hadoop服务,如果服务已经开启,则本步骤可省略。cd/usr/local/hadoop/bin./hdfsdfs-put/home/hadoop/user_view.txt/user/hadoop//文件上传到HDFS(3)代码实现打开一个Linux终端,输入如下命令启动Spark并进入SparkShell环境。4.2

SparkRDD缓存体验cd/usr/local/spark/sbin./start-all.shcd/usr/local/spark/bin./spark-shell--masterlocal[*]在SparkShell环境下,输入以下命令,完成相关统计工作。valpath="hdfs://localhost:9000/user/hadoop/user_view.txt"//读取文件生成RDD,对其元素进行字符串切割后形成键值对RDDvalinput=sc.textFile(path).map(x=>x.split("\t")).map(x=>(x(0),1))//input缓存数据input.cache()//reduceByKey操作,得到(用户ID,访问数量)为元素的RDDvaluser=input.reduceByKey((a,b)=>a+b)//输出用户数量user.count//根据用户访问量进行排序,取前10名user.sortBy(x=>x._2).take(10)//数input中元素的数量(关键点)input.count4.2

SparkRDD缓存体验(4)SparkWebUI中查看结果上述代码执行完毕后,在浏览器中输入localhost:4040进入Spark监控页面,选取Stages可以看到上述代码各阶段执行的时长(受硬件、环境配置等因素影响,显示结果可能会有不同)。4.2

SparkRDD缓存体验在Storage选项卡中,还可以查看缓存情况;StorageLevel为MemoryDeserialized1xReplicated,表明数据缓存在JVM内存中,缓存有1个副本。4.2

SparkRDD缓存体验(5)不使用缓存结果分析为了演示不使用缓存效果,可以先退出Sparkshell后,再次进入;输入如下代码(取消对input的缓存)。valpath="hdfs://localhost:9000/user/hadoop/user_view.txt"valinput=sc.textFile(path).map(x=>x.split("\t")).map(x=>(x(0),1))valuser=input.reduceByKey((a,b)=>a+b)user.countuser.sortBy(x=>x._2).take(10)input.count代码执行完毕后,在浏览器中输入localhost:4040进入Spark监控页面;得到下图所示结果,发现最后的count阶段执行时间边长(增加1S),可见不缓存的情况下执行效率下降了。4.2

SparkRDD缓存体验(6)主动释放缓存对于开发者主动缓存的RDD数据,执行完毕后要予以释放,以腾出内存空间;释放空间可以使用方法RDD.unpersist(true);注意,释放缓存的RDD要找到正确的时机,释放前一般要确保该RDD不会再次频繁使用。4.2

SparkRDD缓存体验认识RDD广播变量和

累加器任务5广播变量和累计器两种机制。广播变量和累计器的基本用法。Spark在默认情况下,当集群的不同节点的多个任务上并行运行一个函数时,函数中使用的变量都会以副本的形式复制到各个机器节点上,如果更新这些变量副本的话,这些更新并不会传回到驱动器(driver)程序;有时候也需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(DriverProgram)之间共享变量。为了满足这些需求,Spark提供了两种类型的变量:广播变量(broadcastvariables)和累加器(accumulators)。广播变量可以实现变量在所有节点的内存之间进行共享;累加器则支持在不同节点之间进行累加计算(如计数或者求和)。5.1共享变量广播变量提供了一种只读的共享变量,它是在每个机器节点上保存一个缓存,而不是每个任务保存一份副本。这样不需要在不同任务之间频繁地通过网络传递数据,从而减少了网络开销,同时也减少了CPU序列化与反序列化的次数。采用广播变量时,通常可以在每个节点上保存一个较大的输入数据集,这要比常规的变量副本更高效(普通变量是每个任务一个副本,而一个节点上可能有多个任务)。5.2广播变量SparkContext提供了broadcast()方法用于创建广播变量,例如对于对于变量v,只需调用SparkContext.broadcast(v)即可得到一个广播变量;这个广播变量是对变量v的一个包装,要访问其值,可以调用广播变量的value方法。代码示例如下:scala>valbroadcastVar=sc.broadcast(Array(1,2,3))broadcastVar:org.apache.spark.broadcast.Broadcast[Array[Int]]=Broadcast(0)scala>broadcastVar.valueres0:Array[Int]=Array(1,2,3)5.2广播变量在某些关联查询场景中,可对一些公共数据进行广播;假设现有(号码段,归属地,运营商)数据,例如(1371001,广州,中国移动);要求对数据(户主姓名,电话号码)进行补全,输出户主姓名、电话号码、归属地、运营商信息;使用广播变量的实现过程如下://构造一个Map:号码段—>(归属地,运营商)scala>valtelephoneDetail=Map("1371001"->("广州","中国移动"),"1371350"->("深圳","中国移动"),"1331847电信"),"1324240"->("深圳","中国联通"))telephoneDetail:scala.collection.immutable.Map[String,(String,String)]=Map(1371001->(广州,中国移动),1371350->(深圳,中国移动),1331847->(珠海,中国电信),1324240->(深圳,中国联通))//调用SparkContext的broadcast方法,将telephoneDetail广播发送scala>valtdBroadCast=sc.broadcast(telephoneDetail)tdBroadCast:org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[String,(String,String)]]=Broadcast(2)5.2广播变量//构建一个包含(电话号码,用户名)的Listscala>valcustomer=List((,"tom"),(,"jerry"))customer:List[(String,String)]=List(tom),jerry))//将customer转换为RDDscala>valcusRDD=sc.parallelize(customer)cusRDD:org.apache.spark.rdd.RDD[(String,String)]=ParallelCollectionRDD[2]atparallelizeat<console>:26//使用广播变量tdBroadCast补全用户信息scala>valcustomerDetail=cusRDD.map(x=>{|valshorttel=x._1.substring(0,7)|valdetail=tdBroadCast.value(shorttel)|(x._1,x._2,detail._1,detail._2)|}|)customerDetail:org.apache.spark.rdd.RDD[(String,String,String,String)]=MapPartitionsRDD[3]atmapat<console>:325.2广播变量scala>customerDetail.collectres4:Array[(String,String,String,String)]=Array(tom,珠海,中国电信),jerry,深圳,中国移动))//释放tdBroadCast广播变量scala>tdBroadCast.unpersist实际业务中,需要广播的数据往往是通过读取数据库表或者读取文件生成,而非示例中手工生成;当广播变量不再使用后,要及时释放。在主流的分布式计算框架中,都存在Spark广播变量类似的应用,其主要目的就是减少数据传递开销及减少对CPU资源的消耗。5.2广播变量广播变量提供了一种只读的共享变量,它是在每个机器节点上保存一个缓存,而不是每个任务保存一份副本。这样不需要在不同任务之间频繁地通过网络传递数据,从而减少了网络开销,同时也减少了CPU序列化与反序列化的次数。采用广播变量时,通常可以在每个节点上保存一个较大的输入数据集,这要比常规的变量副本更高效(普通变量是每个任务一个副本,而一个节点上可能有多个任务)。5.2广播变量累加器是Spark提供的另一种共享变量机制;在Spark中,每一个任务可能会分配到不同节点中执行;在执行过程中,如果需要将多个节点中的数据累加到一个变量中,则可以通过累计器实现,即利用累加器可以实现计数(类似MapReduce中的计数器)或者求和(SUM)。Spark原生支持了数字类型的累加器,开发者也可以自定义新的累加器。如果创建累加器的时设置了名称,则该名称会展示在SparkUI上,有助于了解程序运行处于哪个阶段。5.3累加器开发人员可以调用SparkContext.accumulator(v)可以创建一个累加器,v为累加器的初始值。累加器创建后,可以使用add方法或者+=操作符来进行累加操作。注意:任务本身并不能读取累加器的值,只有驱动器程序可以使用value方法访问累加器的值。以下代码展示了如何使用累加器对一个元素数组求和:scala>valaccum=sc.accumulator(0,"MyAccumulator")scala>valrdd=sc.parallelize(Array(1,2,3,4))rdd:org.apache.spark.rdd.RDD[Int]=ParallelCollectionRDD[4]atparallelizeat<console>:24scala>rdd.foreach(x=>accum+=x)scala>accumres7:org.apache.spark.Accumulator[Int]=105.3累加器Spark内置了整数累加器、长精度浮点数累加器和几个累加器,上述代码使用的累加器即为整型累加器;开发人员也可以通过继承AccumulatorParam来自定义累加器。AccumulatorParam主要有两个方法:(1)zero,这个方法为累加器提供一个“零值”;(2)addInPlace,将收到的两个参数值进行累加。例如,假设我们需要为Vector提供一个累加机制,那么可能的实现方式如下:objectVectorAccumulatorParamextendsAccumulatorParam[Vector]{defzero(initialValue:Vector):Vector={Vector.zeros(initialValue.size)}defaddInPlace(v1:Vector,v2:Vector):Vector={v1+=v2}}5.3累加器//使用如下方式创建累加器//valvecAccum=sc.accumulator(newVector(...))(VectorAccumulatorParam)对于在action算子中更新的累加器,Spark保证每个任务对累加器的更新只会被应用一次,例如,某些任务如果重启过,则不会再次更新累加器。而如果在transformation算子中更新累加器,那么用户需要注意,一旦某个任务因为失败被重新执行,那么其对累加器的更新可能会实施多次。累加器并不会改变Spark懒惰求值的运算特性。如果在RDD算子中更新累加器,那么其值只会在RDD做action计算的时候被更新一次。因此,在transformation算子(如map)中更新累加器,其值并不能保证一定被更新;如以下代码所示:valaccum=sc.accumulator(0)data.map{x=>accum+=x;f(x)}//这里,accum的值任然是0,因为没有action算子,所以map也不会进行实际的计算5.3累加器理解RDD的依赖关系任务6理解SparkRDD的执行流程及RDD间的依赖关系。新的数据保存方式——检查点机制。SparkRDD的操作分为转换操作和行动操作两大类,由于RDD的不可修改性,需要由旧RDD不断产生新RDD,以供给下次操作使用,直到最后一个RDD经过行动操作后,产生需要的结果并输出;RDD采用了惰性计算机制,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作时刻,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作使用的部分基础数据集以及RDD生成的轨迹,即RDD之间的依赖关系,而不会触发真正的计算。对于输入数据Input,Spark从逻辑上生成RDD1和RDD2两个RDD,经过一系列“转换”操作,逻辑上生成了RDDn;但上述RDD并未真正生成,他们是逻辑上的数据集,Spark只是记录了RDD之间的生成和依赖关系。当RDDn要进行输出时(执行“行动操作”时),Spark才会根据RDD的依赖关系生成DAG(有向无环图),并从起点开始真正的计算。6.1SparkRDD的执行流程上述处理过程中,RDD之间前后相连,形成了“血缘关系(Lingeage)”,通过血缘关系连接起来的一系列RDD操作就可以实现管道化(pipeline),避免了多次转换操作之间数据同步的等待,而且不用担心有过多的中间数据,因为这些具有血缘关系的操作都管道化了,一个操作得到的结果不需要保存为中间数据,而是直接管道式地流入到下一个操作进行处理。同时,这种通过血缘关系把一系列操作进行管道化连接的设计方式,也使得管道中每次操作的计算变得相对简单,保证了每个操作在处理逻辑上的单一性;在HadoopMapReduce的设计中,为了尽可能地减少MapReduce过程,在单个MapReduce中会写入过多复杂的逻辑。6.1SparkRDD的执行流程RDD的每次转换操作都会产生一个新的RDD,那么前后RDD之间便形成了一定的依赖关系;RDD中的依赖关系分为窄依赖(NarrowDependency)与宽依赖(WideDependency),两种依赖之间的区别如下图所示。6.2

RDD间的依赖关系窄依赖:一个RDD对它的父RDD,只有简单的一对一的依赖关系,也就是说,RDD中的每个partition,仅仅依赖于父RDD中的一个partition,父RDD和子RDD的partition之间是一对一的关系。这种情况下,是简单的RDD之间的依赖关系,也被称之为窄依赖。6.2

RDD间的依赖关系宽依赖:本质就是shuffle,也就是说每一个父RDD中的partition中的数据,都可能会传输一部分到下一个RDD的每一个partition,也就是说,每一个父RDD和子RDD的partition之间,具有交互错杂的关系,那么这种情况就叫做两个RDD之间是宽依赖,同时他们之间发生的操作是shuffle。6.2

RDD间的依赖关系总体而言,如果父RDD的一个分区只被一个子RDD的一个分区所使用就是窄依赖,否则就是宽依赖。窄依赖典型的操作包括map、filter、union等,宽依赖典型的操作包括groupByKey、sortByKey等。6.2

RDD间的依赖关系Spark的这种依赖关系设计,使其具有了天生的容错性,大大加快了Spark的执行速度。相对而言,在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父RDD分区重新计算丢失的分区即可,而且可以并行地在不同节点进行重新计算。对于宽依赖,通常子RDD分区通常来自多个父RDD分区,重新计算的开销较大。上图所示,RDDa、RDDb直接为窄依赖,当RDDb的分区b1丢失时,只需要重新计算父RDDa的a1分区即可。而RDDc、RDDe之间为宽依赖,当RDDe的分区e1丢失时,则需要重新计算RDDe的所有分区,这就产生了冗余计算(c1、c2、c3中对于e2的数据)。6.2

RDD间的依赖关系当Spark集群中某一个节点由于宕机导致数据丢失,可以通过Spark中RDD的容错机制恢复丢失的数据。RDD提供了两种故障恢复方式:血统方式和设置检查点方式。如前所述,血统方式主要是根据RDD之间的依赖关系对丢失数据的RDD进行数据恢复。如果父子RDD间是窄依赖,则只需把父RDD对应分区重新计算即可,不需要依赖于其他节点,计算过程也不会产生冗余计算;若父子RDD间是宽依赖,则需要父RDD的所有分区都要从头到尾计算,计算过程存在冗余。为了解决宽依赖中的计算冗余问题,Spark又提供了另一种数据容错方式——设置检查点方式6.3检查点机制设置检查点方式,本质是通过将RDD写入词磁盘,是为了协助血统做容错辅助。如果血统过长会造成容错成本过高,这样在中间阶段做检查点容错性能更优;检查点机制下,如果检查点后的某节点出现问题而丢失分区,可以直接从检查点的RDD(从磁盘中读取)开始重做Lineage,这样可以减少开销;通常情况下,Spark通过将数据写入到HDFS文件系统实现RDD检查点功能,而HDFS是多副本的高可靠存储(通过多副本

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论