离线计算系统-第8天mapreduce详解_第1页
离线计算系统-第8天mapreduce详解_第2页
离线计算系统-第8天mapreduce详解_第3页
离线计算系统-第8天mapreduce详解_第4页
离线计算系统-第8天mapreduce详解_第5页
免费预览已结束,剩余47页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

课程大纲(MAPREDUCE详解 MAPREDUCE程序运行演 MAPREDUCE示例编写及编程规 wordcount示例编 MAPREDUCE程序运行模式及debug方 Mapreduce程序的运行机 概 mr程序运行流 Maptask实例数的决定机 maptask数量的决定机 MAPREDUCE中的 MAPREDUCE中的序列 概 Jdk序列化和MR序列化之间的比 自定义对象实现MR中的序列化接 Mapreduce中的排序初 需求 分 实 Mapreduce中的分区 需求 分 实 mapreduce的shuffle机 概述 详细流 mapreduce数据压 概 MR支持的压缩编 Reducer输出压 Mapper输出压 MapReduce与 MapReduce编程案 reduce端join算法实 map端join算法实 web日志预处 附:Mapreduce参数优 1资源相关参 本地运行mapreduce作 课程大纲(MAPREDUCE详解MapReducemap、reduceMapreudceMapreduceMapreducedebugMapReduceMapreduce程序的机MapReduceMapReduceMapReduceMapreduceMapreduceyarnMapreduceMapreducemapreduce分布式运算框架的编程思想mapreduce常用算法的编程套路mapreduceMapreduce是一个分布式运算程序的编程框架hadoop 框架Mapreduce 式运算程序,并发运行在一个hadoop集群上;为什么wordcount122个阶段(一个阶段独立并发,一个阶段汇聚34、程序如何分配运算任务(切片56mapreduce11、MRAppMaster(mapreduceapplication2、3、MAPREDUCE框架结构及运行机结mapreduce2、mapTaskmap阶段的整个数据处理流程MR程序运行流流程示意流程解1、一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次jobmaptask实例数量,然后向集群申请机器启动相应数量maptask进程2maptask利用客户指定的inputformat来获取RecordReader数据,形成输入KVKVmap()map()方法输出的KV对收集到缓存KVK3、MRAppMaster到所有maptask进程任务完成之后会根据客户指定的参数启动相应reducetaskreducetask进程要处理的数据范围(数据分区)4ReducetaskMRAppMaster告知的待处理数据所在位置,从若干台maptaskmaptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,KVoutputformat将结果数据输出到外部MapTask并行度决定机maptaskmapjob的处理速度那么,mapTask并行实例是否越多越好呢?其并行度又是如何决定呢?mapTask并行度的决定机jobmapjob时决定map阶段并行度的规划的基本逻辑为:多个split),然后每一个split分配一个mapTask并行实例处理FileInputFormatgetSplits()方法完成,其FileInputFormat切片机1、切片定义在InputFormat类中的getSplit()方2、FileInputFormat中默认的切片机blockFileInputFormatfile1.txt.split1--file1.txt.split1--file1.txt.split2--128~256file1.txt.split3--256~320file2.txt.split1--0~10M3、FileInputFormat中切片的大小的参数配通过分析源码,在FileInputFormat中,计算切片大小的逻辑:Math.max(minSize,Math.min(maxSize,blockSize)); 因此,默认情况下,切片大小blocksizeminsize(切片最小值blockSizeblocksize12、运算任务的类型:CPUIO3map并行度的经验之2*12core+64Gmap20-100个map,最好每map的执行时间至少一分钟。jobmap或者reducetask30-40jobmap或者reducetask(map|reduce)setup和加入到调度器中进行调度,这个中间的过程可能都要花费几秒钟,所以如果每个task都非常快就跑完了,就会在task的开始和结束的时候浪费太多的时间。taskJVM(mapred.job.reuse.jvm.num.tasks,1JVM数目(Job)1task256MB512MBReduceTask并行度的决reducetaskjobmaptask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:reduce注意:reducetask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1reducetaskreducetaskjobrducereduce持平,或者比集群的reduceslotsMAPREDUCE程序运行演Hadoophadoop-mapreduce-example-2.4.1.jarjarMR示例程序,可以通过以下步骤运行:hdfs,yarn然后在集群中的任意一台服务器上启动执行程序(hadoopjarhadoop-mapreduce-example-2.4.1.jar MAPREDUCE实践篇MAPREDUCE示例编写及编程规编程规用户编写的程序分成三个部分:Mapper,Reducer,Driver(提交运行mr程序的客户端MapperKV对的形式(KV的类型可自定义MapperKV对的形式(KV的类型可自定义Mappermap()ReducerMapperReducerreduce()Reducetaskk的<k,v>reduce()MapperReducerDrvierjob1.7.2wordcount示例编mapper//////keyin: valuein://map方法的生命周期://key://value://value: InterruptedException{Stringline=value.toString();String[]wordsline.splitfor(Stringword:words){}}}reducer////生命周期:框架每传递进来一个kv组,reduceprotectedvoidreduce(Textkey,Iterable<IntWritable>values,Contextcontext)throwsIOException,InterruptedException{intcount=0;//遍历这一组kv的所有v,累加到countcount+=value.get();}}}}定义一个主类,job并提交publicpublicclassWordCountRunner里……)job对象//jobpublicstaticvoidmain(String[]args)throwsException{Configurationconf=newConfiguration();Jobwcjob=//job所在的jar//设 的业务逻辑Mapper类的输出key和value的数据类//设 的业务逻辑Reducer类的输出key和value的数据类//// ////向yarnbooleanres= }MAPREDUCE程序运行模本地运行模mapreduceLocalJobRunnerhdfs(mrdebugeclipsewindowswindows%HADOOP_HOME% 并且要将d:/hadoop-2.6.1的lib和 替换成windows平台编译的版集群运行模mapreduceyarn集群resourcemanagerhdfsAJARhadoopBlinuxeclipsemain(项目中要带参数:/r/=yarn以及yarn的两个基本配置CwindowseclipsejobYarnRunnermapreduce附:在windows平台问hadoop时改变自身标识的方法之二MAPREDUCEcombiner是MRMapperReducercombinercombinerreducer的区别在于运行的位置:Combinermaptask所在的节点运行ReducerMapper的输出结果;combinermaptask的输出进行局部汇总,以减小网络传输量1combinerReducerreduce2、在job中设置 combiner而且,combinerkvreducerkvMAPREDUCE原理篇mapreduceshuffle机概述mapreduce中,mapreducemapreduce框架中最shuffle;shuffle:洗牌、发牌——(机制:数据分区,排序,缓存中,对数据key进行了分区和排序;主要流程Shuffleshuffle是MRmaptask和reducetask3个操作:12、Sortkey3、Combinervalue详细流1、maptask收集的map()方法输出的kv对,放到内存缓冲区234partitonerkey进行排5reducetaskmaptask6reducetaskmaptask的结果文件,reducetask会将这Shufflemapreduce程序的执行效率,原则上说,缓冲区越大,磁io的次数越少,执行速度就越快缓冲区的大小可以通过参数调整 参数 默认详细流程示意概Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多所以,hadoop自己开发了一套序列化机制(Writable),Jdk序列化和MR序列化之间的比publicpublicclassTestSeripublicstaticvoidmain(String[]args)throwsException//ByteArrayOutputStream,用来接收不同序列化机制的序列化结果ByteArrayOutputStreamba=newByteArrayOutputStream();ByteArrayOutputStreamba2=new//DataOutputStreamjdkDataOutputStreamdout=newDataOutputStreamdout2=newObjectOutputStreamobout=new//beanItemBeanSeritemBeanSer=newItemBeanSer(1000L,89.9f);ItemBeanitemBean=newItemBean(1000L,89.9f);//StringTextTextatext=new//byte[]byteArray=for(byteb:byteArray){} Stringastr=//byte[]byteArray2=ba2.toByteArray();for(byteb:byteArray2){}}}自定义对象实现MR中的序列化接beankeycomparableshufflekey进行排序,bean *反序列化的方法,反序列化时,从流中publicvoidreadFields(DataInputin)throwsIOExceptionupflow=dflow=in.readLong();sumflow=in.readLong();}*publicvoidwrite(DataOutputout)throwsIOException 的}publicintcompareTo(FlowBeano)}MapReducemapreduce等运算程序则相当于运行于操作系统之上的应用程序YARN的重要概1yarn2yarn只提供运算资源的调度(yarn申请资源,yarn就负责分配资源3yarn4yarn序,spark程序,tez……6spark、stormyarn上运行,只要他们各自的框架中yarn规范的资源请求机制即可7Yarn就成为一个通用的资源调度平台,从此,企业中以前存在的各种运算集群都可Yarn中运行运算程序的示mapreduceMAPREDUCE实践篇Mapreduce中的排序初需00-FD-07-A4-72-5C-0E-8B-C7-F1-40020-10-7A-28-CC-245C-0E-8B-8B-B1-400分beanbeanmapkeyMR程序在处理数据的过程中会对数据排序(mapkvreduce之前,会排序),mapkey 实1publicpublicclassFlowBean longlongdownflow;longsumflow;publicFlowBean(){}publicFlowBean(longupflow,longdownflow){this.upflow=this.downflow=this.sumflow=upflow+}publiclonggetSumflow(){returnsumflow;}publicvoidsetSumflow(longsumflow){this.sumflow=sumflow;}publiclonggetUpflow(){returnupflow;}publicvoidsetUpflow(longupflow){this.upflow=upflow;}publiclonggetDownflow(){returndownflow;}publicvoidsetDownflow(longdownflow){this.downflow=downflow;}publicvoidwrite(DataOutputout)throwsIOException } 各个字段信publicvoidreadFields(DataInputin)throwsIOException{upflow=in.readLong();downflow=in.readLong();sumflow=in.readLong();}publicStringtoString()returnupflow+"\t"+downflow+"\t"+}publicintcompareTo(FlowBeano)returnsumflow>o.getSumflow()?-}}2mapperpublicpublicclassFlowCountprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Stringline=value.toString();String[]fields=line.split("\t");try{Stringphonenbr=longupflow=}}longdflow=FlowBeanflowBean=newFlowBean(upflow,}catch(Exceptione)}}}protectedvoidreduce(FlowBeanbean,Iterable<Text>phonenbr,Contextcontext)throwsIOException,InterruptedException{TextphoneNbr=}publicstaticvoidmain(String[]args)throwsExceptionConfigurationconf=newJobjob=//FileInputFormat.setInputPaths(job,ne FileOutputFormat.setOutputPath(job,ne }Mapreduce中的分区需分Mapreducemapkvkeykeyhashcode%reducetaskCustomPartitionerjobpartitioner实定义自己的从map到reduce之间的数据(分组)分发规则按 号所属的省份来分发(分组默认的分组组件是staticstaticHashMap<String,Integer>provinceMap=newHashMap<String,static}returncode==null?5:}}概mapreduce的一种优化策略:通过压缩编码对mapperreducer的输出进行压缩,以减少磁盘IO,MR程序运行速度(cpu运算负担)1MapreducemapreduceIO或23IOjob,多用压缩MR支持的压缩编Reducer输出压reduce1 2JobJobjob=pressOutput(job, Mapper输出压reduce1 2PRESS,压缩文件Hadoop自带的InputFormat类内置支持压缩文件的,比如TextInputformat类,在initializepublicpublicvoidinitialize(InputSplitFileSplitsplit=(FileSplit)genericSplit;Configurationjob=start=split.getStart();end=start+split.getLength();finalPathfile=//openthefileandseektothestartofthesplitfinalFileSystemfs=file.getFileSystem(job);fileIn=//根据文件后缀名创建相应压缩编码的CompressionCodeccodec=newCompressionCodecFactory(job).getCodec(file);if(null!=codec){pressedInput=pressor //if(codecinstanceof pressionCodec){ pressionInputStreamcIn= pressor,start,end,//如果是可切片压缩编码,则创建一个 压缩数in=newCompressedSplitLineReader(cIn,job,start=cIn.getAdjustedStart();end=cIn.getAdjustedEnd();filePosition=cIn;}else//如果是不可切片压缩编码,则创建一个SplitLineReader 解压数据流传递给普通SplitLineReaderinin=newSplitLineReader(codec.createInputStream(fileIn,pressor),job,this.recordDelimiterBytes);filePosition=}}else//如果不是压缩文件,则创建普通SplitLineReader in=newSplitLineReader(fileIn,job,this.recordDelimiterBytes);filePosition=fileIn;}4.4.MapReduce编程案reducejoin算法实123323SQL查询运算:selectselect te,,b.category_id,b.pricefromt_orderajoint_productbona.pid=2reducetaskreduce中进行数据的串联publicpublicclassOrderJoinprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//Stringline=String[]fields=//拿到Stringitemid=//获取到这一行所在的文件名(通过inpusplit)Stringname="你拿到的文件名";//根据文件名,切分出各字段(ab3个字段OrderJoinBeanbean=newOrderJoinBean();bean.set(null,null,null,null,null);context.write(newText(itemid),bean);}}protectedvoidreduce(Textkey,Iterable<OrderJoinBean>beans,Contextcontext)throwsIOException,InterruptedException{//拿到的key是某一个itemid,//拿到的beans是来自于两类文件的//{1000,amount}{1000,amount} -- //将来自于b文件的bean里面的字段,跟来自于a的所有bean}}}缺点:这种方式中,joinreduce阶段完成,reduce端的处理压力太大,map节reduce阶段极易产生数据倾斜mapjoinmapjoin算法实1map节点,这样,mapjoinjoin2--mapperpublicpublicclassTestDistributedCacheFileReaderin=null;BufferedReaderreader=HashMap<String,String>b_tab=newHashMap<String,String>();Stringlocalpath=null;Stringuirpath=//mapprotectedvoidsetup(Contextcontext)throwsIOException,InterruptedException//cachefilePath[]files=context.getLocalCacheFiles();localpath=files[0].toString();URI[]cacheFiles=//IO//maptaskin=newreader=newBufferedReader(in);Stringline=null;String[]fields=line.split(",");}}protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//maptask所负责的那一个切片数据(hdfs上String[]fields=Stringa_itemid=fields[0];Stringa_amount=fields[1];Stringb_name=//输出结 98.9context.write(newText(a_itemid),newText(a_amount+"\t"+":"+localpath+"\t"+b_name));}}publicstaticvoidmain(String[]args)throwsExceptionConfigurationconf=newConfiguration();Jobjob=Job.getInstance(conf);//这里 正常的需要处理的数据所在路FileInputFormat.setInputPaths(job,ne FileOutputFormat.setOutputPath(job,ne ////task//task //jartaskclasspath}}web日志预处1对web日志中的各字段识别切分根据KPI统计需求,生成各类请求过滤数2publicpublicclassWebLogBeanprivateStringremote_addr;//ipprivateStringremote_user;//记录客户端用户名称,忽略属性"-"privateStringtime_local;//记录 privateStringrequest;//url与httpprivateStringstatus;//privateStringbody_bytes_sent;//privateStringhttp_referer;//privateStringhttp_user_agent;//privatebooleanvalidtrue;//publicStringgetRemote_addr(){returnremote_addr;}publicvoidsetRemote_addr(Stringremote_addr){this.remote_addr=remote_addr;}publicStringgetRemote_user(){returnremote_user;}publicvoidsetRemote_user(Stringremote_user){this.remote_user=remote_user;}publicStringgetTime_local(){returntime_local;}publicvoidsetTime_local(Stringtime_local){this.time_local=time_local;}publicStringgetRequest(){returnrequest;}publicvoidsetRequest(Stringrequest){this.request=request;}publicStringgetStatus(){returnstatus;}publicvoidsetStatus(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论