版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
第7章大数据分析算法本章目标本章主要内容是SparkMLlib中的经典聚类算法和分类算法,通过学习本章,应该达到以下目标:掌握几种经典的大数据聚类和分类算法的基本原理、详细步骤、应用实例和Spark实现。通过大数据聚类算法和分类算法的SparkMLlib实现进行实例分析,掌握大数据聚类和分类分析程序的设计思路和实现方法。本章内容7.1聚类算法7.1.1经典聚类算法聚类算法的基本原理K-Means算法二分K-Means算法高斯混合模型幂迭代聚类7.1.2大数据聚类算法的应用7.2分类算法7.2.1经典分类算法分类算法的基本原理决策树算法逻辑回归算法支持向量机算法随机森林算法朴素贝叶斯算法7.2.2大数据分类算法的应用7.1聚类算法7.1.1经典聚类算法聚类算法的基本原理目的:通过分析将没有标签的、事先不知道会分为几类的数据划分成若干簇,并保证每一簇中的数据尽量接近,而不同簇的数据距离尽量远。基本思路:利用数据集中数据特征值之间的距离远近来判断数据是否被划分到同一类别。聚类算法的基本原理聚类算法基于划分的聚类算法基于层次的聚类算法基于密度的聚类算法基于网格的聚类算法K-Means算法K-Means算法是一种迭代求解的基于划分的聚类算法。基本思路:首先初始化K个划分,然后将样本从一个划分转移到另一个划分以优化聚类质量,迭代此过程直至最大迭代次数。K-Means算法优点:易于理解算法复杂度低聚类效果虽然是局部最优但足够解决大部分问题对较大规模的数据集可以保证较好的伸缩性,即数据对象从几百到几百万都能保持聚类结果准确度一致。K-Means算法缺点:k值需要人为设定,不同的k值结果也不同。初始划分的中心也会影响聚类结果,不同的选取方式得到的结果不同。
K-Means算法对孤立点和噪声数据比较敏感,异常值会导致均值偏离严重,陷入局部最小值。离散程度高的分类、非凸分类、类别不平衡的分类就不适合使用K-Means算法。K-Means算法的基本步骤开始根据给定的k值,选取k个初始划分中心计算数据集中每个样本点到k个划分中心的距离,将样本点划分到距离最近的划分中心的类中针对每个类别计算样本点的平均值,得到新的划分中心是否达到最大迭代次数或划分中心变化是否小于设定的阈值结束否是K-Means算法的应用实例fromnumpyimportarray
frompysparkimportSparkContextfrompyspark.mllib.clusteringimportKMeans,KMeansModel
#appName:在集群webUI上显示的作业名称。#master:要连接到的集群
URL(例如
mesos:#host:port,spark:#host:port,#local[4])。sc=SparkContext(appName="KMeans_pyspark",master='local')第1步:引入必要的类。使用pyspark时需要引入SparkContext类,SparkContext是spark功能的入口点,在引入类后需要定义sc=SparkContext(appName="KMeans_pyspark",master='local'),否则运行程序时可能会报错。K-Means算法需要从pyspark.mllib.clustering中引入KMeans类和KMeansModel类。K-Means算法的应用实例nodes=array([0.0,0.0,1.0,1.0,9.0,8.0,8.0,9.0]).reshape(4,2)print(nodes)第2步:创建数据集。这里创建一个包含4个数据点、每个数据点包括2个属性的数据集。[[0.0.][1.1.][9.8.][8.9.]]结果如下:K-Means算法的应用实例model=KMeans.train(sc.parallelize(nodes),2,maxIterations=10,initializationMode="random")第3步:根据上面创建的数据集训练K-Means聚类模型。其中sc.parallelize()可以将数据集转化为MLlib需要的RDD格式,该方法有2个参数,第1个参数是待转化的数据集,第2个参数是数据集的切片数(默认值:None)。创建K-Means模型需要用到KMeans.train()方法。下面的例子中runs、seed、initializationSteps、epsilon和initialModel参数没有指定,使用默认值。K-Means算法的应用实例forclusterCenterinmodel.clusterCenters:print(clusterCenter)fornodeinnodes:print(str(node)+"belongstocluster"+str(model.clusterCenters[model.predict(node)]))第4步:根据KMeansModel自带的属性及方法获得聚类中心和样本点对应的聚类值。[8.58.5][0.50.5][0.0.]belongstocluster[0.50.5][1.1.]belongstocluster[0.50.5][9.8.]belongstocluster[8.58.5][8.9.]belongstocluster[8.58.5]结果如下:K-Means算法的spark实现K-Means算法的实现由方法runAlgorithmWithWeight()完成。该方法进行训练的整体思路是数据分区,将聚类中心点信息广播至各个分区计算每个中心点的累计距离距离(损失),累计坐标值和计数;以聚类的索引ID作为key进行reduce,计算整个数据集的每个中心点的累计距离和成本,累计坐标值和计数。privatedefrunAlgorithmWithWeight(data:RDD[VectorWithNorm],instr:Option[Instrumentation]):KMeansModel={
valsc=data.sparkContext
valinitStartTime=System.nanoTime()
valdistanceMeasureInstance=DistanceMeasure.decodeFromString(this.distanceMeasure)K-Means算法的spark实现
#初始化中心,支持random(随机选择)或K-Means||(选择最优样本点)
valcenters=initialModelmatch{caseSome(kMeansCenters)=>kMeansCenters.clusterCenters.map(newVectorWithNorm(_))caseNone=>if(initializationMode==KMeans.RANDOM){initRandom(data)}else{initKMeansParallel(data,distanceMeasureInstance)}}valnumFeatures=centers.head.vector.sizevalinitTimeInSeconds=(System.nanoTime()-initStartTime)/1e9logInfo(f"Initializationwith$initializationModetook$initTimeInSeconds%.3fseconds.")K-Means算法的spark实现varconverged=falsevarcost=0.0variteration=0valiterationStartTime=System.nanoTime()instr.foreach(_.logNumFeatures(numFeatures))valshouldDistributed=centers.length*centers.length*numFeatures.toLong>1000000LK-Means算法的spark实现
#执行Lloyd算法的迭代,直到收敛
while(iteration<maxIterations&&!converged){valbcCenters=sc.broadcast(centers)valstats=if(shouldDistributed){distanceMeasureIputeStatisticsDistributedly(sc,bcCenters)}else{distanceMeasureIputeStatistics(centers)}valbcStats=sc.broadcast(stats)valcostAccum=sc.doubleAccumulatorK-Means算法的spark实现
#找到新的中心
valcollected=data.mapPartitions{points=>valcenters=bcCenters.valuevalstats=bcStats.valuevaldims=centers.head.vector.sizevalsums=Array.fill(centers.length)(Vectors.zeros(dims))#使用clusterWeightSum计算聚类中心
#clustercenter=sample1*weight1/clusterWeightSum+sample2*weight2/clusterWeightSum+...K-Means算法的spark实现
#创建clusterWeightSum数组,长度为中心个数
valclusterWeightSum=Array.ofDim[Double](centers.length)points.foreach{point=> #针对每一个样本点
val(bestCenter,cost)=distanceMeasureInstance.findClosest(centers,stats,point) #计算样本点point属于哪个中心点和在该点下的cost值
costAccum.add(cost*point.weight)distanceMeasureInstance.updateClusterSum(point,sums(bestCenter))clusterWeightSum(bestCenter)+=point.weight}K-Means算法的spark实现Iterator.tabulate(centers.length)(j=>(j,(sums(j),clusterWeightSum(j)))).filter(_._2._2>0)}.reduceByKey{(sumweight1,sumweight2)=>axpy(1.0,sumweight2._1,sumweight1._1)(sumweight1._1,sumweight1._2+sumweight2._2)}.collectAsMap()if(iteration==0){instr.foreach(_.logNumExamples(costAccum.count))instr.foreach(_.logSumOfWeights(collected.values.map(_._2).sum))}
bcCenters.destroy()bcStats.destroy()K-Means算法的spark实现
#更新聚类中心和成本
converged=truecollected.foreach{case(j,(sum,weightSum))=>valnewCenter=distanceMeasureInstance.centroid(sum,weightSum)if(converged&&!distanceMeasureInstance.isCenterConverged(centers(j),newCenter,epsilon)){converged=false}centers(j)=newCenter}cost=costAccum.valueinstr.foreach(_.logNamedValue(s"Cost@iter=$iteration",s"$cost"))iteration+=1}K-Means算法的spark实现
valiterationTimeInSeconds=(System.nanoTime()-iterationStartTime)/1e9logInfo(f"Iterationstook$iterationTimeInSeconds%.3fseconds.")if(iteration==maxIterations){logInfo(s"KMeansreachedthemaxnumberofiterations:$maxIterations.")}else{logInfo(s"KMeansconvergedin$iterationiterations.")}logInfo(s"Thecostis$cost.")newKMeansModel(centers.map(_.vector),distanceMeasure,cost,iteration)}二分K-Means算法
二分K-Means算法的基本步骤开始将所有样本点划分到一个聚类簇中,再将这个簇一分为二选择划分后能最大程度降低SSE的簇作为可分解的簇使用K-Means算法将可分解的簇分解成两簇是否达到最大迭代次数或划分中心变化是否小于设定的阈值结束否是二分K-Means算法的应用实例fromnumpyimportarray
frompysparkimportSparkContextfrompyspark.mllib.clusteringimportBisectingKMeans,BisectingKMeansModel#appName:在集群webUI上显示的作业名称。#master:要连接到的集群URL(例如mesos:#host:port,spark:#host:port,#local[4])。sc=SparkContext(appName="BisectingKMeans_pyspark",master='local')第1步:引入必要的类。使用pyspark时需要引入SparkContext类,SparkContext是spark功能的入口点,在引入类后需要定义sc=SparkContext(appName="KMeans_pyspark",master='local'),否则运行程序时可能会报错。二分K-Means算法需要从pyspark.mllib.clustering中引入BisectingKMeans类和BisectingKMeansModel类。二分K-Means算法的应用实例nodes=array([0.0,0.0,1.0,1.0,9.0,8.0,8.0,9.0,5.0,4.0,4.0,5.0,3.0,4.0,4.0,2.0]).reshape(8,2)print(nodes)第2步:创建数据集。这里创建一个包含8个数据点、每个数据点包括2个属性的数据集。[[0.0.][1.1.][9.8.][8.9.][5.4.][4.5.][3.4.][4.2.]]结果如下:二分K-Means算法的应用实例bskm=BisectingKMeans()model=bskm.train(sc.parallelize(nodes,2),k=4)第3步:根据上面创建的数据集创建二分K-Means聚类模型。创建二分K-Means模型需要用到BisectingKMeans.train()方法。下面的例子中maxIterations、Mindivisbleclustersize和seed参数没有指定,使用默认值。二分K-Means算法的应用实例forclusterCenterinmodel.clusterCenters:print(clusterCenter)第4步:根据BisectingKMeansModel自带的属性及方法获得聚类中心和样本点对应的聚类值。[0.50.5][3.53.][4.54.5][8.58.5]结果如下:fornodeinnodes:print(str(node)+"belongstocluster"+str(model.clusterCenters[model.predict(node)]))[0.0.]belongstocluster[0.50.5][1.1.]belongstocluster[0.50.5][9.8.]belongstocluster[8.58.5][8.9.]belongstocluster[8.58.5][5.4.]belongstocluster[4.54.5][4.5.]belongstocluster[4.54.5][3.4.]belongstocluster[3.53.][4.2.]belongstocluster[3.53.]结果如下:二分K-Means算法的spark实现二分K-means算法在MLlib中也是通过API调用scala版本的BisectingKMeans类实现的。该算法从一个包含所有点的集群开始。它迭代地在底层找到可划分的簇,并使用K-means将每个簇平分,直到总共有k个簇或没有簇可划分为止。将同一层的簇的平分步骤组合在一起,以增加并行性。如果在底层将所有可分簇平分将导致更多的k个簇,更大的簇获得更高的优先级。deftrain(self,rdd,k=4,maxIterations=20,minDivisibleClusterSize=1.0,seed=-1888008604):#调用trainBisectingKMeansAPI java_model=callMLlibFunc("trainBisectingKMeans",rdd.map(_convert_to_vector),k,maxIterations,minDivisibleClusterSize,seed)returnBisectingKMeansModel(java_model)二分K-Means算法的spark实现#trainBisectingKMeansAPIdeftrainBisectingKMeans(data:JavaRDD[Vector],k:Int,maxIterations:Int,minDivisibleClusterSize:Double,seed:java.lang.Long):BisectingKMeansModel={valkmeans=newBisectingKMeans().setK(k).setMaxIterations(maxIterations).setMinDivisibleClusterSize(minDivisibleClusterSize)if(seed!=null)kmeans.setSeed(seed)kmeans.run(data)#调用BisectingKMeans类中的run()方法}二分K-Means算法的spark实现defrun(input:RDD[Vector]):BisectingKMeansModel={#将数据集由RDD[Vector]转化为RDD[(Vector,Double)]格式valinstances=input.map(point=>(point,1.0))#判断RDD当前是否设置存储级别valhandlePersistence=input.getStorageLevel==StorageLevel.NONE#调用runWithWeight()方法实现算法
runWithWeight(instances,handlePersistence,None)}Scala版本的BisectingKMeans类按照给定的参数运行run方法训练模型。具体源码如下:二分K-Means算法的spark实现#runWithWeight()方法具体实现二分K-means算法private[spark]defrunWithWeight(instances:RDD[(Vector,Double)],handlePersistence:Boolean,instr:Option[Instrumentation]):BisectingKMeansModel={vald=instances.map(_._1.size).firstlogInfo(s"Featuredimension:$d.")valdMeasure=DistanceMeasure.decodeFromString(this.distanceMeasure)valnorms=instances.map(d=>Vectors.norm(d._1,2.0))#计算数据的二范数#将数据转化成VectorWithNorm类
valvectors=instances.zip(norms).map{case((x,weight),norm)=>newVectorWithNorm(x,norm,weight)}if(handlePersistence){vectors.persist(StorageLevel.MEMORY_AND_DISK)}else{
二分K-Means算法的spark实现
#计算和缓存用于快速距离计算的向量范数。
norms.persist(StorageLevel.MEMORY_AND_DISK)}varassignments=vectors.map(v=>(ROOT_INDEX,v))varactiveClusters=summarize(d,assignments,dMeasure)instr.foreach(_.logNumExamples(activeClusters.values.map(_.size).sum))instr.foreach(_.logSumOfWeights(activeClusters.values.map(_.weightSum).sum))valrootSummary=activeClusters(ROOT_INDEX)valn=rootSummary.sizelogInfo(s"Numberofpoints:$n.")logInfo(s"Initialcost:${rootSummary.cost}.")valminSize=if(minDivisibleClusterSize>=1.0){math.ceil(minDivisibleClusterSize).toLong}else{math.ceil(minDivisibleClusterSize*n).toLong}二分K-Means算法的spark实现logInfo(s"Theminimumnumberofpointsofadivisibleclusteris$minSize.")varinactiveClusters=mutable.Seq.empty[(Long,ClusterSummary)]valrandom=newRandom(seed)varnumLeafClustersNeeded=k-1varlevel=1varpreIndices:RDD[Long]=nullvarindices:RDD[Long]=nullwhile(activeClusters.nonEmpty&&numLeafClustersNeeded>0&&level<LEVEL_LIMIT){
#可分集群具有足够大和非平凡成本。
vardivisibleClusters=activeClusters.filter{case(_,summary)=>(summary.size>=minSize)&&(summary.cost>MLUtils.EPSILON*summary.size)}
#如果我们不需要所有可分集群,选择较大的集群。
if(divisibleClusters.size>numLeafClustersNeeded){divisibleClusters=divisibleClusters.toSeq.sortBy{case(_,summary)=>-summary.size}.take(numLeafClustersNeeded).toMap}二分K-Means算法的spark实现if(divisibleClusters.nonEmpty){valdivisibleIndices=divisibleClusters.keys.toSetlogInfo(s"Dividing${divisibleIndices.size}clustersonlevel$level.")varnewClusterCenters=divisibleClusters.flatMap{case(index,summary)=>val(left,right)=splitCenter(summary.center,random,dMeasure)Iterator((leftChildIndex(index),left),(rightChildIndex(index),right))}.map(identity)#解决产生不可序列化映射的Scalabug(SI-7005)varnewClusters:Map[Long,ClusterSummary]=nullvarnewAssignments:RDD[(Long,VectorWithNorm)]=nullfor(iter<-0untilmaxIterations){newAssignments=updateAssignments(assignments,divisibleIndices,newClusterCenters,dMeasure).filter{case(index,_)=>divisibleIndices.contains(parentIndex(index))}newClusters=summarize(d,newAssignments,dMeasure)newClusterCenters=newClusters.mapValues(_.center).map(identity).toMap}二分K-Means算法的spark实现if(preIndices!=null){preIndices.unpersist()}preIndices=indicesindices=updateAssignments(assignments,divisibleIndices,newClusterCenters,dMeasure).keys.persist(StorageLevel.MEMORY_AND_DISK)assignments=indices.zip(vectors)inactiveClusters++=activeClustersactiveClusters=newClustersnumLeafClustersNeeded-=divisibleClusters.size}else{logInfo(s"Noneactiveanddivisibleclustersleftonlevel$level.Stopiterations.")inactiveClusters++=activeClustersactiveClusters=Map.empty}level+=1}二分K-Means算法的spark实现if(preIndices!=null){preIndices.unpersist()}if(indices!=null){indices.unpersist()}if(handlePersistence){vectors.unpersist()}else{norms.unpersist()}valclusters=activeClusters++inactiveClustersvalroot=buildTree(clusters,dMeasure)valtotalCost=root.leafNodes.map(_.cost).sumnewBisectingKMeansModel(root,this.distanceMeasure,totalCost)}高斯混合模型高斯混合模型(GaussianMixtureModel,GMM)与目前大多数聚类算法不同,其他聚类算法多以相似度为划分依据,而GMM则将概率作为判断依据。它假设所有样本都是由某个给定参数的多元高斯分布生成,通过样本点属于某一类别的概率大小来判断该样本点所属的聚类。基本思路:给定聚类的簇个数k,对给定数据集,使用极大似然估计法,推导出每一个混合成分的均值向量μ、协方差矩阵Σ和权重w,得到的多元高斯分布对应聚类的一个簇。高斯混合模型的基本步骤开始初始化K个多元高斯分布及其权重根据贝叶斯定理估计每个样本由每个成分生成的后验概率根据均值、协方差及上一步中得到的后验概率,更新均值向量、协方差矩阵和权重是否达到最大迭代次数或似然函数的增加值是否小于收敛阈值结束否是高斯混合模型的应用实例fromnumpyimportarrayfrompysparkimportSparkContext#frompyspark.mllib.linalgimportVectors,DenseMatrixfrompyspark.mllib.clusteringimportGaussianMixture,GaussianMixtureModelsc=SparkContext(appName="GMM_pyspark",master='local')第1步:引入必要的类。使用pyspark时需要引入SparkContext类,SparkContext是spark功能的入口点,在引入类后需要定义sc=SparkContext(appName="KMeans_pyspark",master='local'),否则运行程序时可能会报错。GMM算法需要从pyspark.mllib.clustering中引入GaussianMixture类和GaussianMixtureModel类,从pyspark.mllib.linalg中引入Vectors类和DenseMatrix类。高斯混合模型的应用实例nodes=array([-0.1,-0.05,-0.01,-0.1,0.9,0.8,\0.75,0.935,-0.83,-0.68,-0.91,-0.76])\.reshape(6,2)第2步:创建数据集。这里创建一个包含6个数据点、每个数据点包括2个属性的数据集。[[-0.1-0.05][-0.01-0.1][0.90.8][0.750.935][-0.83-0.68][-0.91-0.76]]结果如下:高斯混合模型的应用实例clusterdata=sc.parallelize(nodes,2)model=GaussianMixture.train(clusterdata,3,convergenceTol=0.0001,maxIterations=50,seed=10)第3步:根据上面创建的数据集训练GMM聚类模型。训练GMM模型需要用到GaussianMixture.train()方法。下面的例子中initialModel参数没有指定,使用默认值。高斯混合模型的应用实例fornodeinnodes:print(str(node)+"belongstocluster:"+str(model.predict(node)))第4步:根据GaussianMixtureModel自带的属性及方法获得样本点对应的聚类值。[-0.1-0.05]belongstocluster:2[-0.01-0.1]belongstocluster:2[0.90.8]belongstocluster:0[0.750.935]belongstocluster:0[-0.83-0.68]belongstocluster:1[-0.91-0.76]belongstocluster:1结果如下:softPredicted=model.predictSoft([-0.1,-0.05])abs(softPredicted[0]-1.0)<0.03False结果如下:高斯混合模型的spark实现在SparkMLlib中,GMM主要通过GaussianMixture类实现。此类对GMM执行期望最大化。高斯混合模型表示一个独立的高斯分布的综合分布,其相关的“混合”权重指定每个人对综合的贡献。给定一组样本点,这个类将最大化k个高斯混合体的对数可能性,迭代直到对数可能性的变化小于convergenceTol,或者直到达到迭代次数的最大值。虽然这个过程一般保证收敛,但不能保证找到全局最优解。deftrain(cls,rdd,k,convergenceTol=1e-3,maxIterations=100,seed=None,initialModel=None):initialModelWeights=None#权重winitialModelMu=None#均值向量μ
initialModelSigma=None#协方差矩阵Σ
ifinitialModelisnotNone:ifinitialModel.k!=k:raiseValueError("Mismatchedclustercount,initialModel.k=%s,howeverk=%s"%(initialModel.k,k))initialModelWeights=list(initialModel.weights)initialModelMu=[initialModel.gaussians[i].muforiinrange(initialModel.k)]initialModelSigma=[initialModel.gaussians[i].sigmaforiinrange(initialModel.k)]java_model=callMLlibFunc("trainGaussianMixtureModel",rdd.map(_convert_to_vector),k,convergenceTol,maxIterations,seed,initialModelWeights,initialModelMu,initialModelSigma)#调用APIreturnGaussianMixtureModel(java_model)高斯混合模型的spark实现#trainGaussianMixtureModelAPI,返回一个包含每个多元混合的权重、均值和#协方差的列表(list)deftrainGaussianMixtureModel(data:JavaRDD[Vector],k:Int,convergenceTol:Double,maxIterations:Int,seed:java.lang.Long,initialModelWeights:java.util.ArrayList[Double],initialModelMu:java.util.ArrayList[Vector],initialModelSigma:java.util.ArrayList[Matrix]):GaussianMixtureModelWrapper={#创建一个GaussianMixture类
valgmmAlg=newGaussianMixture().setK(k).setConvergenceTol(convergenceTol).setMaxIterations(maxIterations)高斯混合模型的spark实现if(initialModelWeights!=null&&initialModelMu!=null&&initialModelSigma!=null){valgaussians=initialModelMu.asScala.toSeq.zip(initialModelSigma.asScala.toSeq).map{case(x,y)=>newMultivariateGaussian(x,y)}valinitialModel=newGaussianMixtureModel(initialModelWeights.asScala.toArray,gaussians.toArray)gmmAlg.setInitialModel(initialModel)}if(seed!=null)gmmAlg.setSeed(seed)
#调用GaussianMixture类中的run()方法和GaussianMixtureModelWrapper#APInewGaussianMixtureModelWrapper(gmmAlg.run(data.rdd))}高斯混合模型的spark实现MLlib中实现GMM也是通过GaussianMixture类中的run()方法实现的。defrun(data:RDD[Vector]):GaussianMixtureModel={valsc=data.sparkContext#使用spark的线性代数库BreezevalbreezeData=data.map(_.asBreeze).cache()#获得输入向量的长度
vald=breezeData.first().lengthrequire(d<GaussianMixture.MAX_NUM_FEATURES,s"GaussianMixturecannothandlemore"+s"than${GaussianMixture.MAX_NUM_FEATURES}featuresbecausethesizeofthecovariance"+s"matrixisquadraticinthenumberoffeatures.")valshouldDistributeGaussians=GaussianMixture.shouldDistributeGaussians(k,d)高斯混合模型的spark实现
#确定初始权重和相应的高斯函数
#如果用户提供了初始GMM,则使用这些值
#否则,我们从统一的权重开始、来自数据的随机均值和对角协方差矩阵#使用来自样本的分量方差
val(weights,gaussians)=initialModelmatch{caseSome(gmm)=>(gmm.weights,gmm.gaussians)caseNone=>valsamples=breezeData.takeSample(withReplacement=true,k*nSamples,seed)(Array.fill(k)(1.0/k),Array.tabulate(k){i=>valslice=samples.view.slice(i*nSamples,(i+1)*nSamples)newMultivariateGaussian(vectorMean(slice.toSeq),initCovariance(slice.toSeq))})}varllh=Double.MinValue#currentlog-likelihoodvarllhp=0.0#previouslog-likelihood高斯混合模型的spark实现variter=0while(iter<maxIterations&&math.abs(llh-llhp)>convergenceTol){#创建并广播柯里化集群贡献函数
valcompute=sc.broadcast(ExpectationSum.add(weights,gaussians)_)#将所有样本点的集群贡献聚合起来
valsums=breezeData.treeAggregate(ExpectationSum.zero(k,d))(compute.value,_+=_)#根据部分赋值创建新的分布(文献中通常称为“M-步”)
valsumWeights=sums.weights.sum高斯混合模型的spark实现if(shouldDistributeGaussians){valnumPartitions=math.min(k,1024)valtuples=Seq.tabulate(k)(i=>(sums.means(i),sums.sigmas(i),sums.weights(i)))val(ws,gs)=sc.parallelize(tuples,numPartitions).map{case(mean,sigma,weight)=>updateWeightsAndGaussians(mean,sigma,weight,sumWeights)}.collect().unzipArray.copy(ws,0,weights,0,ws.length)Array.copy(gs,0,gaussians,0,gs.length)}else{vari=0while(i<k){val(weight,gaussian)=updateWeightsAndGaussians(sums.means(i),sums.sigmas(i),sums.weights(i),sumWeights)weights(i)=weightgaussians(i)=gaussiani=i+1}}高斯混合模型的spark实现llhp=llh#当前值变成以前的值
llh=sums.logLikelihood#最新算得的对数似然比
iter+=1compute.destroy()}breezeData.unpersist()newGaussianMixtureModel(weights,gaussians)}高斯混合模型的spark实现#更新权重和高斯函数privatedefupdateWeightsAndGaussians(mean:BDV[Double],sigma:BreezeMatrix[Double],weight:Double,sumWeights:Double):(Double,MultivariateGaussian)={valmu=(mean/=weight)BLAS.syr(-weight,Vectors.fromBreeze(mu),Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix])valnewWeight=weight/sumWeightsvalnewGaussian=newMultivariateGaussian(mu,sigma/weight)(newWeight,newGaussian)}高斯混合模型的spark实现GaussianMixtureModelWrapperAPI是为GaussianMixtureModel提供辅助方法的包装器。private[python]classGaussianMixtureModelWrapper(model:GaussianMixtureModel){valweights:Vector=Vectors.dense(model.weights)valk:Int=weights.size#以向量列表和对应于每个多元高斯分布的矩阵的形式返回高斯分布
valgaussians:Array[Byte]={valmodelGaussians=model.gaussians.map{gaussian=>Array[Any](gaussian.mu,gaussian.sigma)}SerDe.dumps(JavaConverters.seqAsJavaListConverter(modelGaussians).asJava)}defpredictSoft(point:Vector):Vector={Vectors.dense(model.predictSoft(point))}defsave(sc:SparkContext,path:String):Unit=model.save(sc,path)}幂迭代聚类
幂迭代聚类的基本步骤开始求得一个按行归一化的关联矩阵W,设定期望聚类数k随机初始化一个向量v(0)≠0
结束否是
幂迭代聚类的应用实例importmathfrompysparkimportSparkContextfrompyspark.mllib.clusteringimportPowerIterationClustering,PowerIterationClusteringModelsc=SparkContext(appName="PIC_pyspark",master='local')第1步:引入必要的类。使用pyspark时需要引入SparkContext类,SparkContext是spark功能的入口点,在引入类后需要定义sc=SparkContext(appName=“KMeans_pyspark”,master=‘local’),否则运行程序时可能会报错。PIC算法需要从pyspark.mllib.clustering中引入PowerIterationClustering类和PowerIterationClusteringModel类。幂迭代聚类的应用实例#产生一个分布在圆形上的数据模型,参数分别为圆的半径和的个数defgenCircle(r,n):points=[]foriinrange(0,n):theta=2.0*math.pi*i/npoints.append((r*math.cos(theta),r*math.sin(theta)))returnpoints#求高斯相似度defsim(x,y):dist2=(x[0]-y[0])*(x[0]-y[0])+(x[1]-y[1])*(x[1]-y[1])returnmath.exp(-dist2/2.0)第2步:创建数据集。幂迭代聚类的应用实例#产生同心圆样的RDD数据模型:内圈的圆半径为1.0,有10个点;外圈的圆半径为4.0,有40个点r1=1.0n1=10r2=4.0n2=40n=n1+n2points=genCircle(r1,n1)+genCircle(r2,n2)similarities=[(i,j,sim(points[i],points[j]))foriinrange(1,n)forjinrange(0,i)]rdd=sc.parallelize(similarities,2)第2步:创建数据集。幂迭代聚类的应用实例model=PowerIterationClustering.train(rdd,2,40)第3步:根据上面创建的数据集训练PIC聚类模型。训练PIC模型需要用到PowerIterationClustering.train()方法。下面的示例中initModel未指定,使用默认值。幂迭代聚类的应用实例result=sorted(model.assignments().collect(),key=lambdax:x.id)ifresult[0].cluster==result[3].cluster:print("True")print("共有"+str(model.k)+"个聚类")第4步:根据PowerIterationClusteringModel自带的属性及方法获得样本点对应的聚类分配。True共有2个聚类结果如下:幂迭代聚类的spark实现在MLlib中,PIC也是通过PowerIterationClustering类中的train()方法实现的。deftrain(cls,rdd,k,maxIterations=100,initMode="random"):model=callMLlibFunc("trainPowerIterationClusteringModel",rdd.map(_convert_to_vector),int(k),int(maxIterations),initMode)returnPowerIterationClusteringModel(model)幂迭代聚类的spark实现train()方法调用了trainPowerIterationClusteringModelAPI,返回PowerIterationClusteringModel。deftrainPowerIterationClusteringModel(data:JavaRDD[Vector],k:Int,maxIterations:Int,initMode:String):PowerIterationClusteringModel={valpic=newPowerIterationClustering().setK(k).setMaxIterations(maxIterations).setInitializationMode(initMode)valmodel=pic.run(data.rdd.map(v=>(v(0).toLong,v(1).toLong,v(2))))newPowerIterationClusteringModelWrapper(model)}幂迭代聚类的spark实现trainPowerIterationClusteringAPI调用scala的PowerIterationClustering类中的run()方法。defrun(similarities:RDD[(Long,Long,Double)]):PowerIterationClusteringModel={valw=normalize(similarities)#规范化相似度矩阵
valw0=initModematch{case"random"=>randomInit(w)#生成随机顶点属性
case"degree"=>initDegreeVector(w)#生成度向量
}
w.unpersist()#图w0已经在randomInit/initDegreeVector中物化,所以我们可以不用释放wpic(w0)}幂迭代聚类的spark实现#通过行的和对亲和矩阵(A)进行规范化,并返回规范化亲和矩阵(W)private[clustering]defnormalize(similarities:RDD[(Long,Long,Double)]):Graph[Double,Double]={valedges=similarities.flatMap{case(i,j,s)=>if(s<0.0){thrownewSparkException(s"Similaritymustbenonnegativebutfounds($i,$j)=$s.")}if(i!=j){Seq(Edge(i,j,s),Edge(j,i,s))}else{None}}幂迭代聚类的spark实现valgA=Graph.fromEdges(edges,0.0)valvD=gA.aggregateMessages[Double](sendMsg=ctx=>{ctx.sendToSrc(ctx.attr)},mergeMsg=_+_,TripletFields.EdgeOnly)valgraph=Graph(vD,gA.edges).mapTriplets(e=>e.attr/math.max(e.srcAttr,MLUtils.EPSILON),newTripletFields(/*useSrc*/true,/*useDst*/false,/*useEdge*/true))materialize(graph)gA.unpersist()graph}幂迭代聚类的spark实现#运行PIC算法,其中参数w是归一化亲和矩阵,即PIC文中的矩阵w,以#w,,ij,,=a,,ij,,/d,,ii,,作为它的边性质和幂迭代的初始向量作为它的顶点性质。privatedefpic(w:Graph[Double,Double]):PowerIterationClusteringModel={valv=powerIter(w,maxIterations)valassignments=kMeans(v,k).map{case(id,cluster)=>Assignment(id,cluster)}newPowerIterationClusteringModel(k,assignments)}}幂迭代聚类的spark实现#生成随机顶点属性(v0)以开始幂迭代。private[clustering]defrandomInit(g:Graph[Double,Double]):Graph
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2025年云南经贸外事职业学院高职单招职业技能测试近5年常考版参考题库含答案解析
- 2025年义乌工商职业技术学院高职单招数学历年(2016-2024)频考点试题含答案解析
- 2025年上海出版印刷高等专科学校高职单招高职单招英语2016-2024历年频考点试题含答案解析
- 维护服务碳排放分析-洞察分析
- 二零二五年度环保厨房设施采购及安装合同4篇
- 四年级数学(四则混合运算)计算题专项练习与答案汇编
- 2025年中国VE(维生素E)行业竞争格局分析及投资规划研究报告
- 五年级数学(小数四则混合运算)计算题专项练习及答案
- 四年级数学(小数加减运算)计算题专项练习与答案汇编
- 2025年马拉硫磷原药项目可行性研究报告
- 课题申报书:GenAI赋能新质人才培养的生成式学习设计研究
- 骆驼祥子-(一)-剧本
- 全国医院数量统计
- 2024年医美行业社媒平台人群趋势洞察报告-医美行业观察星秀传媒
- 电工(中级工)理论知识练习题(附参考答案)
- 工业设计概论试题
- 2024-2030年中国商务服务行业市场现状调查及投资前景研判报告
- 高一英语必修一试卷(含答案)(适合测试)
- 中国的世界遗产智慧树知到期末考试答案2024年
- 中国绿色食品市场调查与分析报告
- 手卫生依从性调查表
评论
0/150
提交评论