基本统计课件_第1页
基本统计课件_第2页
基本统计课件_第3页
基本统计课件_第4页
基本统计课件_第5页
已阅读5页,还剩89页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第5章基本统计第5章基本统计15.1相关性计算两个数据系列之间的相关性是统计学中的常见操作。用spark.ml可灵活地计算多个系列两两之间的相关性。目前Spark支持的相关方法是Pearson方法和Spearman方法。Correlation使用指定的方法计算输入数据集的相关矩阵。输出将是一个DataFrame,它包含向量列的相关矩阵。【例5-1】Correlation的PythonAPI代码。from__future__importprint_function#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportCorrelation#$exampleoff$5.1相关性计算两个数据系列之间的相关性是统计学中的常见25.1相关性frompyspark.sqlimportSparkSessionif__name__=="__main__":spark=SparkSession\.builder\.appName("CorrelationExample")\.getOrCreate()#$exampleon$data=[(Vectors.sparse(4,[(0,1.0),(3,-2.0)]),),(Vectors.dense([4.0,5.0,0.0,3.0]),),(Vectors.dense([6.0,7.0,0.0,8.0]),),5.1相关性frompyspark.sqlimpor35.1相关性(Vectors.sparse(4,[(0,9.0),(3,1.0)]),)]df=spark.createDataFrame(data,["features"])r1=Correlation.corr(df,"features").head()print("Pearsoncorrelationmatrix:\n"+str(r1[0]))r2=Correlation.corr(df,"features","spearman").head()print("Spearmancorrelationmatrix:\n"+str(r2[0]))#$exampleoff$spark.stop()5.1相关性(Vectors.sparse(4,[(045.1相关性5.1相关性55.2假设检验假设检验是统计学中一种强有力的工具,用于确定结果是否具有统计显著性,无论该结果是否偶然发生。spark.ml目前支持Pearson的卡方(χ2)独立性测试。ChiSquareTest针对标签的每个特征进行Pearson独立测试。对于每个特征,(特征,标签)对被转换为列联矩阵,对其计算卡方统计量。所有标签和特征值必须是可分类的。【例5-2】Pearson卡方独立性测试的PythonAPI代码。5.2假设检验假设检验是统计学中一种强有力的工具,用于65.2假设检验from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportChiSquareTest#$exampleoff$if__name__=="__main__":spark=SparkSession\5.2假设检验from__future__impo75.2假设检验builder\.appName("ChiSquareTestExample")\.getOrCreate()#$exampleon$data=[(0.0,Vectors.dense(0.5,10.0)),(0.0,Vectors.dense(1.5,20.0)),(1.0,Vectors.dense(1.5,30.0)),(0.0,Vectors.dense(3.5,30.0)),(0.0,Vectors.dense(3.5,40.0)),(1.0,Vectors.dense(3.5,40.0))]5.2假设检验builder\85.2假设检验df=spark.createDataFrame(data,["label","features"])r=ChiSquareTest.test(df,"features","label").head()print("pValues:"+str(r.pValues))print("degreesOfFreedom:"+str(r.degreesOfFreedom))print("statistics:"+str(r.statistics))#$exampleoff$spark.stop()5.2假设检验df=spark.createDa95.2假设检验将上述代码保存成Pearsonx.py,然后用命令spark-submitPearsonx.py运行,结果如图5-2所示。

