版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
接下来,咱们借助一个小例子,来讲一讲广播变量的含义与作用。这个例子和WordCout有关,它可以说是分布式编程里的 oworld了,WordCout就是用来统计文件中全部单词的,你肯定已经非常熟悉了,所以,我们例子中的需求增加了一点难度,我们要对指定列表中给定的单词计数。1234valdict=List(“spark”,valwords==words.filter(word=>.map((_,1)).reduceByKey(_+按照这个需求,同学小A实现了如上的代码,一共有4行,我们逐一来看。第1行在Driver端给定待查单词列表dict;第2行以textFileAPI分布式文件,内容包含一列,的是常见的单词;第3行用列表dict中的单词过滤分布式文件内容,只保留dict中给定的单词;第4行调用reduceByKey对单词进行累加计数。学习过调度系统之后,我们知道,第一行代码定义的dict列表连带后面的3行代码会一同打包到Task里面去。这个时候,Task就像是一架架小飞机,携带着这些“行李”,飞往集群中不同的Executors。对于这些“行李”来说,代码的“负重”较轻,可以忽略不你可能会说:“也还好吧,dict个例子中的并行度是10000,那么,Driver端需要通过网络分发总共10000份dict拷贝。这个时候,集群内所有的Eecutors需要消耗大量内存来这10000份的拷贝,对宝贵的网络和内存资源来说,这已经是一笔不小的浪费了。更何况,如果换做一个更大的数据结构,Task分发所引入的网络与内存开销会更可怕。换句话说,统计计数的业务逻辑还没有开始执行,Spark但是,在着手优化之前,我们不妨先来想,现有的问题是什么,我们要达到的目的是什么。结合刚刚的分析,我们不难发现,WrdCunt的痛点在于,数据结构的分发和受制于并行,并且是以Task为粒度的,因此往往频次过高。痛点明确了,调优的目的也就清晰了,我们需要降低数据结构分发的频次。要达到这个目的,我们首先想到的就是降低并行度。不过,牵一发而动全身,并行度一旦调整,其他与CPU、内存有关的配置项都要跟着适配,这难免把调优变复杂了。实际上,要降低数据结构的分发频次,我们还可以考虑广播变量。广播变量是一种分发机制,它封装目标数据结构,以Executors为粒度去做数据分发。换句话说,在广播变量的工作机制下,数据分发的频次等同于集群中的Executors个数。通常来说,集群中的Executors数量都远远小于Task数量,相差两到三个数量级是常有的事。那么,对于第一版的WordCount实现,如果我们使用广播变量的话,会有哪些代码的改动很简单,主要有两个改动:第一个改动是用broadcast封装dict列表,第二个改动是在dict列表的地方改用broadcast.value替代。12345valdict=List(“spark”,valbc=valwords==words.filter(word=>.map((_,1)).reduceByKey(_+在广播变量的运行机制下,封装成广播变量的数据,由Driver端以Executors为粒度分发,每一个Executors接收到广播变量之后,将其交给BlockManager管理。由于广播变量携带的数据已经通过专门的途径到BlockManager中,因此分发到Executors的Task不需要再携带同样的数据。这个时候,你可以把广播变量想象成一架架货机,专门为Task这些小飞机运送“大件行李”。Driver与每一个Executors之间都开通一条这样的货机航线,统一运载负重较大的“数据行李”。有了货机来帮忙,Task小飞机只需要携带那些负重较轻的代码就好了。等这些Task小飞机在Executors着陆,它们就可以到Executors的公用仓库BlockManager里去提取它们的“大件行李”。总之,在广播变量的机制下,dit列表数据需要分发和的次数锐减。我们假设集群中有20个Eecuors,不过任务并行度还是10000,那么,Drier需要通过网络分发的it列表拷贝就会由原来的10000份减少到20份。同理,集群范围内所有Eeutors需要的it拷贝,也由原来的10000份,减少至20份。这个时候,引入广播变量后的开销只是原来Task分发的1/500!Driver我们来看这样一个例子。在电子商务领域中,开发者往往用事实表来类数据,用维度表来像物品、用户这样的描述性数据。事实表的特点是规模庞大,数据体量随着业务的发展不断地快速增长。维度表的规模要比事实表小很多,数据体量的变化也相对稳定。假设用户维度数据以Parquet文件格式在HDFS文件系统中,业务部门需要我们valuserFile:String=valdf:DataFrame=valbc_df:Broadcast[DataFrame]=首先,我们用ParquetAPIHDFS分布式数据文件生成DataFrame,然后用broadcastDataFrame。从代码上来看,这种实现方式和封装普通变量没有太大差别,它们都调用了broadcastAPI,只是传入的参数不同。DriverDriver到各个Executors,再让Executors把数据缓存到BlockManager就好了。与普通变量相比,分布式数据集的数据源不在Driver端,而是来自所有的Executors。Executors中的每个分布式任务负责生产全量数据集的一部分,也就是图中不同的数据分区。因此,步骤1就是Driver从所有的Executors拉取这些数据分区,然后在本地构建全量数据。步骤2与从普通变量创建广播变量的过程类似。Driver把汇总好的全量数据分发给各个Executors,Executors将接收到的全量数据缓存到系统的BlockManager中。不难发现,相比从普通变量创建广播变量,从分布式数据集创建广播变量的网络开销更大。原因主要有二:一是,前者比后者多了一步网络通信;二是,前者的数据体量通常比后者大很多。你可能会问:“Driver从Executors拉取DataFrame的数据分片,揉成一份全量数据,然后再广去,抛开网络开销不说,来来回回得费这么大劲,图啥呢?”这是一个好问题,因为以广播变量的形式缓存分布式数据集,正是克制Shuffle锏。Shuffle为什么这么说呢?我还是拿电子商务场景举例。有了用户的数据之后,为了分析不同用户的购物习惯,业务部门要求我们对表和用户表进行数据关联。这样的数据关联需求在数据分析领域还是相当普遍的。valtransactionsDF:DataFrame=valuserDF:DataFrame=transactionsDF.join(userDF,Seq(“userID”),因为需求非常明确,同学小A立即调用Parquet数据源API,分布式文件,创表和用户表的DataFrame,然后调用DataFrame的Join方法,以userID作为Joinkeys,用内关联(InnerJoin)的方式完成了两表的数据关联。在分布式环境中,表和用户表想要以userID为Joinkeys进行关联,就必须要确保一个前提:记录和与之对应的用户信息在同一个Executors内。也就是说,如果用户黄小乙的购物信息都在Executor0,而个人属性信息缓存在Executor2,那么,在分布在不进行任何调优的情况下,Spark默认采用ShuffleJoin的方式来做到这一点。ShuffleJoin的过程主要有两步。第一步就是对参与关联的左右表分别进行Shuffle,Shuffle的分区规则是先对Joinkeys计算哈希值,再把哈希值对分区数取模。由于左右表的分区数是一致的,因此Shuffle过后,一定能够保证userID相同的记录和用户数据坐落在同一个Executors内。ShuffleJoinhffle完成之后,第二步就是在同一个Excutrs内,Rducetask就可以对userID一致的记录进行关联操作。但是,由于表是事实表,数据体量异常庞大,对TB级别的数据进行hffle,想想都觉得可怕!因此,上面对两个DaaFrame直接关联的代码,还有很大的调优空间。我们该怎么做呢?话句话说,对于分布式环境中的数据关联来说,要想确保记录和与之对应的用户信息在同一个Eeutors中,我们有没有其他办法呢?克制Shuffle代代123456importvaltransactionsDF:DataFrame=_valuserDF:DataFrame=_valbcUserDF=77transactionsDF.join(bcUserDF,Seq(“userID”),8Drier从所有Eecutors收集erDF所属的所有数据分片,在本地汇总用户数据,然后给每一个Eecutors都发送一份全量数据的拷贝。既然每个Eeutors都有userDF的全量数据,这个时候,表的数据分区待在原地、保持不动,就可以轻松地关联到一致的用户数据。如此一来,我们不需要对数据体量巨大的表进行Shuffle,同样可以在分布式环境中,完成两张表的数据关联。BroadcastJoin将小表广播,避免大表利用广播变量,我们成功地避免了海量数据在集群内的、分发,节省了原本由ufle引入的磁盘和网络开销,大幅提升运行时执行性能。当然,采用广播变量优化也是有成本的,毕竟广播变量的创建和分发,也是会带来网络开销的。但是,相比大表的全网分发,小表的网络开销几乎可以忽略不计。这种小投入、大产出,用极小的成本去博取高额的性能收益,真可以说是“四两拨千斤”!在数据关联场景中,广播变量是克制Shuffle的锏。掌握了它,我们就能以极小的成第一种,从普通变量创建广播变量。在广播变量的运行机制下,普通变量的数据封装成广播变量,由Driver端以Executors为粒度进行分发,每一个Executors接收到广播变量之后,将其交由BlockManager管理。Driver需要从所有的ExecutorsDriverExecutors,Executors存到系统的BlockManager中。ShuffleJoinsBroadcastJoins,就可以用小表广播来代替大表的全网分发,真正做到克制Shuffle。Spark广播机制现有的实现方式是存在隐患的,在数据量较大的情况下,Driver可能会成为瓶颈,你能想到更好的方式来重新实现Spark的广播机制吗?(提示:在什么情况下,不适合把ShuffleJoins转换为BroadcastJoins? 不得售卖。页面已增加防盗追踪,将依法其上一 11|Shuffle的工作原理:为什么说Shuffle是一时无两的性能下一 13|广播变量(二):有哪些途径让SparkSQL选择Broadcast络分发多次,已经远超出了shufflejoin需要传输的数据作者回复:9👍哈~但是,据我观察,这部分代码尚未生效。细节可以参考这个ticket:【Executorsidebroadcastforbroadcastjoins】,看上去还是进先是调用 其中 newOutOfMemoryError("Notenoughmemorytobuildandbroadcastthetabletoall"+"workernodes.Asaworkaround,youcaneitherdisablebroadcastbysetting"+s"${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key}to-1orincreasethespark"+s"drivermemorybysetting${SparkLauncher.DRIVER_MEMORY}toahighervalue.").initCause(oe.ge2 3广播join不是默认开启的吗,好像小表默认10M;还需像文中代码valbcUserDFbroahlerD相的录和用数据坐在同一个ors内。”opD的分数、或者说并行度,确实是由D文件系统决定的;但是,he过后,每个分布式数据集的并行度,就由数pr..hl.priions来决,这个咱在配置哪一讲哟~果你没有手工用riion或是olehle过后(Re阶段)默认确实是开启的,默认值确实也是10MB,但是,这个10MB太太太太太太(xN)小了!12021-05-orpr.o.rs给or指定的rororor理能力其实的地方于,不的者、不同上下文并发和行这两词,所以,回你的问,其实什么不康的~100并行度意味着000个区的分式数据集,这应该不见到。外10个的集群其实也不小了~不过你的200有到,不知是200行度,是 集群发。如果是 集群发的话这个和100r不上。意味着的每个ore要20线程,哈,目前没有这给力的P
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 《说课:直线与圆问题研究》课件
- 《静脉治疗行标解读》课件
- 汽车维护与保养 任务工单2 起动系统检查及起动机更换
- 【深度报告】综合能源服务从配电运维开始
- 湖南省邵阳市邵阳县2023-2024学年八年级上学期期末数学试题
- 《生物成分化学》课件
- 【课件】我所理解的技术教育
- 校园物理科备课组个人工作计划
- 高一教师学年教学计划参考
- 餐厅主管工作计划
- 智慧树知到《药用植物学》章节测试答案
- 乙烯裂解汽油加氢装置设计
- 微处理器系统结构与嵌入式系统设计第二版答案全
- 环式快开盲板技术说明书
- 南极洲[世界地理]
- 华为项目管理规范
- 最新投标书密封条
- JJG596-2012《电子式交流电能表检定规程》
- 内镜中心应急预案
- 别拿常识不当干粮
- 通信线路架空光缆通用图纸指导
评论
0/150
提交评论