hadoop提交作业分析_第1页
hadoop提交作业分析_第2页
hadoop提交作业分析_第3页
hadoop提交作业分析_第4页
hadoop提交作业分析_第5页
免费预览已结束,剩余13页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

hadoop提交作业分析hadoop提交作业分析hadoop提交作业分析资料仅供参考文件编号:2022年4月hadoop提交作业分析版本号:A修改号:1页次:1.0审核:批准:发布日期:Hadoop提交作业流程分析bin/hadoopjarmainclassargs……这样的命令,各位玩Hadoop的估计已经调用过NN次了,每次写好一个Project或对Project做修改后,都必须打个Jar包,然后再用上面的命令提交到HadoopCluster上去运行,在开发阶段那是极其繁琐的。程序员是“最懒”的,既然麻烦肯定是要想些法子减少无谓的键盘敲击,顺带延长键盘寿命。比如有的人就写了些Shell脚本来自动编译、打包,然后提交到Hadoop。但还是稍显麻烦,目前比较方便的方法就是用Hadoopeclipseplugin,可以浏览管理HDFS,自动创建MR程序的模板文件,最爽的就是直接Runonhadoop了,但版本有点跟不上Hadoop的主版本了,目前的MR模板还是的。还有一款叫HadoopStudio的软件,看上去貌似是蛮强大,但是没试过,这里不做评论。那么它们是怎么做到不用上面那个命令来提交作业的呢不知道没关系,开源的嘛,不懂得就直接看源码分析,这就是开源软件的最大利处。我们首先从bin/hadoop这个Shell脚本开始分析,看这个脚本内部到底做了什么,如何来提交Hadoop作业的。因为是Java程序,这个脚本最终都是要调用Java来运行的,所以这个脚本最重要的就是添加一些前置参数,如CLASSPATH等。所以,我们直接跳到这个脚本的最后一行,看它到底添加了那些参数,然后再逐个分析(本文忽略了脚本中配置环境参数载入、Java查找、cygwin处理等的分析)。#runitexec"$JAVA"$JAVA_HEAP_MAX$HADOOP_OPTS-classpath"$CLASSPATH"$CLASS"$@"从上面这行命令我们可以看到这个脚本最终添加了如下几个重要参数:JAVA_HEAP_MAX、HADOOP_OPTS、CLASSPATH、CLASS。下面我们来一个个的分析(本文基于ClouderaHadoop分析)。首先是JAVA_HEAP_MAX,这个就比较简单了,主要涉及代码如下:JAVA_HEAP_MAX=-Xmx1000m#checkenvvarswhichmightoverridedefaultargsif["$HADOOP_HEAPSIZE"!=""];then#echo"runwithheapsize$HADOOP_HEAPSIZE"JAVA_HEAP_MAX="-Xmx""$HADOOP_HEAPSIZE""m"#echo$JAVA_HEAP_MAXfi首先赋予默认值-Xmx1000m,然后检查中是否设置并导出了HADOOP_HEAPSIZE,如果有的话,就使用该值覆盖,得到最后的JAVA_HEAP_MAX。接着是分析CLASSPATH,这是这个脚本的重点之一。这部分主要就是添加了相应依赖库和配置文件到CLASSPATH。#首先用Hadoop的配置文件目录初始化CLASSPATH

CLASSPATH="${HADOOP_CONF_DIR}"

……

#下面是针对于Hadoop发行版,添加Hadoop核心Jar包和webapps到CLASSPATH

if[-d"$HADOOP_HOME/webapps"];then

CLASSPATH=${CLASSPATH}:$HADOOP_HOME

fi

forfin$HADOOP_HOME/hadoop-*;do

CLASSPATH=${CLASSPATH}:$f;

done

#添加libs里的Jar包

