html5android课程老师提供的hadoopspark on yarn本文在一个比较高层面上描述了如何YARN开发新应_第1页
html5android课程老师提供的hadoopspark on yarn本文在一个比较高层面上描述了如何YARN开发新应_第2页
html5android课程老师提供的hadoopspark on yarn本文在一个比较高层面上描述了如何YARN开发新应_第3页
html5android课程老师提供的hadoopspark on yarn本文在一个比较高层面上描述了如何YARN开发新应_第4页
html5android课程老师提供的hadoopspark on yarn本文在一个比较高层面上描述了如何YARN开发新应_第5页
免费预览已结束,剩余37页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

一般的概念就是“ApplicationSubmission”提交一个”Application”到YARN的ResourceManager。客户端()与ResourceManager之间通过”RMProtocol”协议进行通信。如果有需要,客户端通过RMProtocol#getNewApplication调用来获得一个新的“ApplicationId”,接着通过调用RMProtocol#submitApplication来提交任务(Application)。作为RMProtocol#submitApplication调用的一部分,客户端需要提供足够的信息给ResourceManager来启动应用程序的第一个container,即ApplicationMaster。客户端需要提供的信息包括任务运行所需要的本地文件,jar包,真正要为启动ApplicationMaster提供Unix进程信息。YARNResourceManager接着会在RM分配的第一个container上启动指定的ApplicationMaster。ApplicationMasterResourceManager之间会通过‘AMRMProtocol’配的任务,ApplicationMaster接着会通过AMRMProtocol#allocate协议请求请求和接受containers。如果分配到了container,ApplicationMaster就会通过量等等。一旦任务完成,ApplicationMaster就会通过AMRMProtocol#finishApplicationMaster协议告知ResourceManager任务完成了。ApplicationMaster来查询信息。如果有必要,客户端通过RMProtocol#KillApplication也能杀死应用。 RMProtocol–ppcsr通过这个协议可以向ResourceManager和注销自己,还能从Scheduler处请取container的任务状态更新信息。ResourceManager的ApplicationsManager(AsM)接口。RMProtocolYarnConfigurationyarnConf=newYarnConfiguration(conf);InetSocketAddressrmAddress=LOG.info("ConnectingtoResourceManagerat"+rmAddress);configurationappsManagerServerConf=newConfiguration(conf);applicationsManager=((RMProtocol)rpc.getRMProtocol.class,rmAddress,GetNewApplicationRequestrequest=GetNewApplicationResponseresponse=LOG.info("GotnewApplicationId="+才能正确的设置container的参数,以启动ApplicationMaster。可以参考GetNewApplicationResponse以获取的信息。ResourceManager所需要的启动ApplicationMaster的所有信息:ApplicationInfo:id,:User::container上运行。ContainerLaunchContext正如前面所描述的,定义了等),,securitytokens,环境变量(CLASSPATHetc.)和被执行令。//CreateanewApplicationSubmissionContextApplicationSubmissionContextappContext=//settheApplicationId//settheapplication//CreateanewcontainerlaunchcontextfortheAM'scontainerContainerLaunchContextamContainer=//DefinethelocalresourcesrequiredMap<String,LocalResource>localResources=newHashMap<String,//LetsassumethejarweneedforourApplicationMasterisavailable//HDFSatacertainknownpathtousandwewanttomakeitavailable//theApplicationMasterinthelaunchedcontainerPathjarPath;//<-knownpathtojarfileFileStatusjarStatus=fs.getFileStatus(jarPath);LocalResourceamJarRsrc=//Setthetypeofresource-fileor//archivesareuntarredatthedestinationbytheframework//Setvisibilityofthe//Settingtomostprivateoptioni.e.thisfilewill//bevisibletothisinstanceoftherunningapplication//Setthelocationofresourcetobecopiedoverinto//workingdirectory//Settimestampandlengthoffilesothatthe//candobasicsanitychecksforthelocal//afterithasbeencopiedovertoensureitisthe//resourcetheintendedtousewiththeapplication//TheframeworkwillcreateasymlinkcalledAppMaster.jarin//workingdirectorythatwillbelinkedbacktotheactual//TheApplicationMaster,ifneedstoreferencethejarfile,//needtousethesymlinkfilename. //Setthelocalresourcesintothelaunchcontext//SetuptheenvironmentneededforthelaunchcontextMap<String,String>env=newHashMap<String,//Forexample,wecouldsetuptheclasspath//Assumingourclassesorjarsareavailableaslocalresourcesin//workingdirectoryfromwhichthecommandwillberun,weneedto//"."tothe//Bydefault,allthehadoopspecificclasspathswillalreadybe//in$CLASSPATH,soweshouldbecarefulnottooverwriteit.StringclassPathEnv="$CLASSPATH:./*:";env.put("CLASSPATH",//ConstructthecommandtobeexecutedonthelaunchedcontainerStringcommand="${JAVA_HOME}"+/bin/java""MyAppMaster""arg1arg2arg3" 2>"+ List<String>commands=newArrayList<String>();//addadditionalcommandsif//Setthecommandarrayintothecontainer//Definetheresourcerequirementsforthe//Fornow,YARNonlysupportsmemorysowesetthe////Iftheprocesstakesmorethanitsallocatedmemory,it//bekilledbythe//Memorybeingrequestedforshouldbelessthanmax//oftheclusterandallasksshouldbeamultipleofthemincapability.Resourcecapability=Records.newRecord(Resource.class);//CreatetherequesttosendtotheApplicationsManagerSubmitApplicationRequestappRequest=//Submittheapplicationtothe//Ignoretheresponseaseitheravalidresponseobjectisreturned//successoranexceptionthrowntodenotethefailure客户端可以通过RMProtocol#getApplicationReport与ResourceManager通信GetApplicationReportRequestreportRequest=GetApplicationReportResponsereportResponse=ApplicationReportreport=reportResponse.getApplicationReport();(1.1一般性的任务信息ApplicationIdApplicationId,applicationqueue,提交application的user,application开始的时间(1.3).Application的信息:如果任务支持某种类型的进程,它可以设置的url,客户端可以通过ApplicationReport#getTrackingUrl来获取url,并通过这个url来progress的状态.(1.4).ApplicationStatus:ResourceManager能够看到的一些任务的状态,可以通过Application#getYarnApplicationStateYarnApplicationStateFINISHED,ApplicationReport#getFinalApplicationStatuscheck任务的成功/失败。在失败时,ApplicationReport#getDiagnostics可以提供一些关于失败的信息。如果ApplicationMaster支持,客户端可以直接通过ApplicationReport中包含的RMProtocol协议支持KillApplication调用,允许客户端通过ResourceManagerApplicationMasterkill消息。ApplicationMaster也可以通过设计为客户端提供abort调用,那么客户端就能通过rpc调用来终止任务KillApplicationRequestkillRequest=job运行需要的所有必要的信息和资源。ApplicationMaster负责任务当ApplicationMaster启动时,可以通过环境变量来获得一些参数,例如:ApplicationMaster的NodeManger主机的详细信息,可以查阅ApplicationConstants来获得参数名称。所有与ResouceManagerApplicationAttemptId(如果任务失败可能会有多次重试)。ApplicationAttemptIdApplicationMastercontainerId来获得。有些辅助的API可以将从环境变量获得的值转换为对象。Map<String,String>envs=System.getenv();StringcontainerIdString=if(containerIdString==null){//containeridshouldalwaysbesetintheenvbytheframeworkthrownewIllegalArgumentException("ContainerIdnotsetinthe} ApplicationMaster初始化完成后,可以通过ARMRMProtocol#regispplicationMaster来向ResourceManager。ApplicationMaster经常通过ResouceManagerScheduler接口与之通讯。//ConnecttotheScheduleroftheResourceManager.YarnConfigurationyarnConf=newYarnConfiguration(conf);InetSocketAddressrmAddress=LOG.info("ConnectingtoResourceManagerat"+rmAddress);AMRMProtocolresourceManager= //RegistertheAMwiththe//Settherequiredinfointotheregistration////hostonwhichtheappmasteris//rpcportonwhichtheappmasteracceptsrequestsfromthe//trackingurlforthetotrackappmasterprogressRegispplicationMasterRequestappMasterRequest=//Theregistrationresponseisusefulasitprovidesinformationabout////SimilartotheGetNewApplicationResponseinthe,it//wouldbeneededbytheApplicationMasterwhenrequestingfor pplicationMasterResponseresponseApplicationMaster需要发出心跳给ResouceManagerApplicationMaster还活着且正在运行。在ResouceManager端设置的超时时间可以通过YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS来,缺省值为ResouceManagerAMRMProtocol#allocate调用可以作为心跳,它还支持发送调用,对ResourceManager来说,是一种有效的发送心跳方式。中“*”代表container可以分配在任何主机上。ResoucecapabilityYARN版本只支持基于内存的资源分配,因此资源请求只container指定比较高的优先级,而给reduce任务的container指定比较低的优先级。//Resource //setuprequirementsfor//whetheraparticularrack/hostis//usefulforapplicationsthatare//todatalocality//setthepriorityforthePrioritypri=Records.newRecord(Priority.class);//Setupresourcetype//Fornow,onlymemoryissupportedsowesetmemoryResourcecapability=Records.newRecord(Resource.class);//setno.ofcontainers//matchingthespecificationscontainer的资源请求对象requirement以后,ApplicationMaster需要构建AllocateRequest发送到ResourceManager。AllocateRequest包括:申请的container的数量Releasedcontainers:在某些情况下,ApplicationMastercontainercontainer给ResourceManager,这些container可以分配给其他的应用使用。List<ContainerId>releasedContainersAllocateRequestreq=//Theresponseidsetintherequestwillbesentback//theresponsesothattheApplicationMaster//matchittoitsoriginalaskandactappropriay.//SetApplicationAttemptId//Addthelistofcontainersbeingaskedfor//IftheApplicationMasterhasnoneedfor//containersduetoover-allocationorforany//reason,itcanreleasethembacktotheResourceManager//AssumingtheApplicationMastercantrackitsprogressAllocateResponseallocateResponse=ResourceManager返回的AllocateResponse通过AMResponse对象包含了下面这Rebootflag(重启标志)ApplicationMaster失去了和ResourceManager同步的场containerResourceManager的更新信息。ApplicationMaster能够查看完成的container的状态信息,并采取适当的策略,比如重试某个失败的任务。有一点需要注意的是,container不一定会立即分配给ApplicationMaster。这不意味着送了,在考虑到集群容量、优先级和调度策略的条件下,ApplicationMaster最终会获得//GetAMResponsefrom //Retrievelistofallocatedcontainersfromthe//andoneachallocatedcontainer,letsassumeweare//thesamefor(ContainerallocatedContainer:allocatedContainers){LOG.info("Launching commandonanew+",containerId="++ containerNode="+":"++ +",containerState"++",+