5.2假设检验将上述代码保存成Pearsonx.py105.3累积器通过Summarizer为Dataframe提供向量列摘要统计。可用指标是列的 大值、 小值、平均值、方差、非零数及总计数。【例5-3】Summarizer的PythonAPI代码。from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.statimportSummarizerfrompyspark.sqlimportRowfrompyspark.ml.linalgimportVectors5.3累积器通过Summarizer为Datafr115.3累积器#$exampleoff$if__name__=="__main__":spark=SparkSession\.builder\.appName("SummarizerExample")\.getOrCreate()sc=spark.sparkContext#$exampleon$5.3累积器#$exampleoff$125.3累积器df=sc.parallelize([Row(weight=1.0,features=Vectors.dense(1.0,1.0,1.0)),Row(weight=0.0,features=Vectors.dense(1.0,2.0,3.0))]).toDF()#createsummarizerformultiplemetrics"mean"and"count"summarizer=Summarizer.metrics("mean","count")#computestatisticsformultiplemetricswithweightdf.select(summarizer.summary(df.features,df.weight)).show(truncate=False)5.3累积器df=sc.parallelize([R135.3累积器#computestatisticsformultiplemetricswithoutweightdf.select(summarizer.summary(df.features)).show(truncate=False)#computestatisticsforsinglemetric"mean"withweightdf.select(Summarizer.mean(df.features,df.weight)).show(truncate=False)#computestatisticsforsinglemetric"mean"withoutweightdf.select(Summarizer.mean(df.features)).show(truncate=False)#$exampleoff$spark.stop()5.3累积器#computestatisticsf145.3累积器将上述代码保存成Summarizerx.py,然后用命令spark-submitSummarizerx.py运行,结果如图5-3所示。5.3累积器将上述代码保存成Summarizerx.p155.4摘要统计【例5-4】通过函数colStats为RDD[Vector]提供列摘要统计信息,colStats()返回一个MultivariateStatisticalSummary实例。importnumpyasnpfrompyspark.mllib.statimportStatisticsmat=sc.parallelize([np.array([1.0,10.0,100.0]),np.array([2.0,20.0,200.0]),np.array([3.0,30.0,300.0])])#anRDDofVectors#Computecolumnsummarystatistics.summary=Statistics.colStats(mat)5.4摘要统计【例5-4】通过函数colStats165.4摘要统计print(summary.mean())#adensevectorcontainingthemeanvalueforeachcolumnprint(summary.variance())#column-wisevarianceprint(summary.numNonzeros())#numberofnonzerosineachcolumn将上述代码保存成RDDVectorx.py,然后用命令spark-submitRDDVectorx.py运行,结果如图5-4所示。5.4摘要统计print(summary.mean()175.4摘要统计5.4摘要统计185.4摘要统计与驻留在spark.mllib中的其他统计函数不同,可以对RDD的键值对执行分层抽样方法sampleByKey和sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、女人或文档ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact比sampleByKey中使用的每层简单随机抽样需要更多的资源,但是会提供99.99%置信度的精确抽样大小。5.4摘要统计与驻留在spark.mllib中的其195.4摘要统计【例5-5】分层抽样。sampleByKey()允许用户近似采样。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])

5.4摘要统计【例5-5】分层抽样。sampleBy205.4摘要统计#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.4摘要统计#specifytheexact215.4摘要统计将上述代码保存成SparkContext.py,然后用命令spark-submitSparkContext.py运行,结果如图5-5所示。5.4摘要统计将上述代码保存成SparkContex225.5分层抽样与驻留在spark.mllib中的其他统计函数不同,可以对RDD的键值对执行分层抽样方法sampleByKey和sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、女人或文档ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact比sampleByKey中使用的每层简单随机抽样需要更多的资源,但是会提供99.99%置信度的精确抽样大小。5.5分层抽样与驻留在spark.mllib中的其235.5分层抽样【例5-5】分层抽样。sampleByKey()允许用户近似采样。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])