forfin$HADOOP_HOME/lib/*.jar;do

CLASSPATH=${CLASSPATH}:$f;

Done

forfin$HADOOP_HOME/lib/*.jar;do

CLASSPATH=${CLASSPATH}:$f;

done

#下面的TOOL_PATH只在命令为“archive”时才添加到CLASSPATH

forfin$HADOOP_HOME/hadoop-*;do

TOOL_PATH=${TOOL_PATH}:$f;

done

forfin$HADOOP_HOME/build/hadoop-*;doTOOL_PATH=${TOOL_PATH}:$f;done#最后添加用户的自定义HadoopClasspathif["$HADOOP_CLASSPATH"!=""];thenCLASSPATH=${CLASSPATH}:${HADOOP_CLASSPATH}fi上面只分析一部分,由于代码比较长,针对开发者部分的CLASSPATH添加没有列出来。下面是这个脚本的重点、实体之处:CLASS分析。Shell脚本会根据你输入的命令参数来设置CLASS和HADOOP_OPTS,其中CLASS所指向的类才是最终真正执行你的命令的实体。#figureoutwhichclasstorun

if["$COMMAND"="namenode"];then

CLASS=''

HADOOP_OPTS="$HADOOP_OPTS$HADOOP_NAMENODE_OPTS"

……

elif["$COMMAND"="fs"];then

CLASS=

HADOOP_OPTS="$HADOOP_OPTS$HADOOP_CLIENT_OPTS"

……

elif["$COMMAND"="jar"];then

CLASS=

……

elif["$COMMAND"="archive"];then

CLASS=CLASSPATH=${CLASSPATH}:${TOOL_PATH}HADOOP_OPTS="$HADOOP_OPTS$HADOOP_CLIENT_OPTS"……elseCLASS=$COMMANDfi这里我们要关心的就是"$COMMAND"="jar"时对应的类,这个类等下我们继续分析,这是我们通向最终目标的下一个路口。脚本在最后还设置了、等HADOOP_OPTS。接着,就利用exec命令带上刚才的参数提交任务了。通过对上面的分析,我们知道了,如果想取代这个脚本,那就必须至少把Hadoop依赖的库和配置文件目录给加到CLASSPATH中(JAVA_HEAP_MAX和HADOOP_OPTS不是必须的),然后调用类来提交Jar到Hadoop。PS:对BashShell不熟的可以先看看这我们分析了bin/hadoop脚本,知道了提交一个Hadoop作业所需要的基本设置以及真正执行任务提交的类。这一篇我们就来分析这个提交任务的类,看它内部具体又做了些什么。RunJar是Hadoop中的一个工具类,结构很简单,只有两个方法:main和unJar。我们从main开始一步步分析。main首先检查传递参数是否符合要求,然后从第一个传递参数中获取jar包的名字,并试图从jar中包中获取manifest信息,以查找mainclassname。如果查找不到mainclassname,则把传递参数中的第二个设为mainclassname。接下去,就是在""下创建一个临时文件夹,并挂载上关闭删除线程。这个临时文件夹用来放置解压后的jar包内容。jar包的解压工作由unJar方法完成,通过JarEntry逐个获取jar包内的内容,包括文件夹和文件,然后释放到临时文件夹中。解压完毕后,开始做classpath的添加,依次把解压临时文件夹、传递进来的jar包、临时文件夹内的classes文件夹和lib里的所有jar包加入到classpath中。接着以这个classpath为搜索URL新建了一个URLClassLoader(要注意这个类加载器的parent包括了刚才bin/hadoop脚本提交时给的classpath),并设置为当前线程的上下文类加载器。最后,利用方法,以刚才的那个URLClassLoader为类加载器,动态生成一个mainclass的Class对象,并获取它的main方法,然后以传递参数中剩下的参数作为调用参数来调用这个main方法。好了,从上分析看来,这个RunJar类是一个很简单的类,就是解压传递进来的jar包,再添加一些classpath,然后动态调用jar包里的mainclass的main方法。看到这里,我想你应该知道如何利用java代码来编写一个替代bin/hadoop的程序了,主要就是两步:添加Hadoop的依赖库和配置文件;解压jar包,再添加一些classpath,并动态调用相应方法。最偷懒的方法,直接用RunJar类就行了。通过前面两篇文章的分析,对Hadoop的作业提交流程基本明了了,下面我们就可以开始编写代码模拟这个流程。第一步要做的是添加Hadoop的依赖库和配置文件到classpath。最常用的方法就是用一个容器先把各个要添加到classpath的文件或文件夹存储起来,后面再作为类加载器的URL搜索路径。/**

*Addadirectoryorfiletoclasspath.

*

*@paramcomponent

*/

