典型的业务场景_第1页
典型的业务场景_第2页
典型的业务场景_第3页
典型的业务场景_第4页
典型的业务场景_第5页
已阅读5页,还剩23页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

典型的业务场景2学习任务了解读文件时避免数据源的数据倾斜了解调整并行度分散同一个Task3知识目标数据倾斜常见种类有哪几种常见数据倾斜的常见特征有哪些数据倾斜解决方案大致分为哪几步01能力目标了解读文件时避免数据源的数据倾斜能够简单叙述调整并行度分散同一个Task的不同Key02学习目标4目录01避免数据源的数据倾斜--读文件02调整并行度分散同一个Task的不同Key5避免数据源的数据倾斜--读文件Spark以通过textFile(path,minPartitions)方法读取文件时,使用TextFileFormat。对于不可切分的文件,每个文件对应一个Split从而对应一个Partition。此时各文件大小是否一致,很大程度上决定了是否存在数据源的数据倾斜。另外,对于不可切分的压缩文件,即使压缩后的文件大小一致,它所包含的实际数据量也可能差别很多,因为源文件数据重复度越高,压缩比越高。此时可通过在数据生成端将不可切分文件存储为可切分文件,或者保证各文件包含数据量相同的方式避免数据倾斜。原理6避免数据源的数据倾斜--读文件对于可切分的文件,每个Split大小由如下算法决定。其中goalSize等于所有文件总大小除以minPartitions。而blockSize,如果是HDFS文件,由文件本身的block大小决定;如果是Linux本地文件,且使用本地模式,由fs.local.block.size决定。原理protectedlongcomputeSplitSize(longgoalSize,longminSize,longblockSize){returnMath.max(minSize,Math.min(goalSize,blockSize));}默认情况下各Split的大小不会太大,一般相当于一个Block大小(在Hadoop2中,默认值为128MB),所以数据倾斜问题不明显。如果出现了严重的数据倾斜,可通过上述参数调整。7避免数据源的数据倾斜--读文件现通过脚本生成一些文本文件,并通过如下代码进行简单的单词计数。为避免Shuffle,只计单词总个数,不须对单词进行分组计数。案例SparkConfsparkConf=newSparkConf().setAppName("ReadFileSkewDemo");JavaSparkContextjavaSparkContext=newJavaSparkContext(sparkConf);longcount=javaSparkContext.textFile(inputFile,minPartitions) .flatMap((Stringline)->Arrays.asList(line.split("")).iterator()).count();System.out.printf("totalwords:%s",count);javaSparkContext.stop();8避免数据源的数据倾斜--读文件总共生成如下11个csv文件,其中10个大小均为271.9MB,另外一个大小为8.5GB。案例9避免数据源的数据倾斜--读文件之后将8.5GB大小的文件使用gzip压缩,压缩后大小仅为25.3MB。案例10避免数据源的数据倾斜--读文件使用如上代码对未压缩文件夹进行单词计数操作。Split大小为max(minSize,min(goalSize,blockSize)=max(1B,min((271.910+8.51024)/1MB,128MB)=128MB,无明显数据倾斜。案例11避免数据源的数据倾斜--读文件使用同样代码对包含压缩文件的文件夹进行同样的单词计数操作。未压缩文件的Split大小仍然为128MB,而压缩文件(gzip压缩)由于不可切分,且大小仅为25.3MB,因此该文件作为一个单独的Split/Partition。虽然该文件相对较小,但是它由8.5GB文件压缩而来,包含数据量是其它未压缩文件的32倍,因此处理该Split/Partition/文件的Task耗时为4.4分钟,远高于其它Task的10秒。运行结果如下图所示。案例12避免数据源的数据倾斜--读文件案例13避免数据源的数据倾斜--读文件由于上述gzip压缩文件大小为25.3MB,小于128MB的Split大小,不能证明gzip压缩文件不可切分。现将minPartitions从默认的1设置为229,从而目标Split大小为max(minSize,min(goalSize,blockSize)=12MB。如果gzip压缩文件可切分,则所有Split/Partition大小都不会远大于12。反之,如果仍然存在25.3MB的Partition,则说明gzip压缩文件确实不可切分,在生成不可切分文件时需要如上文所述保证各文件数量大致相同。案例14避免数据源的数据倾斜--读文件如下图所示,gzip压缩文件对应的Split/Partition大小为25.3MB,其它Split大小均为12MB左右。而该Task耗时4.7分钟,远大于其它Task的4秒。案例15避免数据源的数据倾斜--读文件适用场景:数据源存在不可切分文件,且文件内包含的数据量相差较大。解决方案:尽量使用可切分的格式代替不可切分的格式,或者保证各文件实际包含数据量大致相同。优势:可撤底消除数据源的数据倾斜,效果显著。劣势:数据源一般来源于外部系统,需要外部系统的支持。小结16调整并行度分散同一个Task的不同KeySpark在做Shuffle时,默认使用HashPartitioner(非HashShuffle)对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的Key对应的数据被分配到了同一个Task上,造成该Task所处理的数据远大于其它Task,从而造成数据倾斜。如果调整Shuffle时的并行度,使得原本被分配到同一Task的不同Key发配到不同Task上处理,则可降低原Task所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。原理17调整并行度分散同一个Task的不同Key现有一张测试表,名为student_external,内有10.5亿条数据,每条数据有一个唯一的id值。现从中取出id取值为9亿到10.5亿的共1.5亿条数据,并通过一些处理,使得id为9亿到9.4亿间的所有数据对12取模后余数为8,其它数据集对其id除以100取整,从而使得id大于9.4亿的数据在Shuffle时可被均匀分配到所有Task中,而id小于9.4亿的数据全部分配到同一个Task中。案例18调整并行度分散同一个Task的不同Key处理过程如下:案例INSERTOVERWRITETABLEtestSELECTCASEWHENid<940000000THEN(9500000+(CAST(RAND()*8ASINTEGER))*12)ELSECAST(id/100ASINTEGER)END,NameFROMstudent_externalWHEREidBETWEEN900000000AND1050000000;通过上述处理,一份可能造成后续数据倾斜的测试数据已经准备好。19调整并行度分散同一个Task的不同Key接下来,使用Spark读取该测试数据,并通过groupByKey(12)对id分组处理,且Shuffle并行度为12。代码如下:案例publicclassSparkDataSkew{publicstaticvoidmain(String[]args){SparkSessionsparkSession=SparkSession.builder() .appName("SparkDataSkewTunning") .config("hive.metastore.uris","thrift://hadoop1:9083") .enableHiveSupport() .getOrCreate();Dataset<Row>dataframe=sparkSession.sql("select*fromtest");20调整并行度分散同一个Task的不同Key案例dataframe.toJavaRDD().mapToPair((Rowrow)->newTuple2<Integer,String>(row.getInt(0),row.getString(1))).groupByKey(12).mapToPair((Tuple2<Integer,Iterable<String>>tuple)->{intid=tuple._1();AtomicIntegeratomicInteger=newAtomicInteger(0);tuple._2().forEach((Stringname)->atomicInteger.incrementAndGet()); returnnewTuple2<Integer,Integer>(id,atomicInteger.get());}).count();sparkSession.stop();sparkSession.close();}}21调整并行度分散同一个Task的不同Key本次实验所使用集群节点数为4,每个节点可被Yarn使用的CPU核数为16,内存为16GB。使用如下方式提交上述应用,将启动4个Executor,每个Executor可使用核数为12(该配置并非生产环境下的最优配置,仅用于本文实验),可用内存为12GB。案例spark-submit--queueambari--num-executors4--executor-cores12--executor-memory12g--classcom.jasongj.spark.driver.SparkDataSkew--masteryarn--deploy-modeclientSparkExample-with-dependencies-1.0.jar22调整并行度分散同一个Task的不同Key运行结果如下图所示:案例23调整并行度分散同一个Task的不同KeyGroupByStage的Task状态如上图所示,Task8处理的记录数为4500万,远大于(9倍于)其它11个Task处理的500万记录。而Task8所耗费的时间为38秒,远高于其它11个Task的平均时间(16秒)。整个Stage的时间也为38秒,该时间主要由最慢的Task8决定。在这种情况下,可以通过调整Shuffle并行度,使得原来被分配到同一个Task(即该例中的Task8)的不同Key分配到不同Task,从而降低Task8所需处理的数据量,缓解数据倾斜。案例24调整并行度分散同一个Task的不同Key通过groupByKey(48)将Shuffle并行度调整为48,重新提交到Spark。新的Job的GroupByStage所有Task状态如下图所示。案例25调整并行度分散同一个Task的不同KeyTask8的4500万,降低了75%左右,而其耗时从原来Task8的38秒降到了24秒。在这种场景下,调整并行度,并不意味着一定要增加并行度,也可能是减小并行度。从下图可见,处理记录数最多的Task6所处理的记录数约为1045万,耗时为23秒。处理记录数最少的Task1处理的记录数约为545万,耗时12秒。案例26调整并行度分散同一个Task的不同Key适用场景:大量不同的Key被分配到了相同的Task造成该Task数据量过大。解决方案:调整并行度。一般是增大并行度,但有时如本例减小

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论