2剖析数据倾斜问题与企业级解决方案教程_第1页
2剖析数据倾斜问题与企业级解决方案教程_第2页
2剖析数据倾斜问题与企业级解决方案教程_第3页
2剖析数据倾斜问题与企业级解决方案教程_第4页
2剖析数据倾斜问题与企业级解决方案教程_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

下载Java2极速入门

徐老师·更新于2020-08- 上一节1剖析小文件问 3YARN实战下一我们知道MaRedce是分为Map阶段和Reduce阶段,其实提高执行效率就是提高这两个阶段的执行效率默认情况下Map阶段中Map任务的个数是和数据的InptSpit相关的,IntSplt的个数一般是和lck块是有关联的,所以可以认为Map任务的个数和数据的block块个数有关系,针对Map任务的个数我们一般是不需要干预的,除非是前面我们说的海量小文件,那个时候可以考虑把小文件合并成大文件。其他情况是不需要调整的,那就剩下Reduce阶段了,咱们前面说过,默认情况下reduce的个数是1个,所以现在MapRdce任务的压力就集中在Reduce阶段了,如果说数据量比较大的时候,一个reduce任务处理起来肯定是比较慢的,所以我们可以考虑增加reduce任务的个数,这样就可以实现数据分流了,提高计算效率了。job.getPartitionerClass方法看到默认情况下会使用HashPartitioner这个分区类/**Partitionkeysbytheir{@linkObject#hashCode()}.publicclassHashPartitioner<K,V>extendsPartitioner<K,V>5/**Use{@linkObject#hashCode()}topartition.publicintgetPartition(Kkey,VintnumReduceTasks)return(key.hashCode()&Integer.MAX_VALUE)% HashPartitioner继承了Partitioner,这里面其实就一个方法,getPartition,其实map里面每一条 (key.hashCode()&Integer.MAX_VALUE)%numReduceTasks的值,这个值默认是1job.getNumReduceTasks()可知。所以最终任何值%1都返回0,那也就意味着他们都在0号分区,也就只有这一个分区。numReduceTasks的数目调大即可,这个其实就是reduce增加redcue任务个数在一定场景下是可以提高效率的,但是在一些特殊场景下单纯增加reduce任务个数是无法达到质的提升的。其实在私底下我们是知道这份数据的大致情况的,这里面这1000w条数据,值为5的数据有910w条左右,剩下的9个数字一共只有90w条,那也就意味着,这份数据中,值为5的数据比较集中,或者说值为5的数据属于倾斜的数据,在这一整份数据中,它占得比重比其他的数据多得多。假设这100W条数据的文件有3个block,会产生3个Inptplt,最终会产生3个Map任务,默认情况下只有一个reduce任务,所以所有的数据都会让这一个reduce任务处理,这样这个Reduce压力肯定很大,大量的时间都消耗在了这里那根据我们前面的分析,我们可以增加reduce任务的数量,看下面这张图,我们把reduce任务的数量调整到10个,这个时候就会把1000w条数据让这10个reduce任务并行处理了,这个时候效率肯定会有一定的提升,但是最后我们会发现,性能提升是有限的,并没有达到质的提升,那这是为什么呢?我们来分析一下,刚才我们说了我们这份数据中,值为5的数据有910w条,这就占了整份数据的90%了,那这90%的数据会被一个reduce任务处理,在这里假设是让reduce5处理了,reduce5这个任务执行的是比较慢的,其他reduce任务都执行结束很长时间了,它还没执行结束,因为reduce5中处理的数据量和其他reduce中处理的数据量规模相差太大了,所以最终reduce5拖了后腿。咱们mapreduce任务执行消耗的时间是一直统计到最后一个执行结束的reduce任务,所以就算其他reduce任务早都执行结束了也没有用,整个mapreduce任务是没有执行结束的。MapRdce程序执行时,Reduce节点大部分执行完毕,但是有一个或者几个Reduce节点运行很慢,导致整个程序处理时间变得很长增加reduce任务个数,这个属于治标不治本,针对倾斜不是太严重的数据是可以解决问题的,针对倾斜严重的数据,这样是解决不了根本问题的1[root@bigdata01soft]#2total3drwxr-xr-x.91001244Apr2620:34hadoop- -rw-r--r--.1rootroot345625475Jul192019hadoop- -rw-r--r--.1rootroot1860100000Apr2721:58 drwxr-xr-x. 10 245Dec162018-rw-r--r--.1rootroot194042837Apr623:14jdk-8u202-linux--rw-r--r--.1rootroot147616384Apr2716:22-rw-r--r--.1rootroot147976384Apr2716:22这个文件有点大,在wndws本地无法打开,在这里我们去一条数据看一下数据格式,前面是一个数字,后面是一行日志,这个数据是我自己造的,我们主要是使用前面的这个数字,后面的内容主要是为了充数的,要不然文件太小,测试不出来效果。后面我们解析数据的时候只获取前面这个数字即可,前面这个数字是1-10之间的数字[root@bigdata01soft]#tail-110INFOmainorg.apache.hadoop.mapreduce.lib.output.FileOutputCommitter:[root@bigdata01soft]#hdfsdfs-puthello_10000000.dat[root@bigdata01soft]#hdfsdfs-ls 2rootsupergroup18601000002020-04-2722:01下面我们来具体跑一个这份数据,首先复制一份WordCountJob的代码,新的类名为1package2importimportimportimportimportimportimportimportimportimportimport15import17*数据倾斜-增加Reduce**CreatedbypublicclassWordCountJobSkew *MappublicstaticclassMyMapperextendsMapper<LongWritable,Loggerlogger=*需要实现map*这个map函数就是可以接收<k1,v1>,产生*@param*@param*@param*@throws*@throwsprotectedvoidmap(LongWritablek1,Textv1,ContextthrowsIOException,InterruptedException//k1代表的是每一行数据的行首偏移量,v1String[]words=v1.toString().split("Textk2=newLongWritablev2=new *ReducepublicstaticclassMyReducerextendsLoggerlogger=

@param@param@param@throws@throwsprotectedvoidreduce(Textk2,Iterable<LongWritable>v2s,ContextcothrowsIOException,InterruptedException{longsum=0L;for(LongWritablev2v2s){sum+=v2.get();if(sum%200==0){}}Textk3k2;LongWritablev3=new}}*组装publicstaticvoidmain(String[]args){}Configurationconf=newJobjob=}catch(Exception}}} [root@bigdata01hadoop-3.2.0]#hadoopjardb_hadoop-1.0-SNAPSHOT-jar-with- [root@bigdata01hadoop-3.2.0]#hdfsdfs-cat21324354657687989Elapsed:2mins,46sec[root@bigdata01[root@bigdata01hadoop-3.2.0]#hdfsdfs-rm-r[root@bigdata01hadoop-3.2.0]#hadoopjardb_hadoop-1.0-SNAPSHOT-jar-with-Elapsed:2mins,这里由于有0个reduce,所以一共有1行,在这我们截取了一部分,其中这里面有一个reduce任务消耗的时间比较长,其他reduce任务的执行时间都是~秒,这个reduce任务的执行时间是分2秒,那就意味着值为的那9w数据进入到这个ece了,所以它执行的比较慢。针对这个操作我们需要去修改代码,在这里我们再重新复制一个类,基于WordCountJobSkew1package2importimportimportimportimportimportimportimportimportimportimportimportimport *CreatedbypublicclassWordCountJobSkewRandKey *MappublicstaticclassMyMapperextendsMapper<LongWritable,Loggerlogger=Randomrandom=new*需要实现map*这个map函数就是可以接收<k1,v1>,产生*@param*@param*@param*@throws*@throwsprotectedvoidmap(LongWritablek1,Textv1,ContextthrowsIOException,InterruptedException//k1代表的是每一行数据的行首偏移量,v1String[]words=v1.toString().split("Stringkey=key= Textk2=newLongWritablev2=new *ReducepublicstaticclassMyReducerextendsLoggerlogger=*针对<k2,{v2...}>的数据进行累加求和,并且最终把数据转化为k3,v3*@param*@param*@param*@throws*@throwsprotectedvoidreduce(Textk2,Iterable<LongWritable>v2s,ContextthrowsIOException,InterruptedExceptionlongsum=for(LongWritablev2:sum+=if(sum%200}}Textk3=LongWritablev3=new}}

*组装publicstaticvoidmain(String[]{}Configurationconf=newJobjob=FileInputFormat.setInputPaths(job,newPath(args[0]));}

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论