版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、spark任务提交源码入手小试牛刀spark由于实际工作当中,都是将spark的任务提交到yarn集群上面去,所以我们安装spark的环境只需要安装一个任务提交客户端即可第一步:下载安装包并解压node01下载spark3.0的安装包 cd/kkb/softwget/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgztar-zxfspark-3.0.0-bin-hadoop3.2.tgz-C/kkb/install/第二步:修改配置文件node01执行以下命令修改spark-env.sh配置文件 cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpspark-env.sh.templatespark-env.shvimspark-env.shexportJAVA_HOME=/kkb/install/jdk1.8.0_141exportHADOOP_HOME=/kkb/install/hadoop-3.1.4exportHADOOP_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopexportSPARK_CONF_DIR=/kkb/install/spark-3.0.0-bin-hadoop3.2/confexportYARN_CONF_DIR=/kkb/install/hadoop-3.1.4/etc/hadoopnode01执行以下命令修改slaves配置文件cd/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/cpslaves.templateslavesvimslaves#编辑文件内容添为以下node01node02node03node01执行以下命令修改spark-defaults.conf配置选项cd/kkb/install/spark-3.0.0-bin-hadoop3.2/confcpspark-defaults.conf.templatespark-defaults.confvimspark-defaults.confspark.eventLog.enabledspark.eventLog.dirspark.eventLpresstruehdfs://node01:8020/spark_logtrue第三步:安装包的分发将node01机器的spark安装包分发到其他机器上面去node01执行以下命令进行分发 cd/kkb/install/scp-rspark-3.0.0-bin-hadoop3.2/node02:$PWDscp-rspark-3.0.0-bin-hadoop3.2/node03:$PWDnode01执行以下命令启动spark集群 hdfsdfs-mkdir-p/spark_logcd/kkb/install/spark-3.0.0-bin-hadoop3.2sbin/start-all.shsbin/start-history-server.sh第五步:访问浏览器管理界面直接浏览器访问 http://node01:8080查看spark集群管理webUI界面。注意,如果8080端口没法访问,顺延8081端口进行访问,如果8081端口也没法访问,继续往后顺延端口号http://node01:18080/访问查看spark的historyserver地址2、spark运行计算圆周率之任务提交过程spark集群安装运行成功之后,我们就可以运行计算spark的任务了,例如我们可以提交一个spark的自带案例来计算圆周率,其中spark的任务提交又有多种方式,例如local模式,standAlone模式或者yarn模式等等,其中我们实际工作当中用的最多的就是yarn模式,以下是几种提交运行模式的介绍sparklocal local模式不用启动任何的spark的进程,只需要解压一个spark的安装包就可以直接使用了,基于local模式的client提交运行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modeclient--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10基于onyarn的cluster模式任务提交基于local模式的cluster提交运行方式,提交命令如下binbin/spark-submit--classorg.apache.spark.examples.SparkPi--masterlocal--deploy-modecluster--executor-memory2G--total-executor-cores4examples/jars/spark-examples_2.12-3.0.0.jar10我们会看到,基于local模式的cluster提交方式直接报错2.2、spark任务提交的standAlone模式基于standAlone的任务提交,需要我们安装搭建spark集群,并启动master以及worker进程提交命令如下bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modeclient\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar10基于cluster任务的提交命令bin/spark-submit--classorg.apache.spark.examples.SparkPi\--masterspark://node01:7077\--deploy-modecluster\--executor-memory2G\--total-executor-cores4\examples/jars/spark-examples_2.12-3.0.0.jar103、spark任务提交的yarn模式并且将任务提交到yarn集群上面去node01执行以下命令提交任务到yarn集群上面去运行 bin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modeclient\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarncluster模式代码提交运行架构如下sparkonyarnclient模式运行.drawiobin/spark-submit\--classorg.apache.spark.examples.SparkPi\--masteryarn\--deploy-modecluster\examples/jars/spark-examples_2.12-3.0.0.jar50\其中sparkonyarnclient模式任务提交过程如下spark-submitonyarncluster任务提交.drawio3、spark任务提交脚本分析mit 提交任务的过程是通过spark-submit这个脚本来进行提交的,那我们就一起来看一下spark-submit这个脚本的内容#!/usr/bin/envbash##LicensedtotheApacheSoftwareFoundation(ASF)underoneormore#contributorlicenseagreements.SeetheNOTICEfiledistributedwith#thisworkforadditionalinformationregardingcopyrightownership.#TheASFlicensesthisfiletoYouundertheApacheLicense,Version2.0#(the"License");youmaynotusethisfileexceptincompliancewith#theLicense.YoumayobtainacopyoftheLicenseat##/licenses/LICENSE-2.0##Unlessrequiredbyapplicablelaworagreedtoinwriting,software#distributedundertheLicenseisdistributedonan"ASIS"BASIS,#WITHOUTWARRANTIESORCONDITIONSOFANYKIND,eitherexpressorimplied.#SeetheLicenseforthespecificlanguagegoverningpermissionsand#limitationsundertheLicense.#if[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home#disablerandomizedhashforstringinPython3.3+exportPYTHONHASHSEED=0exec"${SPARK_HOME}"/bin/spark-classorg.apache.spark.deploy.SparkSubmit"$@"查看发现spark-submit这个脚本里面执行了spark-class这个脚本,然后带了一个org.apache.spark.deploy.SparkSubmit这个参数,其中使用$@来将我们传入给spark-submit这个脚本的所有参数都传递过来了既然知道了执行了spark-class脚本,后面带上了org.apache.spark.deploy.SparkSubmit这个class类,那么我们就来看一下spark-class这脚本内容 #!/usr/bin/envbashif[-z"${SPARK_HOME}"];thensource"$(dirname"$0")"/find-spark-home."${SPARK_HOME}"/bin/load-spark-env.sh#Findthejavabinaryif[-n"${JAVA_HOME}"];thenRUNNER="${JAVA_HOME}/bin/java"elseif["$(command-vjava)"];thenRUNNER="java"elseecho"JAVA_HOMEisnotset">&2exit1#FindSparkjars.if[-d"${SPARK_HOME}/jars"];thenSPARK_JARS_DIR="${SPARK_HOME}/jars"elseSPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"if[!-d"$SPARK_JARS_DIR"]&&[-z"$SPARK_TESTING$SPARK_SQL_TESTING"];thenecho"FailedtofindSparkjarsdirectory($SPARK_JARS_DIR)."1>&2echo"YouneedtobuildSparkwiththetarget\"package\"beforerunningthisprogram."1>&2exit1elseLAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"#Addthelauncherbuilddirtotheclasspathifrequested.if[-n"$SPARK_PREPEND_CLASSES"];thenLAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"#Fortestsif[[-n"$SPARK_TESTING"]];thenunsetYARN_CONF_DIRunsetHADOOP_CONF_DIRbuild_command(){"$RUNNER"-Xmx128m$SPARK_LAUNCHER_OPTS-cp"$LAUNCH_CLASSPATH"org.apache.spark.launcher.Main"$@"printf"%d\0"$?}#Turnoffposixmodesinceitdoesnotallowprocesssubstitutionset+oposixCMD=()DELIM=$'\n'CMD_START_FLAG="false"whileIFS=read-d"$DELIM"-rARG;doif["$CMD_START_FLAG"=="true"];thenCMD+=("$ARG")elseif["$ARG"==$'\0'];then#AfterNULLcharacterisconsumed,changethedelimiterandconsumecommandstring.DELIM=''CMD_START_FLAG="true"elif["$ARG"!=""];thenecho"$ARG"done<<(build_command"$@")COUNT=${#CMD[@]}LAST=$((COUNT-1))LAUNCHER_EXIT_CODE=${CMD[$LAST]}if![[$LAUNCHER_EXIT_CODE=~echo"${CMD[@]}"|head-n-1exit1^[0-9]+$]];then2if[$LAUNCHER_EXIT_CODE!=0];thenexit$LAUNCHER_EXIT_CODECMD=("${CMD[@]:0:$LAST}")exec"${CMD[@]}"最后执行了$CMD这个命令,并且带上了所有的参数,那么CMD究竟是个什么东西,我们可以修改脚本给它打印出来看一下修改spark-class脚本,然后添加一行打印cd/kkb/install/spark-3.0.0-bin-hadoop3.2/vimbin/spark-class#在后面位置添加一行打印命令出来CMD=("${CMD[@]:0:$LAST}")echo"${CMD[@]}"exec"${CMD[@]}"重新提交任务cd/kkb/install/spark-3.0.0-bin-hadoop3.2/bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50下:[hadoop@node01spark-3.0.0-bin-hadoop3.2]$bin/spark-submit--classorg.apache.spark.examples.SparkPi--masteryarn--deploy-modeclientexamples/jars/spark-examples_2.12-3.0.0.jar50/kkb/install/jdk1.8.0_141/bin/java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50观察打印信息,主要就是执行了一个命令java-cp/kkb/install/spark-3.0.0-bin-hadoop3.2/conf/:/kkb/install/spark-3.0.0-bin-hadoop3.2/jars/*:/kkb/install/hadoop-3.1.4/etc/hadoop/-Xmx1gorg.apache.spark.deploy.SparkSubmit--masteryarn--deploy-modeclient--classorg.apache.spark.examples.SparkPiexamples/jars/spark-examples_2.12-3.0.0.jar50就是执行了一个java的命令,通过org.apache.spark.deploy.SparkSubmit来进行了任务提交,其实就是启动了一个jvm的虚拟机进程来执行了任务的提交,就是执行了SparkSubmit的main方法,我们可以去查看源码,找到SparkSubmit的main方法,验证启动的过程步骤4、SparkSubmit源码分析前面我们通过脚本查看到了我们提交任务都是通过SparkSubmit这个类,那么我们就可以通过源码当中的SparkSubmit来查看这个类的main方法,main方法作为入口启动了一个java的进程,通过IDEA的快捷键Ctrl+shift+alt+N来搜索SparkSubmit这类,然后知道Object当中的main方法、查看main方法 /**/***在这里作为程序的入口类overridedefmain(args:Array[String]):Unit={valsubmit=newSparkSubmit(){self=>/***解析传入的参数*@paramargs*@returnoverrideprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args){overrideprotecteddeflogInfo(msg:=>String):Unit=self.logInfo(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=self.logWarning(msg)overrideprotecteddeflogError(msg:=>String):Unit=self.logError(msg)}}overrideprotecteddeflogInfo(msg:=>String):Unit=printMessage(msg)overrideprotecteddeflogWarning(msg:=>String):Unit=printMessage(s"Warning:$msg")overrideprotecteddeflogError(msg:=>String):Unit=printMessage(s"Error:$msg")overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}}/***最终的任务提交submit.doSubmit(args)}进入到super.doSubmit方法Submit其实就是执行了SparkSubmit内部当中的doSubmit方法 overridedefdoSubmit(args:Array[String]):Unit={try{super.doSubmit(args)}catch{casee:SparkUserAppException=>exitFn(e.exitCode)}}3、查看父类当中的doSubmit方法 查看doSubmit方法当中的parseArguments方法defdoSubmit(args:Array[String]):Unit={//Initializeloggingifithasn'tbeendoneyet.Keeptrackofwhetherloggingneedsto//beresetbeforetheapplicationstarts./***初始化日志记录valuninitLog=initializeLogIfNecessary(true,silent=true)/***解析参数所有的参数都解析出来封装到一个对象里面去了叫做SparkSubmitArgumentsvalappArgs=parseArguments(args)if(appArgs.verbose){logInfo(appArgs.toString)}/***使用模式匹配来执行任务提交的各种操作appArgs.actionmatch{caseSparkSubmitAction.SUBMIT=>submit(appArgs,uninitLog)caseSparkSubmitAction.KILL=>kill(appArgs)caseSparkSubmitAction.REQUEST_STATUS=>requestStatus(appArgs)caseSparkSubmitAction.PRINT_VERSION=>printVersion()}}3.1、查看doSubmit方法当中的parseArguments方法查看到parseArguments valappArgs=parseArguments(args)点击parseArguments方法进入到这个方法的具体实现如下protectedprotecteddefparseArguments(args:Array[String]):SparkSubmitArguments={newSparkSubmitArguments(args)}可以看到该方法就是创建了SparkSubmitArguments这个对象,点击进入到这个对象当中来3.2、查看SparkSubmitArguments当中的parse方法查看方法解析SparkSubmitOptionParser这个java类当中的parse方法 3.2.1、查看handle这个方法所在的具体实现handle这个方法的具体实现通过idea的快捷键ctrl+shift+h的方式,我们可以看到具体实现在SparkSubmitArguments当中,我们直接去看SparkSubmitArguments当中的实现方法SparkSubmitArguments当中的handle方法如下:这个方法主要就是在解析各种参数3.3、查看SparkSubmitArguments当中的loadEnvironmentArguments方法给action赋值 前面已经查看了SparkSubmitArguments当中的parse方法,该方法主要就是解析我们的参数,然后给每个指定的参数进行赋值了,下面还有一个方法叫做loadEnvironmentArguments这个方法主要就是给action进行赋值的查看SparkSubmitArguments当中的loadEnvironmentArguments()方法查看loadEnvironmentArguments方法3.4、action赋值成功之后在SparkSubmit当中的doSubmit方法当中提交任务 给action赋值成功之后,默认值就是SUBMIT,那么在SparkSubmit当中执行doSubmit方法当中,使用模式匹配来进行任务提交3.5、查看SparkSubmit当中的submit方法 找到了默认就是任务提交之后,我们就可以直接去看submit方法的实现了,通过submit方法来提交任务了3.6、查看SparkSubmit当中的runMain方法上面通过SparkSumit当中的doRunMain方法,执行到了runMain方法,查看runMain方法里面的任务具体提交过程 在上述的runMain方法当中,执行了一行代码valval(childArgs,childClasspath,sparkConf,childMainClass)=prepareSubmitEnvironment(args)这一行代码至关重要,创建了一个childMainClass这个属性值,有了这个属性值,才能继续给下方的mainClass进行赋值3.6.1、查看SparkSubmit的prepareSubmitEnvironment这个方法在prepareSubmitEnvironment这个方法当中给childMainClass进行了赋值 通过提交模式为yarn模式,将childMainclass赋值为了org.apache.spark.deploy.yarn.YarnClusterApplication//Inyarn-clustermode,useyarn.Clientasawrapperaroundtheuserclass/***判断如果是yarn集群模式,那么给childMainClass赋值为org.apache.spark.deploy.yarn.YarnClusterApplicationif(isYarnCluster){childMainClass=YARN_CLUSTER_SUBMIT_CLASSif(args.isPython){childArgs+=("--primary-py-file",args.primaryResource)childArgs+=("--class","org.apache.spark.deploy.PythonRunner")}elseif(args.isR){valmainFile=newPath(args.primaryResource).getNamechildArgs+=("--primary-r-file",mainFile)childArgs+=("--class","org.apache.spark.deploy.RRunner")}else{if(args.primaryResource!=SparkLauncher.NO_RESOURCE){childArgs+=("--jar",args.primaryResource)}childArgs+=("--class",args.mainClass)}if(args.childArgs!=null){args.childArgs.foreach{arg=>childArgs+=("--arg",arg)}}}这样就得到了childMainclass的最终结果值为org.apache.spark.deploy.yarn.YarnClusterApplication3.7、查看SparkSubmit当中的runMain方法的app.start方法 上面通过childMainclass的最终结果值为org.apache.spark.deploy.yarn.YarnClusterApplication,然后执行了app.start查看SparkSubmit当中的runMain方法里面的app.start方法3.8、查看YarnClusterApplication当中的start方法调用了start方法之后其实就是调用了org.apache.spark.deploy.yarn.YarnClusterApplication当中的start方法通过IDE的快捷键ctrl+shift+alt+N在SparkSource工程当中查找YarnClusterApplication这个类3.8.1、查看newClient创造Client对象创建了Client对象,在client对象的构造器当中,申明了一个yarnClient对象,这个对象的创建调用了YarnClient.createYarnClient这个方法3.8.2、查看YarnClient对象的创建查看YarnClient.createYarnClient这个方法,其实就是通过new创建了一个YarnClient对象 3.8.3、查看YarnClientImpl的对象创建通过newYarnClientImpl();来创建了YarnClientImpl这个对象,查看这个对象的创建初始化方法 这个方法又调用了super(YarnClientImpl.class.getName());这一行代码3.9、查看Client当中的run方法前面通过YarnClusterApplication当中的start方法创建了Client对象,然后调用了run方法 查看run方法的具体实现类容如下:3.10、查看submitApplication方法的具体实现在Client当中,通过submitApplication正式向ResourceManager提交了任务 appContext来源于上面定义的一个变量3.10.1、查看Client当中的appContext的内容定义 在Client当中执行submitApplication方法来提交任务的时候,该方法当中通过yarnClient.submitApplication(appContext)这一句代码来实现了任务的提交,在提交任务的时候携带了一个参数叫做appContext查看createApplicationSubmissionContext方法3.10.1、查看Client当中的containerContext前面已经看过了appContext当中主要就是定义解析一些参数,接下来就继续来查看 valvalcontainerContext=createContainerLaunchContext(newAppResponse)这一行代码当中的创建容器的方法,查看createContainerLaunchContext方法实现如下4、查看ApplicationMaster的启动流程 前面找到了我们通过java命令行的方式启动了一个java进程,叫做ApplicationMaster,这个ApplicationMaster想要启动执行,肯定也是有一个main方法的执行入口,通过main方法的执行入口来执行程序的启动,使用idea的快捷键ctrl+alt+shift+N打开ApplicationMaster,然后找到Object伴生对象当中的main方法4.1、查看ApplicationMaster类当中的main方法下面的ApplicationMasterArguments类的初始化在ApplicationMaster类当中的main下面执行了一行代码 valvalamArgs=newApplicationMasterArguments(args),这一句代码创建了一个ApplicationMasterArguments这个对象,这个对象专门用于解析前面传入过来的参数,查看ApplicationMasterArguments类的初始化4.2、查看ApplicationMaster在main方法当中创建的对象ApplicationMaster在ApplicationMaster这个伴生对象的main方法当中,还执行了一句代码 master=newApplicationMaster(amArgs,sparkConf,yarnConf)这一句代码主要就是创建了ApplicationMaster这个对象,这个对象的初始化创建过程如下在创建ApplicationMaster这个对象的时候,可以看到在里面初始化了一个YarnRMClient这个对象,这个对象其实就是一个中间的桥梁,用于连接ApplicationMaster与ResourceManager。4.3、查看ApplicationMaster中的main方法里面执行的master.run方法在ApplicationMaster当中的main方法里面,执行了一行代码如下:ugiugi.doAs(newPrivilegedExceptionAction[Unit](){overridedefrun():Unit=System.exit(master.run())这一行代码当中执行了master.run,其实就是开始运行ApplicationMaster了,我们可以查看run方法的具体实现内容如下4.3.1、查看run方法当中的runDriver方法的具体实现在ApplicationMaster的run方法当中,执行了一段代码如下 if(isClusterMode){runDriver()}else{runExecutorLauncher()}判断如果是集群模式,那么就执行了runDriver这个方法,通过runDriver这个方法,启动了driver程序判断如果是集群模式,那么就执行了runDriver这个方法,通过runDriver这个方法,启动了driver程序runDriver当中主要运行了两条任务线第一条是跟资源相关的资源申请线第二条是跟用户程序执行相关的程序执行线那么我们可以去看一下runDrvier方法的具体实现内容如下在runDriver方法当中,主要执行了启动用户的程序,等待sparkContext对象的创建,注册ApplicationMaster以及分配container等多个动作。其中runDriver主要分为两条线往下运行第一条线:启动集群环境,申请运行资源相关的第二条线:resumeDriver保证用户代码继续往下执行4.3.2、查看runDriver方法当中的startUserApplication方法的具体实现在runDriver方法当中的具体实现里面,执行了一句代码 userClassThread=startUserApplication()通过这一行代码启动了用户的应用程序,其中startUserApplication方法的具体实现如下4.3.3、查看runDriver方法当中的createAllocator方法具体实现在ApplicationMaster的runDriver方法当中,执行了一行代码如下 createAllocatorcreateAllocator(driverRef,userConf,rpcEnv,appAttemptId,distCacheConf)这一行代码主要就是在进行container的分配,该方法的具体实现如下:4.3.4、继续查看createAllocator方法当中的allocateResources方法内部实现在上面ApplicationMaster当中调用createAllocator方法当中,执行了一行代码如下 allocatorallocator.allocateResources()在allocateResources方法的具体实现如下4.3.5、查看allocateResources方法内部的handleAllocatedContainers方法的具体实现在YarnAllocator当中,我们看到在执行allocateResources的时候,我们通过 valallocatedContainers=allocateResponse.getAllocatedContainers()这一行代码获取到了所有的可分配的container,然后通过下一行代码if(allocatedContainers.size>0){logDebug(("Allocatedcontainers:%d.Currentexecutorcount:%d."+"Launchingexecutorcount:%d.Clusterresources:%s.")allocatedContainers.size,sizenumExecutorsStarting.get,allocateResponse.getAvailableResources))handleAllocatedContainers(allocatedContainers.asScala)}使用了handleAllocatedContainers(allocatedContainers.asScala)来进行容器container的分配,handleAllocatedContainers的方法的具体实现如下4.3.5、查看handleAllocatedContainers方法当中的runAllocatedContainers方法内部具体实现在YarnAllocator当中执行handleAllocatedContainers方法时,执行了一行代码如下 runAllocatedContainers(containersToUse)这一行代码主要就是在进行运行分配好了的container了,这个runAllocatedContainers方法的具体实现内容如下该方法的具体实现如下4.3.6、查看线程的执行的run方法 在YarnAllocator当中执行runAllocatedContainers方法的时候,使用了线程池launcherPool的方式进行执行,最后调用run方法代码如下newExecutorRunnable(Some(container),conf,sparkConf,driverUrl,executorId,ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID//useuntilfullysupported).run()run方法的具体内容如下4.3.7、查看startContainer方法的具体实现 在上面ExecutorRunnable当中,执行run方法的时候,在run方法当中初始化了NodeManagerClient这个对象,这个对象主要是是用于与NodeManager进行通信,通过客户端与NodeManager服务端进行通信,然后通过调用startContainer方法在NodeManager上面创建Exector进程,在run方法内部有一行代码执行如下startContainer()4.3.8、查看在ExecutorRunnable当中运行startContainer方法调用prepareCommand方法的过程在ExecturoRunnable当中执行startContainer方法的时候,调用了一行代码valvalcommands=prepareCommand()这一行代码主要就是在准备java的一个命令,通过java-server的方式来启动另外一个java进程,查看prepareCommand方法的具体实现如下ecutor 前面看到了我们通过java命令启动了一个Executor的进程,那么就是执行了org.apache.spark.executor.YarnCoarseGrainedExecutorBackend这个类当中的main方法,启动了java的一个进程,那么我们就可以直接去找这个类的main方法去查看这个进程的启动过程,使用idea的ctrl+alt+shift+N快捷键来查找YarnCoarseGrainedExecutorBackend这个类,并找到它的main方法作为程序的入口类5.1、查看YarnCoarseGrainedExecutorBackend这个Object当中main方法的run方法执行在YarnCoarseGrainedExecutorBackend当中执行了main方法,在main方法里面执行了 CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend.run(backendArgs,createFn)这样一行代码,这样一行代码当中执行了run方法,查看run方法的具体实现如下5.2、查看run方法当中的setupEndPoint方法的具体实现 在上面CoarseGrainedExecutorBackend这个class类当中在执行run方法的时候,run方法当中有一行代码如下env.rpcEnv.setupEndpoint("Executor",backendCreateFn(env.rpcEnv,arguments,env,cfg.resourceProfile))其中通过sparkEnv对象,调用了rpcEnv这个属性,这个对象来调用了setupEndPoint这个方法,这个方法的具体实现代码如下,通过idea的快捷键ctrl+shift+h快捷键可以看到该方法的具体实现内容如下通过消息转发器dispatcher来调用了registerRpcEndpoint5.3、查看setupEndpoint方法当中的registerRpcEndpoint方法的具体实现在上面NettyRpcEnv当中调用了setupEndpoint方法的时候,执行了一行代码如下 dispatcher.registerRpcEndpoint(name,endpoint)通过消息转发器dispatcher来注册了Rpc的Endpoint这个终端,registerRpcEndpoint方法的具体实现内容如下5.4、查看registerRpcEndpoint方法当中的DedicatedMessageLoop对象创建过程在registerRpcEndpoint方法当中,执行了一段代码如下varmessageLoop:MessageLoop=nulltry{messageLoop=endpointmatch{casee:IsolatedRpcEndpoint=>newDedicatedMessageLoop(name,e,this)case_=>sharedLoop.register(name,endpoint)sharedLoop}endpoints.put(name,messageLoop)}catch{caseNonFatal(e)=>moveendpointthrowe}在这一行代码当中,创建了一个对象DedicatedMessageLoop,这个对象当中创建了inbox收件箱对象,主要用于收取消息,对象的创建过程如下在上面DedicatedMessageLoop对象创建的时候,我们会发现调用了一行代码如下: privateprivatevalinbox=newInbox(name,endpoint)这里其实就是创建了一个Inbox对象,通过Inbox这个对象来实现数据的接受,Inbox的具体实现内容如下在inbox这个对象初始化的时候调用了//OnStartshouldbethefirstmessagetoprocessinbox.synchronized{messages.add(OnStart)}其实就是通过LinkedList给InboxMessage添加了一个onStart的事件进去了,这就涉及到通信的生命周期,在spark框架当中,通信的总的接口定义在了RpcEndpoint这个trait当中。在RpcEndpoint这个trait当中定义了一系列的生命周期的顺序,在RpcEndpoint当中是这样定义注释的其中CoarseGrainedExecutorBackend这个类也是RpcEndPoint的子类调用onStart之后,执行CoarseGrainedExecutorBackend的onStart()方法逻辑如下5.6、查看CoarseGrainedExecutorBackend类的onStart方法 前面在看到了通过inbox调用了onStart事件,然后inBox是DedicatedMessageLoop当中定义的对象,DedicatedMessageLoop又是YarnCoarseGrainedExecutorBackend当中定义的对象,而YarnCoarseGrainedExecutorBackend又是CoarseGrainedExecutorBackend的子类,所以这里通过inbox调用了onStart之后,就会执行CoarseGrainedExecutorBackend当中的onStart方法,onStart的方法的具体定义如下在上面onStart的方法内容定义如下overridedefonStart():Unit={logInfo("Connectingtodriver:"+driverUrl)try{_resources=parseOrFindResources(resourcesFileOpt)}catch{caseNonFatal(e)=>exitExecutor(1,"Unabletocreateexecutordueto"+e.getMessage,e)}rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap{ref=>//Thisisaveryfastactionsowecanuse"ThreadUtils.sameThread"driver=Some(ref)//向Driver当中注册Executor通过RegisterExecutor样例类的方式来包装了发送的消息出去ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,fileid}(ThreadUtils.sameThread).onComplete{caseSuccess(_)=>self.send(RegisteredExecutor)caseFailure(e)=>exitExecutor(1,s"Cannotregisterwithdriver:$driverUrl",e,notifyDriver=false)}在这个方法当中调用了RegisterExecutor,通过RegisterExecutor来封装了样例类,所以这里发送消息,一定会有个地方能接收到消息ref.ask[Boolean](RegisterExecutor(executorId,self,hostname,cores,通过RegisterExecutor其实就是向Driver端进行通信,向Driver端进行通信注册Executor,方便Driver端后续做DAG的划分以及task的分解,将分解之后的task运行在Executor上面,既然是向Driver端进行注到Driver端的响应以及回复5.7、简单查看Driver端的消息接收以及回复过程sparkContext对象当中有一个属性叫做SchedulerBackend SchedulerBackend其实就是我们的通信后台,SchedulerBackend是一个trait,可以找到他的实现类为CoarseGrainedSchedulerBackend5.8、查看CoarseGrainedSchedulerBackend当中消息接收以及回复receiveAndReply方法 我们通过SchedulerBackend找到他的实现类为CoarseGrainedSchedulerBackend,在这会进行通信,里面有一个发方法叫做receiveAndReply这个方法,专门用于接收消息,并进行回复的,该方法的定义内容如下其中CoarseGrainedExecutorBackend与CoarseGrainedSchedulerBackend的请求以及响应关系如下图CoarseGrainedExecutorBackend与CoarseGrainedSchedulerBackend通信流程.drawio至此,通过以上的通信环境,我们整个的运行环境全部创建成功,有了driver有了executor,有了resourceManager,有了nodeManager等,就可以进行任务的计算了,至此,在启动ApplicationMaster的时候,第一条线路当中的资源环境问题就已经正式启动成功。剩下的就是第二条线,运行用户代码的问题了。4、spark任务组件之间的通信源码深入剖析1、通信基本概念介绍NIO,AIO等这几种通信模型的话,那么我们需要先了解阻塞和非阻塞,同步和非同步的概念阻塞和非阻塞是指进程在访问数据的时候,数据内部是否准备就绪的一种处理方式。当数据没有准备的时候阻塞:需要等待缓冲区的数据准备好才去处理之后的事情,否则一直等待下去非阻塞:无论缓冲区的数据是否准备好,都立刻返回同步和异步都是基于应用程序和操作系统处理IO时间锁采用的方式。同步:应用程序要直接参与IO读写的操作,在处理IO事件的时候必须阻塞在某个方法上的等待我们IO完成的时间(阻塞IO事件或者通过轮询IO事件的方式)。阻塞直到IO事件遇到write或者read,这个时候我们不能做任何我们想去做的事情,让读写方法加入到线程中,通过阻塞线程来实现,这样对线程的性大。 异步:所有的IO读写都交给操作系统处理,此时应用程序可以处理其他事情,当操作系统完成IO后给应用程序一个通知即可。3、socket通信介绍socket: Socket是应用层与TCP/IP协议族通信的中间软件抽象层,它是一组接口。在设计模式中,Socket其实就是一个门面模式,它把复杂的TCP/IP协议族隐藏在Socket接口后面,对用户来说,一组简单的接口就是全部,让Socket去组织数据,以符合指定的协议先从服务器端说起。服务器端先初始化Socket,然后与端口绑定(bind),对端口进行监听(listen),调用accept阻塞,等待客户端连接。在这时如果有个客户端初始化一个Socket,然后连接服务器(connect),如果连接成功,这时客户端与服务器端的连接就建立了。客户端发送数据请求,服务器端接收请求并处理请求,然后把回应数据发送给客户端,客户端读取数据,最后关闭连接,一次交互结束优点1)传输数据为字节级,传输数据可自定义,数据量小(对于手机应用讲:费用低)2)传输数据时间短,性能高3)适合于客户端和服务器端之间信息实时交互4)可以加密,数据安全性强1)需对传输的数据进行解析,转化成应用级的数据2)对开发人员的开发水平要求高3)相对于Http协议传输,增加了开发量4)效率低下同步阻塞I/O模式,全称BlockingIO,数据的读取写入必须阻塞在一个线程内等待其完成 它是基于流模型实现的,交互的方式是同步、阻塞方式,也就是说在读入输入流或者输出流时,在读写动作完成之前,线程会一直阻塞在那里,它们之间的调用时可靠的线性顺序。它的优点就是代码比较简单、直观;缺点就是IO的效率和扩展性很低,容易成为应用性能瓶颈。主要最大的缺点就是会对我们的操作进行阻塞,例如我们去医院看医生,需要排队取号,如果中途过号了,那么对不起,过号不厚,要么退号,要么重新挂号,这就限制了我们取了号之后非得要在医院里面等着啥也不能干,尽管你不知道你得要等多久,所以这种阻塞式的IO非常不友好,会浪费我们大量的时间采用BIO通信模型的服务端,通常由一个独立的Acceptor线程负责监听客户端的连接,接收到客户端连接之后为客户端连接创建一个新的线程处理请求消息,处理完成之后,返回应答消息给客户端,线程销。该架构最大的问题就是不具备弹性伸缩能力,当并发访问量增加后,服务端的线程个数和并发访问数成线性正比,由于线程是JAVA虚拟机非常宝贵的系统资源,当线程数膨胀之后,系统的性能急剧下降,随着并发量的继续增加,可能会发生句柄溢出、线程堆栈溢出等问题,并导致服务器最终宕机。 NIO:一种非阻塞式通信模式,线程在执行这个通信业务过程中,如果有一个环节没有准备好,那么线程可以去执行其他任务,线程占用的情况大幅度释放。例如我们去饭店吃饭,以前没有外卖的时候,我们去饭店吃饭得要排队等着,等着厨师把我们的饭菜做好了,然后我们才能开吃,这种模型就类似于BIO,阻塞式IO,效率极其低下,为了节约等待的时间,等待的时候我们可以去打球,每隔一会儿回来问一下老板我们的饭菜有没有做好 传统的IO操作面向数据流,意味着每次从流中读一个或多个字节,直至完成,数据没有被缓存在任何地方。NIO操作面向缓冲区,数据从Channel读取到Buffer缓冲区,随后在Buffer中处理数据。利用Buffer读写数据,通常遵循四个步骤:1.把数据写入buffer;2.调用flip;3.从Buffer中读取数据;4.调用buffer.clear()当写入数据到buffer中时,buffer会记录已经写入的数据大小。当需要读数据时,通过flip()方法把buffer从写模式调整为读模式;在读模式下,可以读取所有已经写入的数据。2、channel通道javaNIOChannel通道和流非常相似,主要有以下几点区别:1.通道可以读也可以写,流一般来说是单向的(只能读或者写)。2.通道可以异步读写。3.通道总是基于缓冲区Buffer来读写。正如上面提到的,我们可以从通道中读取数据,写入到buffer;也可以中buffer内读数据,写入到通道有个示意图:Channel有:1.FileChannel2.DatagramChannel3.SocketChannel4.ServerSocketChannelTCP的数据读写。ServerSocketChannel允许我们监听TCP链接请求,每个请求会创建会一个SocketChannel。3、selector选择器如此可以实现单线程管理多个channels,也就是可以管理多个网络链接。通过上面的了解我们知道Selector是一种IOmultiplexing的情况。下面这幅图描述了单线程处理三个channel的情况:1、多个Client同时注册到多路复用器selector上;2、selector遍历所有注册的通道;3、查看通道状态(包括Connect、Accept、Read、Write);4、根据状态执行相应状态的操作; 1.由一个专门的线程来处理所有的IO事件,并负责分发。2.事件驱动机制:事件到的时候触发,而不是同步的去监视事件。3.线程通讯:线程之间通过wait,notify等方式通讯。保证每次上下文切换都是有意义的。减少无谓6、AIO通信模型介绍(异步非阻塞式IO) JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0。也就是我们要介绍的AIO。NIO2.0引入了新的异步通道的概念,并提供了异步文件通道和异步套接字通道的实现。还是以吃饭为例,我们有了外卖之后,我们就彻底与老板进行解耦了,我们下单老板做饭,然后外卖员给我们送过来,在这等待老板做饭的时间,我们可以继续去做别的事情,吃饭等待的时间也被我们利用起来了,更加提高了效率用户程序可以通过向内核发出I/O请求命令,不用等带I/O事件真正发生,可以继续做另外的事情,等I/O操作完成,内核会通过函数回调或者信号机制通知用户进程。这样很大程度提高了系统吞吐量。异步通道提供两种方式获取操作结果。(1)通过Java.util.concurrent.Future类来表示异步操作的结果;(2)在执行异步操作的时候传入一个Java.nio.channels.CompletionHandler接口的实现类作为操作完成的回调。NIO2.0的异步套接字通道是真正的异步非阻塞IO,它对应UNIX网络编程中的事件驱动IO(AIO),它不需要通过多路复用器(Selector)对注册的通道进行轮询操作即可实现异步读写,从而简化了NIO的编程模我们可以得出结论:异步SocketChannel是被动执行对象,我们不需要想NIO编程那样创建一个独立的IO线程来处理读写操作。对于AsynchronousServerSocketChannel和AsynchronousSocketChannel,它们都由JDK底层的线程池负责回调并驱动读写操作。正因为如此,基于NIO2.0新的异步非阻塞Channel进行编程比NIO编程更为简单。信模型的对比 由上述总结得出,并不意味着所有的Java网络编程都必须要选择NIO和Netty,具体选择什么样的IO模型或者NIO框架,完全基于业务的实际应用场景和性能诉求,如果客户端并发连接数不多,周边对接的网元不多,服务器的负载也不重,那就完全没必要选择NIO做服务端;如果是相反情况,那就考虑选择合NIO行开发。定义bio服务端
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2024年广告发布委托协议3篇
- 酒店合作经营协议
- 总经销授权协议
- 体育赛事合同管理规范文本
- 美术馆绿化草坪施工协议
- 房地产开发招投标记录
- 学生特殊饮食需求管理
- 房地产开发项目工资支付承诺
- 孕期胯骨疼的临床特征
- 检验科医师招聘协议书样本
- -2月班主任随堂听课记录表
- 黑布林英语阅读初一年级16《柳林风声》译文和答案
- 人音版音乐七年级下册《红河谷》课件
- U校园 新一代大学英语(提高篇)综合教程1 (全)
- 中药饮片出库单
- 《河南省高标准农田示范区“投融建运管”一体化推进操作导则(试行)》
- 危重患者营养支持的意义及时机
- 林业基础知识考试复习题库(浓缩500题)
- 国开2023春《语言学概论》形考任务1-3+大作业参考答案
- 六年级上册《比》《圆》测试题(A4版)
- 网络用语对现代汉语词汇学习的影响研究
评论
0/150
提交评论