5.5分层抽样【例5-5】分层抽样。sampleBy245.5分层抽样#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.5分层抽样#specifytheexact255.5分层抽样将上述代码保存成SparkContext.py,然后用命令spark-submitSparkContext.py运行,结果如图5-5所示。5.5分层抽样将上述代码保存成SparkContex265.6流数据显著性检验Spark提供一些检验的在线实现,以支持A/B检验等用例。这些检验可以在SparkStreamingDStream[(Boolean,Double)]上执行,其中每个元组的第一个元素表示对照组(false)或实验组(true),第二个元素是观测值。流显著性检验支持以下参数:peacePeriod:要忽略的流数据中,初始数据点的数量,用于减轻新异效应。windowSize:执行假设检验的先前批次数。设置为0将使用所有先前批次执行累积处理。5.6流数据显著性检验Spark提供一些检验的在线实275.6流数据显著性检验【例5-6】StreamingTest提供的流数据假设检验。importorg.apache.spark.SparkConfimportorg.apache.spark.mllib.stat.test.{BinarySample,StreamingTest}importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.util.Utils5.6流数据显著性检验【例5-6】Streaming285.6流数据显著性检验objectStreamingTestExample{defmain(args:Array[String]){if(args.length!=3){//scalastyle:offprintlnSystem.err.println("Usage:StreamingTestExample"+"<dataDir><batchDuration><numBatchesTimeout>")5.6流数据显著性检验objectStreaming295.6流数据显著性检验//scalastyle:onprintlnSystem.exit(1)}valdataDir=args(0)valbatchDuration=Seconds(args(1).toLong)valnumBatchesTimeout=args(2).toInt5.6流数据显著性检验//scalastyle:305.6流数据显著性检验valconf=newSparkConf().setMaster("local").setAppName("StreamingTestExample")valssc=newStreamingContext(conf,batchDuration)ssc.checkpoint{valdir=Utils.createTempDir()dir.toString}5.6流数据显著性检验valconf=new315.6流数据显著性检验//$exampleon$valdata=ssc.textFileStream(dataDir).map(line=>line.split(",")match{caseArray(label,value)=>BinarySample(label.toBoolean,value.toDouble)})valstreamingTest=newStreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod("welch")5.6流数据显著性检验//$exampleon$325.6流数据显著性检验valout=streamingTest.registerStream(data)out.print()//$exampleoff$//StopprocessingiftestbecomessignificantorwetimeoutvartimeoutCounter=numBatchesTimeoutout.foreachRDD{rdd=>timeoutCounter-=1valanySignificant=rdd.map(_.pValue<0.05).fold(false)(_||_)if(timeoutCounter==0||anySignificant)rdd.context.stop()}ssc.start()ssc.awaitTermination()}}5.6流数据显著性检验valout=stream335.6流数据显著性检验将上述代码保存成StreamingTestExample.scala,然后使用命令spark-submit--classorg.apache.spark.examples.mllib.StreamingTestExamplespark-examples_2.11-2.4.0.jar运行,结果如图5-6所示。