publicstaticvoidaddClasspath(Stringcomponent){

if((component!=null)&&()>0)){

try{

Filef=newFile(component);

if()){

URLkey=().toURL();

if(!(key)){

(key);}}}catch(IOExceptione){}}}上面的classPath变量就是我们声明用来装载classpath组件的容器。privatestaticArrayList<URL>classPath=newArrayList<URL>();由于需要添加一些文件夹下的所有Jar包,所以我们还要实现一个遍历添加某文件夹下文件的方法。/**

*Addalljarsindirectorytoclasspath,sub-directoryisexcluded.

*

*@paramdirPath

*/

publicstaticvoidaddJarsInDir(StringdirPath){

Filedir=newFile(dirPath);

if(!()){

return;

}

File[]files=();

if(files==null){

return;

}

for(inti=0;i<;i++){if(files[i].isDirectory()){continue;}else{addClasspath(files[i].getAbsolutePath());}}}简单起见,这个方法没有使用Filter,对文件夹下的文件是通吃,也忽略掉了子文件夹,只处理根文件夹。好了,有了基础方法,下面就是照着bin/hadoop中脚本所做的,把相应classpath添加进去。/**

*Adddefaultclasspathlistedinbin/hadoopbash.

*

*@paramhadoopHome

*/

publicstaticvoidaddDefaultClasspath(StringhadoopHome){

addClasspath(hadoopHome+"/conf");

addClasspath(hadoopHome+"/build/classes");

if(newFile(hadoopHome+"/build/webapps").exists()){

addClasspath(hadoopHome+"/build");

}

addClasspath(hadoopHome+"/build/test/classes");

addClasspath(hadoopHome+"/build/tools");

if(newFile(hadoopHome+"/webapps").exists()){

addClasspath(hadoopHome);

}

addJarsInDir(hadoopHome);

addJarsInDir(hadoopHome+"/build");

addJarsInDir(hadoopHome+"/lib");addJarsInDir(hadoopHome+"/lib/");addJarsInDir(hadoopHome+"/build/ivy/lib/Hadoop/common");}至此,该添加classpath的都已添加好了(未包括第三方库,第三方库可用Conf中的tmpjars属性添加。),下去就是调用RunJar类了。本文为了方便,把RunJar中的两个方法提取了出来,去掉了一些可不要的Hadoop库依赖,然后整合到了类EJob里。主要改变是把原来解压Jar包的“文件夹改为"",并提取出了fullyDelete方法。利用这个类来提交Hadoop作业很简单,下面是一个示例:args=newString[4];

args[0]="E:\\Research\\Hadoop\\";

args[1]="pi";

args[2]="2";

args[3]="100";

armainclassargs命令类似,但是忽略掉了bin/hadoopjar这个命令,因为我们现在不需要这个脚本来提交作业了。新建一个Project,添加一个class,在main里粘上上面的代码,然后RunasJavaApplication。注意看你的Console,你会发现你已经成功把作业提交到Hadoop上去了。有图有真相,粘一下我的运行示例(在Win上开Eclipse,HadoopCluster在Linux,配置文件同Cluster的一样)。下面是在ClouderaDesktop看到的Job信息(它的时间是UTC的)。用上述方法,我们可以做一个类似ClouderaDesktop的Web应用,接受用户提交的Jar,并在Action处理中提交到Hadoop中去运行,然后把结果返回给用户。由于篇幅原因,加上前面介绍过RunJar类,所本文没有粘关于RunJar类的代码,不过你放心,本文提供例子工程下载。你可以在此基础上优化它,添加更多功能。由于大部分是Hadoop的代码,So,该代码基于ApacheLicense。-->>点我下载<<--到此,以Java方式提交Hadoop作业介绍完毕。但,是否还可以再进一步呢现在还只能提交打包好的MR程序,尚不能像HadoopEclipsePlugin那样能直接对包含Mapper和Reducer的类RunonHadoop。为什么直接对这些类RunasJavaApplication提交的作业是在Local运行的呢这其中又包含有什么秘密呢我们将在下面的文章中更深入的剖析Hadoop的作业提交代码,去到最底层,慢慢揭开它的黑面纱。前面我们所分析的部分其实只是Hadoop作业提交的前奏曲,真正的作业提交代码是在MR程序的main里,RunJar在最后会动态调用这个main,在(二)里有说明。我们下面要做的就是要比RunJar更进一步,让作业提交能在编码时就可实现,就像HadoopEclipsePlugin那样可以对包含Mapper和Reducer的MR类直接RunonHadoop。一般来说,每个MR程序都会有这么一段类似的作业提交代码,这里拿WordCount的举例:

