Spark实战第5部分使用MLPipeline构建机器学习工作流_第1页
Spark实战第5部分使用MLPipeline构建机器学习工作流_第2页
Spark实战第5部分使用MLPipeline构建机器学习工作流_第3页
Spark实战第5部分使用MLPipeline构建机器学习工作流_第4页
Spark实战第5部分使用MLPipeline构建机器学习工作流_第5页
已阅读5页,还剩7页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Spark实战,第5部分:使用MLPipeline构建机器学习工作流本文将通过一个分类预测的机器学习问题向读者展示如何使用Spark新的MLPipeline库构建机器学习的工作流。通过本文的阅读,读者将会了解到MLPipeline与MLlib相比在设计上的独到和使用上的不同之处,并且会深入理解MLPipeline的基本概念和工作方式,为进一步学习和深入研究打下良好的基础。查看本系列更多内容

|

2

评论:王龙,软件工程师,IBM2015年11月02日内容在IBMBluemix云平台上开发并部署您的下一个应用。开始您的试用引言使用机器学习(MachineLearning)技术和方法来解决实际问题,已经被成功应用到多个领域,我们经常能够看到的实例有个性推荐系统,金融反欺诈,自然语言处理和机器翻译,模式识别,智能控制等。一个典型的机器学习机器学习过程通常会包含:源数据ETL,数据预处理,指标提取,模型训练与交叉验证,新数据预测等。我们可以看到这是一个包含多个步骤的流水线式工作,也就是说数据从收集开始,要经历多个步骤,才能得到我们需要的输出。在本系列第4部分已经向大家介绍了SparkMLlib机器学习库,虽然MLlib已经足够简单易用,但是如果目标数据集结构复杂需要多次处理,或者是对新数据进行预测的时候需要结合多个已经训练好的单个模型进行综合预测(集成学习的思想),那么使用MLlib将会让程序结构复杂,难于理解和实现。值得庆幸的是,在Spark的生态系统里,一个可以用于构建复杂机器学习工作流应用的新库已经出现了,它就是Spark1.2版本之后引入的MLPipeline,经过几个版本的发展,截止目前的1.5.1版本已经变得足够稳定易用了。本文将向读者详细地介绍SparkMLPipeline的设计思想和基本概念,以及如何使用MLPipeline提供的API库编写一个解决分类预测问题的Pipeline式应用程序。相信通过本文的学习,读者可以较为深入的理解MLPipeline,进而将它推广和应用到更多复杂问题的解决方案上去。回页首关于MLPipelineSparkMLPipeline的出现,是受到了

scikit-learn

项目的启发,并且总结了MLlib在处理复杂机器学习问题上的弊端,旨在向用户提供基于DataFrame之上的更加高层次的API库,以更加方便的构建复杂的机器学习工作流式应用。一个Pipeline在结构上会包含一个或多个PipelineStage,每一个PipelineStage都会完成一个任务,如数据集处理转化,模型训练,参数设置或数据预测等,这样的PipelineStage在ML里按照处理问题类型的不同都有相应的定义和实现。接下来,我们先来了解几个重要概念。DataFrame关于DataFrame其实我们已经在本系列第3部分介绍过了,它较之RDD,包含了schema信息,更类似传统数据库中的二维表格。它被MLPipeline用来存储源数据。DataFrame可以被用来保存各种类型的数据,如我们可以把特征向量存储在DataFrame的一列中,这样用起来是非常方便的。TransformerTransformer中文可以被翻译成转换器,是一个PipelineStage,实现上也是继承自PipelineStage类,主要是用来把一个DataFrame转换成另一个DataFrame,比如一个模型就是一个Transformer,因为它可以把一个不包含预测标签的测试数据集DataFrame打上标签转化成另一个包含预测标签的DataFrame,显然这样的结果集可以被用来做分析结果的可视化。EstimatorEstimator中文可以被翻译成评估器或适配器,在Pipeline里通常是被用来操作DataFrame数据并生产一个Transformer,如一个随机森林算法就是一个Estimator,因为它可以通过训练特征数据而得到一个随机森林模型。实现上Estimator也是继承自PipelineStage类。ParameterParameter被用来设置Transformer或者Estimator的参数。要构建一个Pipeline,首先我们需要定义Pipeline中的各个PipelineStage,如指标提取和转换模型训练等。有了这些处理特定问题的Transformer和Estimator,我们就可以按照具体的处理逻辑来有序的组织PipelineStages并创建一个Pipeline,如valpipeline=newPipeline().setStages(Array(stage1,stage2,stage3,…))。然后就可以把训练数据集作为入参并调用Pipelin实例的fit方法来开始以流的方式来处理源训练数据,这个调用会返回一个PipelineModel类实例,进而被用来预测测试数据的标签,它是一个Transformer。回页首随机森林及ML的实现随机森林构建于决策树之上,顾名思义,就是随机的构建一个包含多个决策树的森林。随机森林里的决策树之间是独立的,在随机森林模型构建好以后,对于新来的测试样本数据,随机森林模型会让其中的每个决策树分别做一次预测,然后统计出现此处最多的预测标签,并将它作为最终的预测标签。随机森林算法运用的就是集成学习的思想,在实践中,随机森林往往都有很好表现,并且多次预测结果稳定并且精度非常高,也不容易出现过拟合的问题。也是笔者最喜欢并且最常用的一种机器学习算法。本文并不会重点介绍随机森林的基本理论,因为网上这样的文章已经很多了,本文将把重点放在对SparkML中随机森林的实现以及可调参数的介绍。关于随机森林算法的详细介绍大家可以参考维基百科上的随机森林介绍。SparkML中随机森林实现是在RandomForestClassifier类中,位于org.apache.spark.ml.classification包中,该实现中支持设置的主要参数如下:featuresCol训练数据集DataFrame中存储特征数据的列名。labelCol标签列的名称。impurity树节点选择的不纯度的衡量指标,取值可以是”entroy”或“gini”,默认是”gini”。maxBins离散连续性变量时最大的分箱数,默认是32。理论上箱数越大粒度就越细,但是针对特定的数据集总有一个合理的箱数。maxDepth树的最大深度,默认值是5。numTrees随机森林需要训练的树的个数,默认值是20。predictionCol算法预测结果的存储列的名称,默认是”prediction”。rawPredictionCol原始的算法预测结果的存储列的名称,默认是”rawPrediction”probabilityCol类别预测结果的条件概率值存储列的名称,默认值是”probability”在后文中大家可以看到如何在程序中设置这些参数。可以调用RandomForestClassifier.setXXX方法或者在ParamMap里设定参数,然后再调用RandomForestClassifier.fit方法时传入ParamMap实例,如:RandomForestClassifier的fit方法从源头上来讲,是来自Predictor类(Estimator的子类),Predictor类的fit方法设计和实现上实际上是采用了模板方法的设计模式,具体会调用实现类的train方法图1.Predictor类的fit方法实现预览所以对于RandomForestClassifier类我们最需要关注的就是train方法,其中包含具体从源数据DataFrame训练一个随机森林模型的过程。train方法在提取出DataFrame数据集中的label和features数据之后,进一步调用RandomForest.run方法去真正的开始训练随机森林模型,训练结束后会返回一个RandomForestClassificationModel类实例,这是一个Transformer,会被用来预测测试数据集。图2.RandomForestClassifier类的train方法实现预览对于RandomForest类的run方法的具体实现逻辑,已经在developerWorks的“Spark随机森林算法原理、源码分析及案例实战”