5.6流数据显著性检验将上述代码保存成Streami345.6流数据显著性检验5.6流数据显著性检验355.7随机数据生成随机数据生成对于随机化算法、原型设计和性能测试都非常有用。Spark支持从给定分布(均匀分布、标准正态分布或泊松分布)抽取id值,并生成对应的随机RDD。【例5-7】生成随机双RDD,其值遵循标准正态分布N(0,1),然后将其映射到N(1,4)。RandomRDDs提供相应方法来生成随机双RDD或向量RDD。5.7随机数据生成随机数据生成对于随机化算法、原型设计365.7随机数据生成frompyspark.mllib.randomimportRandomRDDsfrompysparkimportSparkContext,SparkConfsc=SparkContext(appName="PythonRandomnumberGeneration")#sc=...#SparkContext#GeneratearandomdoubleRDDthatcontains1millioni.i.d.valuesdrawnfromthe#standardnormaldistribution`N(0,1)`,evenlydistributedin10partitions.5.7随机数据生成frompyspark.mllib375.7随机数据生成u=RandomRDDs.normalRDD(sc,1000000,10)#ApplyatransformtogetarandomdoubleRDDfollowing`N(1,4)`.v=u.map(lambdax:1.0+2.0*x)print(v)将上述代码保存成RandomRDDs.py,然后用命令spark-submitRandomRDDs.py运行,结果如图5-7所示。5.7随机数据生成u=RandomRDDs.nor385.7随机数据生成5.7随机数据生成395.8核密度估计核密度估计是一种可用于可视化经验概率分布的技术,而无须对观察到样本的特定分布进行假设。它可以用来计算随机变量概率密度函数的估计值,在给定的一组点处进行评估。通过将特定点的经验分布PDF,表示为以每个样本为中心的正态分布PDF的均值来实现该估计。【例5-8】用KernelDensity从样本的RDD计算核密度估计。frompysparkimportSparkContext#$exampleon$frompyspark.mllib.statimportKernelDensity5.8核密度估计核密度估计是一种可用于可视化经验概率分405.8核密度估计#$exampleoff$if__name__=="__main__":sc=SparkContext(appName="KernelDensityEstimationExample")#SparkContext#$exampleon$#anRDDofsampledatadata=sc.parallelize([1.0,1.0,1.0,2.0,3.0,4.0,5.0,5.0,6.0,7.0,8.0,9.0,9.0])#ConstructthedensityestimatorwiththesampledataandastandarddeviationfortheGaussian5.8核密度估计#$exampleoff$if415.8核密度估计#kernelskd=KernelDensity()kd.setSample(data)kd.setBandwidth(3.0)#Finddensityestimatesforthegivenvaluesdensities=kd.estimate([-1.0,2.0,5.0])#$exampleoff$print(densities)sc.stop()将上述代码保存成KernelDensity.py,然后用命令spark-submitKernelDensity.py运行,结果如图5-8所示。5.8核密度估计#kernels425.8核密度估计5.8核密度估计43习题1. 用PySpark编程生成随机双RDD,其值遵循标准正态分布N(0,1),并将其映射到N(1,5)。2. 通过sampleByKey()进行近似分层采样编程。3. 用PySpark编程实现Pearson的卡方(χ2)统计。习题1. 用PySpark编程生成随机双RDD,44第5章基本统计课件45第5章基本统计课件46第5章基本统计课件47第5章基本统计第5章基本统计485.1相关性计算两个数据系列之间的相关性是统计学中的常见操作。用spark.ml可灵活地计算多个系列两两之间的相关性。目前Spark支持的相关方法是Pearson方法和Spearman方法。Correlation使用指定的方法计算输入数据集的相关矩阵。输出将是一个DataFrame,它包含向量列的相关矩阵。【例5-1】Correlation的PythonAPI代码。from__future__importprint_function#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportCorrelation#$exampleoff$5.1相关性计算两个数据系列之间的相关性是统计学中的常见495.1相关性frompyspark.sqlimportSparkSessionif__name__=="__main__":spark=SparkSession\.builder\.appName("CorrelationExample")\.getOrCreate()#$exampleon$data=[(Vectors.sparse(4,[(0,1.0),(3,-2.0)]),),(Vectors.dense([4.0,5.0,0.0,3.0]),),(Vectors.dense([6.0,7.0,0.0,8.0]),),5.1相关性frompyspark.sqlimpor505.1相关性(Vectors.sparse(4,[(0,9.0),(3,1.0)]),)]df=spark.createDataFrame(data,["features"])r1=Correlation.corr(df,"features").head()print("Pearsoncorrelationmatrix:\n"+str(r1[0]))r2=Correlation.corr(df,"features","spearman").head()print("Spearmancorrelationmatrix:\n"+str(r2[0]))#$exampleoff$spark.stop()5.1相关性(Vectors.sparse(4,[(0515.1相关性5.1相关性525.2假设检验假设检验是统计学中一种强有力的工具,用于确定结果是否具有统计显著性,无论该结果是否偶然发生。spark.ml目前支持Pearson的卡方(χ2)独立性测试。ChiSquareTest针对标签的每个特征进行Pearson独立测试。对于每个特征,(特征,标签)对被转换为列联矩阵,对其计算卡方统计量。所有标签和特征值必须是可分类的。【例5-2】Pearson卡方独立性测试的PythonAPI代码。5.2假设检验假设检验是统计学中一种强有力的工具,用于535.2假设检验from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.linalgimportVectorsfrompyspark.ml.statimportChiSquareTest#$exampleoff$if__name__=="__main__":spark=SparkSession\5.2假设检验from__future__impo545.2假设检验builder\.appName("ChiSquareTestExample")\.getOrCreate()#$exampleon$data=[(0.0,Vectors.dense(0.5,10.0)),(0.0,Vectors.dense(1.5,20.0)),(1.0,Vectors.dense(1.5,30.0)),(0.0,Vectors.dense(3.5,30.0)),(0.0,Vectors.dense(3.5,40.0)),(1.0,Vectors.dense(3.5,40.0))]5.2假设检验builder\555.2假设检验df=spark.createDataFrame(data,["label","features"])r=ChiSquareTest.test(df,"features","label").head()print("pValues:"+str(r.pValues))print("degreesOfFreedom:"+str(r.degreesOfFreedom))print("statistics:"+str(r.statistics))#$exampleoff$spark.stop()5.2假设检验df=spark.createDa565.2假设检验将上述代码保存成Pearsonx.py,然后用命令spark-submitPearsonx.py运行,结果如图5-2所示。

