java提交spark任务到yarn平台的配置讲解_第1页
java提交spark任务到yarn平台的配置讲解_第2页
java提交spark任务到yarn平台的配置讲解_第3页
java提交spark任务到yarn平台的配置讲解_第4页
java提交spark任务到yarn平台的配置讲解_第5页
已阅读5页,还剩4页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

java提交spark任务到yarn平台的配置讲解一、背景采用spark的方式处理,所以需要将spark的功能集成到代码,采用yarn客户端的方式管理spark任务。不需要将cdh的一些配置文件放到resource路径下,只需要配置一些配置即可,非常方便二、任务管理架构三、接口1、任务提交1./**2.*提交任务到yarn集群3.*4.*@paramconditions5.*yarn集群,spark,hdfs具体信息,参数等6.*@returnappid7.*/8.

publicStringsubmitSpark(YarnSubmitConditionsconditions){9.("初始化sparkonyarn参数");10.11.//初始化yarn客户端12.("初始化sparkonyarn客户端");13.Listargs=Lists.newArrayList("--jar",conditions.getApplicationJar(),"--class",14.conditions.getMainClass());15.

if(conditions.getOtherArgs()!=null&&conditions.getOtherArgs().size()>0){16.

for(Strings:conditions.getOtherArgs()){17.args.add("--arg");18.args.add(mons.lang.StringUtils.join(newString[]{s},","));19.}20.}21.22.//identifythatyouwillbeusingSparkasYARNmode23.System.setProperty("SPARK_YARN_MODE","true");24.SparkConfsparkConf=newSparkConf();25.

if(mons.lang.StringUtils.isNotEmpty(conditions.getJobName())){26.sparkConf.setAppName(conditions.getJobName());27.}28.29.sparkConf.set("spark.yarn.jars",conditions.getSparkYarnJars());30.

if(conditions.getAdditionalJars()!=null&&conditions.getAdditionalJars().length>0){31.sparkConf.set("spark.jars",mons.lang.StringUtils.join(conditions.getAdditionalJars(),","));32.}33.34.

if(conditions.getFiles()!=null&&conditions.getFiles().length>0){35.sparkConf.set("spark.files",mons.lang.StringUtils.join(conditions.getFiles(),","));36.}37.

for(Map.Entrye:conditions.getSparkProperties().entrySet()){38.sparkConf.set(e.getKey().toString(),e.getValue().toString());39.}40.41.//添加这个参数,不然spark会一直请求:8030,一直重试42.sparkConf.set("yarn.resourcemanager.hostname",conditions.getYarnResourcemanagerAddress().split(":")[0]);43.//设置为true,不删除缓存的jar包,因为现在提交yarn任务是使用的代码配置,没有配置文件,删除缓存的jar包有问题,44.sparkConf.set("spark.yarn.preserve.staging.files","true");45.46.//初始化yarn的配置47.Configurationcf=newConfiguration();48.Stringos=System.getProperty("");49.

booleancross_platform=false;50.

if(os.contains("Windows")){51.cross_platform=true;52.}53.cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务54.//设置yarn资源,不然会使用localhost:803255.cf.set("yarn.resourcemanager.address",conditions.getYarnResourcemanagerAddress());56.//设置namenode的地址,不然jar包会分发,非常恶心57.cf.set("fs.defaultFS",conditions.getSparkFsDefaultFS());58.59.ClientArgumentscArgs=newClientArguments(args.toArray(newString[args.size()]));60.Clientclient=newClient(cArgs,cf,sparkConf);61.("提交任务,任务名称:"+conditions.getJobName());62.63.

try{64.65.ApplicationIdappId=client.submitApplication();66.67.

returnappId.toString();68.69.}catch(Exceptione){70.logger.error("提交spark任务失败",e);71.

returnnull;72.}finally{73.

if(client!=null){74.client.stop();75.}76.}77.}2、任务进度获取1./**2.*停止spark任务3.*4.*@paramyarnResourcemanagerAddress5.*yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址6.*@paramappIdStr7.*需要取消的任务id8.*/9.

publicvoidkillJob(StringyarnResourcemanagerAddress,StringappIdStr){10.("取消spark任务,任务id:"+appIdStr);11.//初始化yarn的配置12.Configurationcf=newConfiguration();13.Stringos=System.getProperty("");14.

booleancross_platform=false;15.

if(os.contains("Windows")){16.cross_platform=true;17.}18.cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务19.//设置yarn资源,不然会使用localhost:803220.cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);21.22.//创建yarn的客户端,此类中有杀死任务的方法23.YarnClientyarnClient=YarnClient.createYarnClient();24.//初始化yarn的客户端25.yarnClient.init(cf);26.//yarn客户端启动27.yarnClient.start();28.

try{29.//根据应用id,杀死应用30.yarnClient.killApplication(getAppId(appIdStr));31.}catch(Exceptione){32.logger.error("取消spark任务失败",e);33.}34.//关闭yarn客户端35.yarnClient.stop();36.37.}3、任务取消1./**2.*获取spark任务状态3.*4.*5.*@paramyarnResourcemanagerAddress6.*yarn资源管理器地址,例如:master:8032,查看yarn集群获取具体地址7.*@paramappIdStr8.*需要取消的任务id9.*/10.

publicSparkTaskStategetStatus(StringyarnResourcemanagerAddress,StringappIdStr){11.("获取任务状态启动,任务id:"+appIdStr);12.//初始化yarn的配置13.Configurationcf=newConfiguration();14.Stringos=System.getProperty("");15.

booleancross_platform=false;16.

if(os.contains("Windows")){17.cross_platform=true;18.}19.cf.setBoolean("mapreduce.app-submission.cross-platform",cross_platform);//配置使用跨平台提交任务20.//设置yarn资源,不然会使用localhost:803221.cf.set("yarn.resourcemanager.address",yarnResourcemanagerAddress);22.("获取任务状态,任务id:"+appIdStr);23.24.SparkTaskStatetaskState=newSparkTaskState();25.//设置任务id26.taskState.setAppId(appIdStr);27.YarnClientyarnClient=YarnClient.createYarnClient();28.//初始化yarn的客户端29.yarnClient.init(cf);30.//yarn客户端启动31.yarnClient.start();32.ApplicationReportreport=null;33.

try{34.report=yarnClient.getApplicationReport(getAppId(appIdStr));35.}catch(Exceptione){36.logger.error("获取spark任务状态失败");37.}38.39.

if(report!=null){40.YarnApplicationStatestate=report.getYarnApplicationState();41.taskState.setState(());42.//任务执行进度43.

floatprogress=report.getProgress();44.taskState.setProgress(progress);45.//最终状态46.FinalApplicationStatusstatus=report.getFinalApplicationStatus();47.taskState.setFinalStatus(());48.}else{49.taskState.setState(ConstParam.SPARK_FAILED);50.taskState.setProgress(0.0f);51.taskState.setFinalStatus(ConstParam.SPARK_FAILED);52.}53.54.//关闭yarn客户端55.yarnClient.stop();56.("获取任务状态结束,任务状态:"+JSON.toJSONString(taskState));57.

returntaskState;58.}四、yarn参数调节1、可分配给容器的物理内存数量,一个nodemanage分配的内存,如果机器内存是128g,尽量分配2/3yarn.nodemanager.resource.memory-mb:80g2、可以为容器分配的虚拟CPU内核的数量。该参数在CDH4.4以前版本中无效。

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论