一文中有详细介绍,为了避免内容冲突,本文的内容将重点放在MLPipeline的实现层次关系上,在这里不做赘述。回页首目标数据集预览本文所使用的测试数据集来自UCI的

banknoteauthenticationdataset

,这是一个从纸币鉴别过程中的图片里提取的数据集,总共包含五个列,前4列是指标值(连续型),最后一列是真假标识。图3.测试数据集格式四列依次是小波变换图像的方差,小波变换图像的偏态,小波变换图像的峰度,图像熵,类别标签。其实读者并不需要知道什么是小波变换及其相关改变,只需要知道这是四个特征指标的值,我们将根据这些指标训练模型使用模型预测类别。对于该数据集的更多信息,读者可以参考UCI官网的描述。回页首案例分析与编码实现前面提到,本文的目的是使用SparkMLPipeline构建一个对目标数据集进行分类预测的机器学习工作流,案例背景已经相当清晰,在了解了数据集本身和MLPipeline的相关知识后,接下来就是编程实现了。关于实现基本思路和关键的11个步骤笔者已经在代码中做了详细解释,为了方便读者理解,这里特别的把该实例的Pipeline里包含的4个Stage重点介绍下。这四个Stage分别对应代码注释里的步骤2-5,作用如下:第一个,使用StringIndexer去把源数据里的字符Label,按照Label出现的频次对其进行序列编码,如,0,1,2,…。在本例的数据中,可能这个步骤的作用不甚明显,因为我们的数据格式良好,Label本身也只有两种,并且已经是类序列编码的”0”和”1”格式。但是对于多分类问题或者是Label本身是字符串的编码方式,如”High”,”Low”,”Medium”等,那么这个步骤就很有用,转换后的格式,才能被Spark更好的处理。第二个,使用VectorAssembler从源数据中提取特征指标数据,这是一个比较典型且通用的步骤,因为我们的原始数据集里,经常会包含一些非指标数据,如ID,Description等。第三个,创建一个随机森林分类器RandomForestClassifier实例,并设定相关参数,主要是告诉随机森林算法输入DataFrame数据里哪个列是特征向量,哪个是类别标识,并告诉随机森林分类器训练5棵独立的子树。第四个,我们使用IndexToStringTransformer去把之前的序列编码后的Label转化成原始的Label,恢复之前的可读性比较高的Label,这样不论是存储还是显示模型的测试结果,可读性都会比较高。这几个Stage都会被用来构建Pipeline实例,并且会按照顺序执行,最终我们根据得到的PipelineModel实例,进一步调用其transform方法,去用训练好的模型预测测试数据集的分类。清单1.示例程序源代码importorg.apache.spark.ml.Pipelineimportorg.apache.spark.ml.classification._importorg.apache.spark.ml.evaluation.MulticlassClassificationEvaluatorimportorg.apache.spark.ml.feature.{IndexToString,StringIndexer,VectorAssembler}importorg.apache.spark.ml.param.ParamMapimportorg.apache.spark.sql.SQLContextimportorg.apache.spark.{SparkConf,SparkContext}objectClassificationPipeline{defmain(args:Array[String]){if(args.length<1){println("Usage:ClassificationPipelineinputDataFile")sys.exit(1)}valconf=newSparkConf().setAppName("ClassificationwithMLPipeline")valsc=newSparkContext(conf)valsqlCtx=newSQLContext(sc)/**Step1*Readthesourcedatafileandconvertittobeadataframewithcolumnsnamed.*3.6216,8.6661,-2.8073,-0.44699,0*4.5459,8.1674,-2.4586,-1.4621,0*3.866,-2.6383,1.9242,0.10645,0*3.4566,9.5228,-4.0112,-3.5944,0*0.32924,-4.4552,4.5718,-0.9888,0*......*/valparsedRDD=sc.textFile(args(0)).map(_.split(",")).map(eachRow=>{vala=eachRow.map(x=>x.toDouble)(a(0),a(1),a(2),a(3),a(4))})valdf=sqlCtx.createDataFrame(parsedRDD).toDF("f0","f1","f2","f3","label").cache()/****Step2*StringIndexerencodesastringcolumnoflabels*toacolumnoflabelindices.Theindicesarein[0,numLabels),*orderedbylabelfrequencies.*Thiscanhelpdetectlabelinrawdataandgiveitanindexautomatically.*Sothatitcanbeeasilyprocessedbyexistingsparkmachinelearningalgorithms.**/vallabelIndexer=newStringIndexer().setInputCol("label").setOutputCol("indexedLabel").fit(df)/***Step3*DefineaVectorAssemblertransformertotransformsourcefeaturesdatatobeavector*Thisishelpfulwhenrawinputdatacontainsnon-featurecolumns,anditiscommonfor*suchainputdatafiletocontaincolumnssuchas"ID","Date",etc.*/valvectorAssembler=newVectorAssembler().setInputCols(Array("f0","f1","f2","f3")).setOutputCol("featureVector")/***Step4*CreateRandomForestClassifierinstanceandsettheinputparameters.*Herewewilluse5treesRandomForesttotrainoninputdata.*/valrfClassifier=newRandomForestClassifier().setLabelCol("indexedLabel").setFeaturesCol("featureVector").setNumTrees(5)/***Step5*Convertindexedclasslabelsbacktooriginalonesothatitcanbeeasilyunderstoodwhenwe*needtodisplayorsavethepredictionresulttoafile.*/vallabelConverter=newIndexToString().setInputCol("prediction").setOutputCol("predictedLabel").setLabels(labelIndexer.labels)//Step6//Randomlysplittheinputdataby8:2,while80%isfortraining,therestisfortesting.valArray(trainingData,testData)=df.randomSplit(Array(0.8,0.2))/***Step7*CreateaMLpipelinewhichisconstructedbyfor4PipelineStageobjects.*andthencallfitmethodtoperformdefinedoperationsontrainingdata.*/valpipeline=newPipeline().setStages(Array(labelIndexer,vectorAssembler,rfClassifier,labelConverter))valmodel=pipeline.fit(trainingData)/***Step8*Performpredictionsabouttestingdata.ThistransformmethodwillreturnaresultDataFrame*withnewpredictioncolumnappendedtowardspreviousDataFrame.***/valpredictionResultDF=model.transform(testData)/***Step9*Selectfeatures,label,andpredictedlabelfromtheDataFrametodisplay.*Weonlyshow20rows,itisjustforreference.*/predictionResultDF.select("f0","f1","f2","f3","label","predictedLabel").show(20)/***Step10*Theevaluatorcodeisusedtocomputethepredictionaccuracy,thisis*usuallyavaluablefeaturetoestimatepredictionaccuracythetrainedmodel.*/valevaluator=newMulticlassClassificationEvaluator().setLabelCol("label").setPredictionCol("prediction").setMetricName("precision")valpredictionAccuracy=evaluator.evaluate(predictionResultDF)println("TestingError="+(1.0-predictionAccuracy))/***Step11(Optional)*Youcanchoosetoprintorsavethethemodelstructure.*/valrandomForestModel=model.stages(2).asInstanceOf[RandomForestClassificationModel]println("TrainedRandomForestModelis:\n"+randomForestModel.toDebugString)}}回页首运行示例程序在运行程序之前,读者需要把目标数据集上传至你的HDFS上,并把测试程序打成jar包。清单2.示例程序运行命令及参数示例./spark-submit--classcom.ibm.spark.exercise.ml.ClassificationPipeline\

--masterspark://<spark_master>:<port>\

--num-executors6\

--driver-memory3g\

--executor-memory1g\

--executor-cores1/home/fams/spark_exercise-1.0.jar\

hdfs://<hdfs_

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论