5.2假设检验将上述代码保存成Pearsonx.py575.3累积器通过Summarizer为Dataframe提供向量列摘要统计。可用指标是列的 大值、 小值、平均值、方差、非零数及总计数。【例5-3】Summarizer的PythonAPI代码。from__future__importprint_functionfrompyspark.sqlimportSparkSession#$exampleon$frompyspark.ml.statimportSummarizerfrompyspark.sqlimportRowfrompyspark.ml.linalgimportVectors5.3累积器通过Summarizer为Datafr585.3累积器#$exampleoff$if__name__=="__main__":spark=SparkSession\.builder\.appName("SummarizerExample")\.getOrCreate()sc=spark.sparkContext#$exampleon$5.3累积器#$exampleoff$595.3累积器df=sc.parallelize([Row(weight=1.0,features=Vectors.dense(1.0,1.0,1.0)),Row(weight=0.0,features=Vectors.dense(1.0,2.0,3.0))]).toDF()#createsummarizerformultiplemetrics"mean"and"count"summarizer=Summarizer.metrics("mean","count")#computestatisticsformultiplemetricswithweightdf.select(summarizer.summary(df.features,df.weight)).show(truncate=False)5.3累积器df=sc.parallelize([R605.3累积器#computestatisticsformultiplemetricswithoutweightdf.select(summarizer.summary(df.features)).show(truncate=False)#computestatisticsforsinglemetric"mean"withweightdf.select(Summarizer.mean(df.features,df.weight)).show(truncate=False)#computestatisticsforsinglemetric"mean"withoutweightdf.select(Summarizer.mean(df.features)).show(truncate=False)#$exampleoff$spark.stop()5.3累积器#computestatisticsf615.3累积器将上述代码保存成Summarizerx.py,然后用命令spark-submitSummarizerx.py运行,结果如图5-3所示。5.3累积器将上述代码保存成Summarizerx.p625.4摘要统计【例5-4】通过函数colStats为RDD[Vector]提供列摘要统计信息,colStats()返回一个MultivariateStatisticalSummary实例。importnumpyasnpfrompyspark.mllib.statimportStatisticsmat=sc.parallelize([np.array([1.0,10.0,100.0]),np.array([2.0,20.0,200.0]),np.array([3.0,30.0,300.0])])#anRDDofVectors#Computecolumnsummarystatistics.summary=Statistics.colStats(mat)5.4摘要统计【例5-4】通过函数colStats635.4摘要统计print(summary.mean())#adensevectorcontainingthemeanvalueforeachcolumnprint(summary.variance())#column-wisevarianceprint(summary.numNonzeros())#numberofnonzerosineachcolumn将上述代码保存成RDDVectorx.py,然后用命令spark-submitRDDVectorx.py运行,结果如图5-4所示。5.4摘要统计print(summary.mean()645.4摘要统计5.4摘要统计655.4摘要统计与驻留在spark.mllib中的其他统计函数不同,可以对RDD的键值对执行分层抽样方法sampleByKey和sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、女人或文档ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact比sampleByKey中使用的每层简单随机抽样需要更多的资源,但是会提供99.99%置信度的精确抽样大小。5.4摘要统计与驻留在spark.mllib中的其665.4摘要统计【例5-5】分层抽样。sampleByKey()允许用户近似采样。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])