Configurationconf=newConfiguration();

String[]otherArgs=newGenericOptionsParser(conf,args).getRemainingArgs();

if!=2){

"Usage:wordcount<in><out>");

(2);

}

Jobjob=newJob(conf,"wordcount");

;

;

;

;

;

;

(job,newPath(otherArgs[0]));

(job,newPath(otherArgs[1]));

(true)0:1);首先要做的是构建一个Configuration对象,并进行参数解析。接着构建提交作业用的Job对象,并设置作业Jar包、对应Mapper和Reducer类、输入输出的Key和Value的类及作业的输入和输出路径,最后就是提交作业并等待作业结束。这些只是比较基本的设置参数,实际还支持更多的设置参数,这里就不一一介绍,详细的可参考API文档。一般分析代码都从开始一步步分析,但我们的重点是分析提交过程中发生的事,这里我们先不理前面的设置对后面作业的影响,我们直接跳到作业提交那一步进行分析,当碰到问题需要分析前面的代码时我会再分析。当调用时,其内部调用的是submit方法来提交,如果传入参数为ture则及时打印作业运作信息,否则只是等待作业结束。submit方法进去后,还有一层,里面用到了job对象内部的jobClient对象的submitJobInternal来提交作业,从这个方法才开始做正事。进去第一件事就是获取jobId,用到了jobSubmitClient对象,jobSubmitClient对应的类是JobSubmissionProtocol的实现之一(目前有两个实现,JobTracker和LocalJobRunner),由此可判断出jobSubmitClient对应的类要么是JobTracker,要么是LocalJobRunner。呃,这下有点想法了,作业提交是上到JobTracker去,还是在本地执行?可能就是看这个jobSunmitClient初始化时得到的是哪个类的实例了,我们可以稍稍的先往后看看,你会发现submitJobInternal最后用了(jobId)来提交作业,再稍稍看看JobTracker和LocalJobRunner的submitJob实现,看来确实是这么回事。好,那我们就先跳回去看看这个jobSubmitClient是如何初始化的。在JobClient的init中我们可以发现jobSubmitClient的初始化语句:

Stringtracker=("","local");

if("local".equals(tracker)){

=newLocalJobRunner(conf);

}else{

=createRPCProxy(conf),conf);}哈,跟conf中的属性有关,如果你没设置,那默认得到的值就是local,jobSubmitClient也就会被赋予LocalJobRunner的实例。平时,我们开发时一般都只是引用lib里面的库,不引用conf文件夹里的配置文件,这里就能解释为什么我们直接RunasJavaApplication时,作业被提交到Local去运行了,而不是HadoopCluster中。那我们把conf文件夹添加到classpath,就能RunonHadoop了么?目前下结论尚早,我们继续分析(你添加了conf文件夹后,可以提交试一试,会爆出一个很明显的让你知道还差什么的错误,这里我就卖卖官子,先不说)。

jobId获取到后,在SystemDir基础上加jobId构建了提交作业的目录submitJobDir,SystemDir由JobClient的getSystemDir方法得出,这个SystemDir在构建fs对象时很重要,确定了返回的fs的类型。下去的configureCommandLineOptions方法主要是把作业依赖的第三方库或文件上传到fs中,并做classpath映射或Symlink,以及一些参数设置,都是些细微活,这里不仔细分析。我们主要关心里面的两个地方,一个是:FileSystemfs=getFs();看上去很简单,一句话,就是获取FileSystem的实例,但其实里面绕来绕去,有点头晕。因为Hadoop对文件系统进行了抽象,所以这里获得fs实例的类型决定了你是在hdfs上操作还是在localfs上操作。好了,我们冲进去看看。

publicsynchronizedFileSystemgetFs()throwsIOException{

if==null){

PathsysDir=getSystemDir();

=(getConf());}returnfs;}看见了吧,fs是由sysDir的getFileSystem返回的。我们再冲,由于篇幅,下面就只列出主要涉及的语句。

(),conf);

(uri,conf);

fs=createFileSystem(uri,conf);

Class<?>clazz=("fs."+()+".impl",null);

if(clazz==null){

thrownewIOException("NoFileSystemforscheme:"+());

}

FileSystemfs=(FileSystem)(clazz,conf);

(uri,conf);