//Launchandstartthecontaineronaseparatethreadtokeep//threadunblockedasallcontainersmaynotbeallocatedatonego.LaunchContainerRunnablerunnableLaunchContainer=new }//CheckwhatthecurrentavailableresourcesintheclusterareResourceavailableResources=amResp.getAvailableResources();//Basedonthisinformation,anApplicationMastercanmake////Checkthecompleted//Let'sassumewearekeeacountoftotalcompleted List<ContainerStatus>completedContainersfor {LOG.info("GotcontainerstatusforcontainerID=++",state="++",exitStatus="++",diagnostics="+intexitStatus=containerStatus.getExitStatus();if(0!=exitStatus){//container//-100isaspecialcasewherethe//wasaborted/pre-emptedforsomereasonif(-100!=exitStatus){//applicationjoboncontainerreturnedanon-zeroexit//countsascompleted

else//somethingelsebad//appjobdidnotcompleteforsome//weshouldre-tryasthecontainerwaslostforsome//decrementingtherequestedcountsothatweaskfor //wedonotneedtoreleasethecontainerasthathas//beendonebythe}}else//nothingto//containercompletedsuccessfully}}}container分配给ApplicationMaster以后,ApplicationMaster需要做和类似的过程来为最终运行的taskContainerLaunchContexttask能够在已分配的container上运行。一旦ContainerLaunchContext定义好了,//AssuminganallocatedContainerobtainedfromAMResponseContainercontainer;//ConnecttoContainerManagerontheallocatedcontainerStringcmIpPortStr=container.getNodeId().getHost()++ ContainerManagercmcmAddress,conf);//NowwesetupaContainerLaunchContextContainerLaunchContextctx=try}catch(IOException{"Gettingcurrentuserfailedwhentryingtolaunch}