5.4摘要统计【例5-5】分层抽样。sampleBy675.4摘要统计#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.4摘要统计#specifytheexact685.4摘要统计将上述代码保存成SparkContext.py,然后用命令spark-submitSparkContext.py运行,结果如图5-5所示。5.4摘要统计将上述代码保存成SparkContex695.5分层抽样与驻留在spark.mllib中的其他统计函数不同,可以对RDD的键值对执行分层抽样方法sampleByKey和sampleByKeyExact。对于分层抽样,可以将键视为标签,将值视为特定属性。例如,关键字可以是男人、女人或文档ID,并且相应的值可以是人的年龄列表或文档中的单词列表。sampleByKey方法将翻转硬币以决定是否对样本进行采样,因此需要对数据进行一次传递,并提供预期的样本大小。sampleByKeyExact比sampleByKey中使用的每层简单随机抽样需要更多的资源,但是会提供99.99%置信度的精确抽样大小。5.5分层抽样与驻留在spark.mllib中的其705.5分层抽样【例5-5】分层抽样。sampleByKey()允许用户近似采样。frompysparkimportSparkContextif__name__=="__main__":sc=SparkContext(appName="StratifiedSamplingExample")#SparkContext#$exampleon$#anRDDofanykeyvaluepairsdata=sc.parallelize([(1,'a'),(1,'b'),(2,'c'),(2,'d'),(2,'e'),(3,'f')])

5.5分层抽样【例5-5】分层抽样。sampleBy715.5分层抽样#specifytheexactfractiondesiredfromeachkeyasadictionaryfractions={1:0.1,2:0.6,3:0.3}approxSample=data.sampleByKey(False,fractions)#$exampleoff$foreachinapproxSample.collect():print(each)sc.stop()5.5分层抽样#specifytheexact725.5分层抽样将上述代码保存成SparkContext.py,然后用命令spark-submitSparkContext.py运行,结果如图5-5所示。5.5分层抽样将上述代码保存成SparkContex735.6流数据显著性检验Spark提供一些检验的在线实现,以支持A/B检验等用例。这些检验可以在SparkStreamingDStream[(Boolean,Double)]上执行,其中每个元组的第一个元素表示对照组(false)或实验组(true),第二个元素是观测值。流显著性检验支持以下参数:peacePeriod:要忽略的流数据中,初始数据点的数量,用于减轻新异效应。windowSize:执行假设检验的先前批次数。设置为0将使用所有先前批次执行累积处理。5.6流数据显著性检验Spark提供一些检验的在线实745.6流数据显著性检验【例5-6】StreamingTest提供的流数据假设检验。importorg.apache.spark.SparkConfimportorg.apache.spark.mllib.stat.test.{BinarySample,StreamingTest}importorg.apache.spark.streaming.{Seconds,StreamingContext}importorg.apache.spark.util.Utils5.6流数据显著性检验【例5-6】Streaming755.6流数据显著性检验objectStreamingTestExample{defmain(args:Array[String]){if(args.length!=3){//scalastyle:offprintlnSystem.err.println("Usage:StreamingTestExample"+"<dataDir><batchDuration><numBatchesTimeout>")5.6流数据显著性检验objectStreaming765.6流数据显著性检验//scalastyle:onprintlnSystem.exit(1)}valdataDir=args(0)valbatchDuration=Seconds(args(1).toLong)valnumBatchesTimeout=args(2).toInt5.6流数据显著性检验//scalastyle:775.6流数据显著性检验valconf=newSparkConf().setMaster("local").setAppName("StreamingTestExample")valssc=newStreamingContext(conf,batchDuration)ssc.checkpoint{valdir=Utils.createTempDir()dir.toString}5.6流数据显著性检验valconf=new785.6流数据显著性检验//$exampleon$valdata=ssc.textFileStream(dataDir).map(line=>line.split(",")match{caseArray(label,value)=>BinarySample(label.toBoolean,value.toDouble)})valstreamingTest=newStreamingTest().setPeacePeriod(0).setWindowSize(0).setTestMethod("welch")5.6流数据显著性检验//$exampleon$795.6流数据显著性检验valout=streamingTest.registerStream(data)out.print()//$exampleoff$//StopprocessingiftestbecomessignificantorwetimeoutvartimeoutCounter=numBatchesTimeoutout.foreachRDD{rdd=>timeoutCounter-=1valanySignificant=rdd.map(_.pValue<0.05).fold(false)(_||_)if(timeoutCounter==0||anySignificant)rdd.context.stop()}ssc.start()ssc.awaitTermination()}}5.6流数据显著性检验valout=stream805.6流数据显著性检验将上述代码保存成StreamingTestExample.scala,然后使用命令spark-submit--classorg.apache.spark.examples.mllib.Stre

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论