returnfs;又是跟conf有关,看来conf是得实时跟住的。这里用到了Java的反射技术,用来动态生成相应的类实例。其中的class获取与有密切关系,而uri就是在刚才的sysDir基础上构成,sysDir的值又最终是由jobSubmitClient的实例决定的。如果jobSubmitClient是JobTracker的实例,那Scheme就是hdfs。如果是LocalJobRunner的实例,那就是file。从你可以找到如下的信息:<property>

<name>

<value><description>TheFileSystemforfile:uris.</description></property>

<property>

<name>

<value><description>TheFileSystemforhdfs:uris.</description></property>所以在前面的作业提交代码中,在初始化Job实例时,很多事已经决定了,由conf文件夹中的配置文件决定。Configuration是通过当前线程上下文的类加载器来加载类和资源文件的,所以要想RunonHadoop,第一步必须要让Conf文件夹进入Configuration的类加载器的搜索路径中,也就是当前线程上下文的类加载器。第二个要注意的地方是:StringoriginalJarPath=

();if(originalJarPath!=null){

if("".equals())){

(newPath(originalJarPath).getName());

}

());

(newPath(originalJarPath),submitJarFile);

(submitJarFile,replication);

(submitJarFile,newFsPermission(JOB_FILE_PERMISSION));

}else{

("Nojob

jar

file

set.Userclassesmaynotbefound."+"SeeJobConf(Class)orJobConf#setJar(String).");}因为client在提交作业到Hadoop时需要把作业打包成jar,然后copy到fs的submitJarFile路径中。如果我们想RunonHadoop,那就必须自己把作业的class文件打个jar包,然后再提交。在Eclipse中,这就比较容易了。这里假设你启用了自动编译功能。我们可以在代码的开始阶段加入一段代码用来打包bin文件夹里的class文件为一个jar包,然后再执行后面的常规操作。在configureCommandLineOptions方法之后,submitJobInternal会检查输出文件夹是否已存在,如果存在则抛出异常。之后,就开始划分作业数据,并根据split数得到maptasks的数量。最后,就是把作业配置文件写入submitJobFile,并调用(jobId)最终提交作业。至此,对Hadoop的作业提交分析也差不多了,有些地方讲的比较啰嗦,有些又讲得点到而止,但大体的过程以及一些较重要的东西还是说清楚了,其实就是那么回事。下去的文章我们会在前面的jobUtil基础上增加一些功能来支持RunonHadoop,其实主要就是增加一个打包Jar的方法。经过上一篇的分析,我们知道了Hadoop的作业提交目标是Cluster还是Local,与conf文件夹内的配置文件参数有着密切关系,不仅如此,其它的很多类都跟conf有关,所以提交作业时切记把conf放到你的classpath中。因为Configuration是利用当前线程上下文的类加载器来加载资源和文件的,所以这里我们采用动态载入的方式,先添加好对应的依赖库和资源,然后再构建一个URLClassLoader作为当前线程上下文的类加载器。publicstaticClassLoadergetClassLoader(){

ClassLoaderparent=().getContextClassLoader();

if(parent==null){

parent=

}

if(parent==null){

parent=();

}

returnnewURLClassLoader(newURL[0]),parent);}代码很简单,废话就不多说了。调用例子如下:("/usr/lib/conf");

ClassLoaderclassLoader=();

().setContextClassLoader(classLoader);设置好了类加载器,下面还有一步就是要打包Jar文件,就是让Project自打包自己的class为一个Jar包,我这里以标准Eclipse工程文件夹布局为例,打包的就是bin文件夹里的class。

publicstaticFilecreateTempJar(Stringroot)throwsIOException{

if(!newFile(root).exists()){

returnnull;

}

Manifestmanifest=newManifest();

().putValue("Manifest-Version","");

finalFilejarFile=("EJob-",".jar",newFile("")));

().addShutdownHook(newThread(){

publicvoidrun(){

();

}

});

JarOutputStreamout=newJarOutputStream(newFileOutputStream(jarFile),

manifest);

createTempJarInner(out,newFile(root),"");

();

();

returnjarFile;

}

privatestaticvoidcreateTempJarInner(JarOutputStreamout,Filef,

Stringbase)throwsIOException{

if()){

File[]fl=();

if()>0){

base=base+"/";

}

for(inti=0;i<;i++)

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论