+//SettheenvironmentMap<String,String>unixEnv;//Setuptherequired//Pleasenotethatthelaunchedcontainerdoesnot//theenvironmentoftheApplicationMastersoall//necessaryenvironmentsettingswillneedtobere-//forthisallocatedcontainer.//SetthelocalMap<String,LocalResource>localResources=//Again,thelocalresourcesfromtheApplicationMasterisnotcopied//bydefaulttotheallocatedcontainer.Thus,itisthe//oftheApplicationMastertosetupallthenecessary

//neededbythejobthatwillbeexecutedonthe//Assumethatweareexecutingasscriptontheallocated//andthesscript'slocationinthefilesystemisknowntous.PathsScriptPath;LocalResourcesRsrc=Records.newRecord(LocalResource.class);sConverterUtils.getYarnUrlFromURI(newURI(sScriptPath)));localResources.put("MyExecS.sh",sRsrc);//SetthenecessarycommandtoexecuteontheallocatedcontainerStringcommand="/bin/sh./MyExecS.sh"+ 2>" +List<String>commands=newArrayList<String>();//Sendthestartrequesttothe 能够得到任务的完成进度信息,它也能够通过查询ContainerManager的状态来主动监测已经启动的containers。GetContainerStatusRequeststatusReq= LOG.info("Container+",id="++",status="+LocalResourceYARN分YARNclasspath中。例如,FilepackageFile=newFile(packagePath);UrlpackageUrl=ConverterUtils.getYarnUrlFromPath(FileContext.getFileContext.makeQualified(newPath(packagePath "java-cp'./package/*'some.class.to.Run+"1>"+ApplicationConstants.LOG_DIR_EXPANSION_VAR+"/stdout+"2>" +"/stderr"))Collections.singletonMap("package", ,因此通过使用./package*.,你就可以这些资源了。一个新的container时,你只需要遵循这个相同的过程(假设你是希望资源被分发到你的HDFS上或者本地路径),这样资源的URLcontainerctx一起发送用了太多的物理内存。如果你运行的是一个Java应用程序,你可以使用-hprof来查看是无法正常加载而导致失败。较明智的做法是使用LD_LIBRARY_PATH。在yarn的文档上有一篇很经典的HadoopMapReduceNextGeneration–Writing本文主要讲述yarn程序的执行流程和如何进行开发的一点想法。RMProtocolRM通信,将应用程序运行所需的一些信息,比如local也就是ApplicationMaster(AppMaster)。通过AMRMProtocol协议与ResourceManager通讯,自身,然后继续申请资信息,比如命令行,环境变量等。任务完成后,AppMaster会通过过查询RM来获取job的状态信息,或者如果AppMaster支持也可以直接从AppMaster查询信息。如果需要,可以通过RMProtocol::KillApplication来kill掉application。并containers的执行情况,在container执行失败后做failover的处理。动AppMater,查询或者killAppMaster。MMpiMrscMg)pMNodeManager)ppMaste可以启动或者停止一个container,也可以获取container的执行状态。编写yarn应用程序的详细步骤可以直接参考源码自带的distributed 可以看到,一个YARN应用程序的编写,大量的工作是在编写客户端和AppMaster。而才是真正值得关注的地方。对于大量的应用程序来说,AppMaster的工作机制可能相同,辑即container就可以了,可以大大减少开发成本。MRAppMaster来实现自己的框架。如类似storm的流式计算框架,或者调度RPCService的框架,或者支持MPI的框架。目前上已经有类似的项目出现了,相信不HadoopMapReduce对于业界的大数据及分布式处理系统来说,Hadoop是耳熟能详的卓越开源分布式文件及处理框架,对于Hadoop框架的介绍在此不再累述,读者可参考Hadoop简介。使用和学习过老Hadoop框架(0.20.0及之前版本)的同仁应该很熟悉如下的原MapReduce框架图:1.HadoopMapReduce首先用户程序(Job)提交了一个job,job的信息会发送到JobTracker中,JobTracker跑在哪些机器上,需要管理所有job失败、重启等操作。TaskTrackerMap-reduce集群中每台机器都有的一个部分,他做的事情主要是监视自己所在TaskTracker同时监视当前机器的tasks运行状况。TaskTracker需要把这些信息通过heartbeatJobTracker,JobTrackerjob分配运行在哪些机器上。上图虚线箭头就是表示消息的发送-接收的过程。map-reduce架构是简单明了的,在最初推出的几年,也得到了众多的成功案例,获得JobTrackerMap-reduceJobTrackermap-reducejob非常多的时候,会造成很大的内存开销,潜在来说,也增加了JobTrackerfail的风险,这也是业界普遍总结出老Hadoop的Map-Reduce只能支持4000节点主机的上限。/存的占用情况,如果两个大内存消耗的task被调度到了一块,很容易出现OOM。TaskTrackermaptaskslotreducetaskslot,如果当系统中只有maptaskreducetask的时候,会造成资源的浪费,也就是前面提过的集群资源利用class做了太多的事情,代码量达3000多行,,造成class的任务不清晰,增加bug修复和版本的难度。HadoopMapReduce(例如bug时,都会强制进行系统级别的升级更新。更糟的是,它不管用户的应用程序是不是适用新的Hadoop版本而浪费大量时间。HadoopYarn从业界使用分布式系统的变化趋势和hadoop框架的长远发展来看,MapReduce的能上的缺陷。在过去的几年中,hadoopbug的修复,但是最近这些修复的成本越来越为从根本上解决旧MapReduce框架的性能瓶颈,促进Hadoop框架的更长远发展,从0.23.0版本开始,HadoopMapReduceHadoopMapReduce框架命名为MapReduceV2或者叫Yarn,其架构图如下图所示:2.HadoopMapReduce框架(Yarn)/。新的资源管理器全局管理所有应用程序计算资源的分配,每一个应用的ApplicationMaster负责相应的调度和协调。一个应用程序无非是一个单独的传统的MapReduce任务或者是一个DAG(有向无环图)任务。ResourceManager和每一台机器的节点管理服务器能够管理用户在那台机器上的进程并能ApplicationMasterResourceManager获得的资源和NodeManager协同工作来运行和任务。ResourceManager支持分层级的应用队列,这些队列享有集群一定比例的资源。从某种意义上讲它就是一个纯粹的调度器,它在执行过程中不对应用进行和状态。同样,它也不能重启因应用失ResourceManager是基于应用程序对资源的需求进行调度的;每一个应用程序需要不同类型的资源因此就需要不同的容器。资源包括:内存,CPUMapreduce固定类型的资源使用模型有显著区别,它给集群的使用带来的影响。资源管理器提供一个调度策略的插件,它(CPU)每一个应用的ApplicationMaster的职责有:向调度器索要适当的资源容器,运行任务,应用程序的状态和它们的进程,处理任务原因。HadoopMapReduce代码做大的改变(详见2.3Demo代码开发及详解),但是原框架中的JobTracker和TaskTracker不见了,取而代之的是ResourceManager,ApplicationMaster与NodeManager三个部分。ResourceManager是一个中心的服务,它做的事情是调度、启动每一个Job所属的ApplicationMaster、另外ApplicationMaster的存在情况。细心的读者会发现:Job里面所在的task的、重启等等内容不见了。这就是AppMst存在的原因。ResourceManager负责作业与资源的调度。接收JobSubmitter提交的作业,按照作业的上下文(Context)信息,以及从NodeManager收集来的状态信息,启动调度过程,分配一个Container作为AppMstrNodeManager功能比较专一,就是负责Container状态的,并向RM保持心跳。ApplicationMasterJobJobTracker。但注意每一个Job(不是每一种)都有一个ApplicationMaster,它可以运行在ResourceManager以外的机器上。Yarn框架相对于老的MapReduce框架什么优势呢?我们可以看到:JobTracker(ResourceManager)的资源消耗,并且让监测每一个Job子任务(tasks)状态的程序分布式化了,更安全、更优美。Yarn中,ApplicationMaster是一个可变更的部分,用户可以对不同的编程模型写自己的AppMst,让类型的编程模型能够跑在Hadoop集群中,可以参考hadoopYarn配置模板中的mapred-site.xml配置。对于资源的表示以内存为单位(Yarncpu),比之前以剩余slot数目更合理。老的框架中,JobTracker一个很大的负担就是job下的tasks的运行状况,现在,这个部分就扔给ApplicationMaster做了,而ResourceManager中有一个模块叫做ApplicationsMasters(ApplicationMaster)ApplicationMaster的运行状况,Container是Yarn为了将来作资源而一个框架。这一点应该借鉴了Mesos的工作,目前是一个框架,仅仅提供java虚拟机内存的,hadoop团队的设计思路应该后续能支持更多的资源调度和控制,既然资源表示成内存量,那就没有了之前的mapslot/reduceslot分开造新的Yarn框架相对旧MapRduce框架而言,其配置文件,启停及全局变量等也发生了一些变化,表1.新旧Hadoop/变量/位置变化Yarn检测是否新的YarYarn分离于${had止YarnJAVA_HOME全 Yarn框 启动置Yarn由于新的Yarn框架与原HadoopMapReduce框架相比变化较大,的配置文件中很多项在新框架中2.HadoopHadoopHadoop0.23.XDFSnamenode存放nametable新框架中namenanametableedit文件),默DFSnode的(datanode新框架与老框架置为与分布式clDataNodeJobmapred.job.trackernodeManagerjobtracker剥离,归第无无RMthe无RM无addressaddressof面Yarn.resourcemanager.resource-tracker.addressRM,NodeManagerRMHadoopYarnDemoDemo场景介绍:WeblogichadoopYarnDemoYarnMap-ReduceWeblogic应用服务器组成,每天需要每台对应用服务器WebLogic3.Weblogic如上图所示,<Info>weblogic的日志级别,<Security>,<Management>Weblogic的日志模块,loglevellogmoduleWebLogic日志中出现的次数,每天需要统计出loglevel和logmodule分别出现的次数总数。DemoYarnWeblogichadoopWebLogic各个应用服务器主机上建立分布式,每天将WebLogichadoop分布式文件系统,并且YarnMapReduceLogLevelLogmodule在日志 DemoDemoYarn框架下分布式程序处理该案例的功能,以两台虚拟机作为该Demo的运行平台,两机均为Linux操作系统,机器hostname为OEL和Stephen,OEL作为NameNodeResouceManager节点主机,64位,StephenDataNodeNodeManager节点主机,32位(Hadoop支持异构性),具体如下:3.DemoNameNodeResourceManagerlinuxDataNodeNodeManagerlinux我们把hadoop安装在两台测试机的/hadoop文件系 下,安装后的hadoop /hadoop/hadoop-0.23.0,规划分布式文件系统存放于/hadoop/dfs的本地 为/user/oracle/dfsYarncore-site.xmlURL在hdfs-site.xml中配置nameNode,dataNode的本 2.hdfs-site.xml配置<description><description>mapred-site.xmlYarnmap-reduce最后在Yarn-site.xml中配置ResourceManager,NodeManager的通信端口,web端口等,详细4.Yarn-site.xml配置<?xml<?xml<!--SitespecificYARNconfigurationproperties--<description>Theaddressoftheapplicationsmanagerinterfaceinthe<description>Theaddressofthescheduler<description>TheaddressoftheRMweb<description>Theaddressoftheresourcetracker具体配置项的含义,在hadoop有详细的说明,读者可以参见hadoop0.23.0配置模板Demo以下我们详细介绍一下新的YarnDemoDemo程序的每个类都有详细的注释和说明,Yarn开发为了兼容老版本,API变化不大,可以参考HadoopYarn框架API。在Map程序中,我们以行号为key,行文本为value每一行WebLogic日志输入,将loglevel和logmoduleMapkeyloglevellogmodule的出现次数应该唯一,所以经Map程序处理后的新的record记录的value应该都为1:5.Map业务逻辑publicpublicstaticclassMapClassextendsMapper<Object,Text,Text,{privateTextrecord=newprivatestaticfinalIntWritablerecbytes=newIntWritable(1);publicvoidmap(Objectkey,Textvalue,Contextcontext)throws{Stringline=key-valueloglevellogmoduleif(line==null||line.equals(""))String[]words=line.split(">if(words==null||words.length<2)StringlogLevel=words[1];StringmoduleName=words[2];context.write(record,recbytes);context.write(record,recbytes);}}loglevellogmoduleReduce来分别处理这两部分,loglevelreduce1,logmodulereduce2PatitionerMapKey中包含的logLevel和moduleName的前缀,来分配到不同的Reduce:6.Partition6.Partitionif(numPartitions2)//ReduceloglevellogmoduleReduceifkey.toString().startsWith("logLevel::"))returnreturn1;returnReduceloglevellogmodulepublicstaticclasspublicstaticclassReduceClassextendsReducer<Text,IntWritable,Text,{privateIntWritableresult=newpublicvoidreduce(Textkey,I ble<IntWritable>values,Contextcontext)throwsIOException,inttmp=for(IntWritableval:{tmp=tmp+}}}以上完成了MapReduce的主要处理逻辑,对于程序,我们使用Hadoop提供的Tools工具包方便的进行May-Reduce程序的启动和Map/Reduce对应处理class的配置。importimportjava.io.IOException;importimportjava.io.IOException;importjava.util.Date;import importorg.apache.hadoop.conf.Configured;importorg.apache.hadoop.fs.Path;importorg.apache.hadoop.io.Text;importorg.apache.hadoop.mapreduce.Job;importorg.apache.hadoop.mapreduce.Reducer;importorg.apache.hadoop.mapreduce.Mapper;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.mapreduce.lib.input.FileInputFormat;importorg.apache.hadoop.util.Tool;importpublicclassLogysiserextendsConfiguredimplements{publicstaticvoidmain(String[]{{intres=ToolRunner.run(newConfiguration(),newLog ysiser(),args);}catch(Exception{}}publicintrun(String[]args)throws{if(args==null||args.length{System.out.println("needinputpathandoutputpath");return1;}Stringinputpath=args[0];Stringoutputpath=args[1];Stringshortin=args[0];Stringshortout=args[1];if(shortin.indexOf(File.separator)>=shortin=shortin.substring(shortin.lastIndexOf(File.separator));if(shortout.indexOf(File.separator)>=0)shortout=shortout.substring(shortout.lastIndexOf(File.separator));SimpleDateFormatformater=newSimpleDateFormat("yyyy.MM.dd.HH.mm");shortout=newStringBuffer(shortout).append("-")shortin="/"+shortin;shortout="/"+shortout;shortin="/user/oracle/dfs/"+shortin;shortout="/user/oracle/dfs/"+shortout;Fileinputdir=newFile(inputpath);FileFileoutputdir=newif(!inputdir.exists()||{System.out.println("inputpathnotexistorisn'tdir!");return0;}if{new}Configurationconf=JobConfjob=newJobConf(c

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论