离线计算系统-第3天mapreduce加强_第1页
离线计算系统-第3天mapreduce加强_第2页
离线计算系统-第3天mapreduce加强_第3页
离线计算系统-第3天mapreduce加强_第4页
离线计算系统-第3天mapreduce加强_第5页
已阅读5页,还剩30页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

课程大纲(MAPREDUCE详解MapReduceMapreudceMapreduceMapreducedebugMapReduceMapreduce程序的机MapReduceMapReduceMapreduceMapreduceyarnMapreduceMapreducemapreduce分布式运算框架的编程思想mapreduce常用算法的编程套路流量统计相关需:自定义javaBeanmapreduce中充当javaBeanWritablepublicvoidwrite(DataOutputout)throwsIOException }publicvoidreadFields(DataInputin)throwsIOException{upflow=in.readLong();downflow=sumflow=}mapreducejobmapreducejobjobjob要将flowBean作为map的key输出,这样mapreduce就会自动排序此时,flowBean要实现接口 Stringprefix=key.toString().substring(0,3);IntegerpartNum=pmap.get(prefix);return}Stringprefix=key.toString().substring(0,3);IntegerpartNum=pmap.get(prefix);return}注意:如果reduceTask的数量>=getPartition的结果数,则会多产生几个空的输出文件part-r-如 1<reduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会社交粉丝数据分 读一读一 在读一 输 <A-<A-<A-<B-<B-读入数据<A-B,C><A-B,F><A-输出:A- 倒排索引建无论hdfs还是mapreduce,对于小文件都有损效率,实践中,又难免处理大量小文件 publicpublicclassWholeFileInputFormat//设置每个小文件不可分片,保证一个小文件生成一个key-valueprotectedbooleanisSplitable(JobContextcontext,Pathfile){returnfalse;}InputSplitsplit,TaskAttemptContextcontext)throwsIOException,InterruptedException{WholeFileRecordReaderreader=newWholeFileRecordReader();reader.initialize(split,context);return}}privateFileSplitprivateBytesWritablevalue=newBytesWritable();privatebooleanprocessed=false;publicvoidinitialize(InputSplitsplit,TaskAttemptContextcontext)throwsIOException,InterruptedException{this.fileSplit=(FileSplit)split;this.conf=context.getConfiguration();}publicbooleannextKeyValue()throwsIOException,InterruptedException{if(!processed){byte[]contents=newbyte[(int)fileSplit.getLength()];Pathfile=fileSplit.getPath();FileSystemfs=file.getFileSystem(conf);FSDataInputStreamin=null;tryin=IOUtils.readFully(in,contents,0,contents.length);value.set(contents,0,contents.length);}finally}processed=true;returntrue;}returnreturn}publicNullWritablegetCurrentKey()throwsIOException,InterruptedException{return}InterruptedException{return}publicfloatgetProgress()throwsIOException{returnprocessed?1.0f:0.0f;}publicvoidclose()throwsIOException//do}}mapreducestaticclassSequenceFileMapperprivateTextfilenameKey;protectedvoidsetup(Contextcontext)throwsIOException,InterruptedException{InputSplitsplit=context.getInputSplit();Pathpath=((FileSplit)split).getPath();filenameKey=new}Contextcontext)throwsIOException,InterruptedException{context.write(filenameKey,value);}}publicintrun(String[]args)throwsException{Configurationconf=newSystem.setProperty("HADOOP_USER_NAME","hdfs");String[]otherArgs=newGenericOptionsParser(conf,args)if(otherArgs.length!=2){}Jobjob=Job.getInstance(conf,"combinesmallfilestopletion(true)?0:}publicstaticvoidmain(String[]args)throwsException}}1、从原始日志文件中数3、如果成功增强,则输出到增强结果 程序的关键点是要在一个mapreduce程序中根据数据的不同输出两类结果到不同 类灵活的输出需求可以通过自定义outputformat来实现1、在mapreduce中外部资publicpublicclassDBLoaderpublicstaticvoiddbLoader(HashMap<String,String>ruleMap){Connectionconn=null;Statementst=null;ResultSetres=try st=conn.createStatement();res=st.executeQuery("selecturl,contentfromurlcontent");while(res.next()){}}catch(Exceptione)}finally}}}}}}publicstaticvoidmain(String[]args){DBLoaderdb=newHashMap<String,String>map=newHashMap<String,String>();}}publicRecordWriter<Text,NullWritable>getRecordWriter(TaskAttemptContextcontext)throwsIOException,InterruptedException{FileSystemfs==PathtoCrawlPath= FSDataOutputStreamenhanceOut=fs.create(enhancePath);FSDataOutputStreamtoCrawlOut=fs.create(toCrawlPath);returnnew}FSDataOutputStreamenhanceOut=null;FSDataOutputStreamtoCrawlOut=null; toCrawlOut){this.enhanceOut=enhanceOut;this.toCrawlOut=toCrawlOut;} InterruptedException{//有了数据,你来负责写到目的地//判断,进来内容如果是带tocrawl的,就往待爬 输出流中写toCrawlOut}} InterruptedException{}}}}}mapreduce*这个程序是对每个小时不断产生的用户上网记录日志进行增强(url所指向的***publicclassLogEnhancerHashMap<String,String>knowledgeMap=newHashMap<String,*maptasksetupmaptask执行的机器内存中protectedvoidsetup(org.apache.hadoop.mapreduce.Mapper.Contextcontext)throwsIOException,InterruptedException{}protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Stringline=String[]fields=StringUtils.split(line,tryStringurl=//urlStringcontent=//Stringresult=if(null==content)//输往待 result=url+"\t"+}else//result=line+"\t"+content+}}catch(Exceptione)}}}publicstaticvoidmain(String[]args)throwsExceptionConfigurationconf=newJobjob=//要将自定义的输出格式组件设置到job //虽 自定义了outputformat,但是因 的outputformat继承//录 }}自定 1、利用“订单id和成交金额”作为key,可以将map阶段到的所有订单数据按照id分reduce2、在reduce端利用 parator将订单id相同的kv聚组,然后取第一个即是自定 *shuffle过程中reducekv**@author*public}publicintparableparableb)OrderBeanabean=(OrderBean)OrderBeanbbean=(OrderBean)return}}@authorpublicclassOrderBeanimplements privateTextitemid;publicOrderBean()}publicOrderBean(Textitemid,DoubleWritableamount){set(itemid,amount);}this.itemid=itemid;this.amount=amount;}publicTextgetItemid(){returnitemid;}publicDoubleWritablegetAmount(){returnamount;}intcmp if(cmp==0)cmp }return}publicvoidwrite(DataOutputout)throwsIOException{}publicvoidreadFields(DataInputin)throwsIOException{StringreadUTF=in.readUTF();doublereadDouble=this.itemid=newthis.amount=new}publicStringtoString()returnitemid.toString()+"\t"+}}}mapreduce@authorpublicclassSecondarySortstaticclassSecondarySortMapperextendsMapper<LongWritable,Text,OrderBeanbean=newprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{Stringline=String[]fields=StringUtils.split(line,}}staticclassSecondarySortReducerextendsReducer<OrderBean,NullWritable,OrderBean,在设置 parator以后这里收到的kv数据就是:<1001<1001 //item的所有订单中最大金额的那一个,就只要输出这个keyprotectedvoidreduce(OrderBeankey,Iterable<NullWritable>values,Contextcontext)throwsIOException,InterruptedException{}}publicstaticvoidmain(String[]args)throwsExceptionConfigurationconf=newConfiguration();Jobjob=Job.getInstance(conf); parator}}Mapjoin见,比如“订单日志”join“产品信息”--map节点,这样,map节点就可以在本地对自己所读到的大表数join并输出最终结果--并用distributedcache机制将小表的数据分发到每一个maptaskFileReaderin=null;BufferedReaderreader=HashMap<String,String>b_tab=newHashMap<String,String>();Stringlocalpath=null;Stringuirpath=Path[]files=context.getLocalCacheFiles();localpath=files[0].toString();URI[]cacheFiles=//这里读的数据是maptask所在机器本地工 in=newreader=newBufferedReader(in);Stringline=null;String[]fields=line.split(",");}}protectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{//maptask所负责的那一个切片数据(hdfs上String[]fields=Stringa_itemid=fields[0];Stringa_amount=fields[1];Stringb_name=//输出结果 98.9context.write(newText(a_itemid),newText(a_amount+"\t"+":"+localpath+"\t"+b_name));}}publicstaticvoidmain(String[]args)throwsExceptionConfigurationconf=newConfiguration();Jobjob=Job.getInstance(conf);//这里 //}}mapreduce框架中提供的全局计数器来实现publicpublicclassMultiOutputsenumprotectedvoidmap(LongWritablekey,Textvalue,Contextcontext)throwsIOException,InterruptedException{String[]words=for(Stringword:words)}}}jobmapreducejobmapreduceJobControlControlledJobControlledJobcJob1=newControlledJobcJob2=newControlledJob(job2.getConfiguration());ControlledJobcJob3=new//JobControljobControl=new//JobControlThreadjobControlThread=newThread(jobControl);returnreturn}MapReduce::MBMapTask实际使用的资源量超过该值,则会被强制杀死。::MBmapreduce.map.java.optsMapTaskJVMjavaheapsize等参数,e.g.“-Xmx1024m-verbose:gc-Xloggc:/tmp/@taskid@.gc(@taskid@Hadoop框架自动换taskid),默认值:“”mapreduce.reduce.java.opts:ReduceTaskJVM参数,你可以在此配置默认的javaheapsize等参数,e.g.“-Xmx1024mverbose:gc-Xloggc:/tmp/@taskid@.gc”,默认值:mapreduce.map.cpu.vcores:每个Maptaskcpucore数目,默认值mapreduce.reduce.cpu.vcores:Reducetask可使用的最多cpucore数目,默认值 mapreduce.map.maxattempts:每个

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论