Big Data Technology: Spark Fundamentals

Chapter 1: Spark Overview

1.1 What Is Spark
1. Definition: Spark is a fast, general-purpose, scalable big data analytics engine based on in-memory computing.
2. History: Spark was created in 2009 at UC Berkeley's AMPLab and is written in Scala. It was open-sourced in 2010, became an Apache incubator project in June 2013, and an Apache top-level project in February 2014.

1.2 Spark Built-in Modules
The Spark stack consists of Spark Core plus higher-level libraries: Spark SQL (structured data), Spark Streaming (real-time computation), Spark MLlib (machine learning), and Spark GraphX (graph computation). Spark Core runs on a cluster manager: the built-in standalone scheduler, YARN, or Mesos.

Spark Core: implements Spark's basic functionality, including task scheduling, memory management, fault recovery, and interaction with storage systems, and defines the API of the Resilient Distributed Dataset (RDD).
Spark SQL: the package Spark uses to work with structured data. With Spark SQL you can query data using SQL or the Apache Hive SQL dialect (HQL). Spark SQL supports many data sources, such as Hive tables, Parquet, and JSON.
Spark Streaming: Spark's component for stream processing of real-time data. It provides an API for manipulating data streams that closely mirrors the RDD API of Spark Core.
Spark MLlib: a library of common machine learning (ML) functionality, including classification, regression, clustering, and collaborative filtering, plus additional support such as model evaluation and data import.
Spark GraphX: Spark's component for graph computation.
Cluster managers: Spark is designed to scale efficiently from one compute node to thousands. To achieve this with maximum flexibility, Spark can run on a variety of cluster managers, including Hadoop YARN, Apache Mesos, and the simple scheduler that ships with Spark, called the standalone scheduler.

Spark is supported by many big data vendors, such as Hortonworks, IBM, Intel, Cloudera, MapR, and Pivotal, and production clusters of up to 8000 nodes have been reported.

1.3 Spark Features
Fast: compared with Hadoop MapReduce, Spark's in-memory computation is more than 100x faster, and its disk-based computation is still more than 10x faster. Spark implements an efficient DAG execution engine that processes data flows efficiently in memory; intermediate results are kept in memory.
Easy to use: Spark offers APIs for Java, Python, and Scala, plus more than 80 high-level operators, so users can build applications quickly. Spark also provides interactive Python and Scala shells, which make it very convenient to validate an approach against a Spark cluster.
General: Spark provides a unified solution. It can be used for batch processing, interactive queries (Spark SQL), real-time stream processing (Spark Streaming), machine learning (Spark MLlib), and graph computation (GraphX), and these different kinds of processing can be combined seamlessly in a single application, reducing development, maintenance, and deployment costs.
Compatible: Spark integrates easily with other open-source products. For example, it can use Hadoop YARN and Apache Mesos as its resource manager and scheduler, and it can process any data Hadoop supports, including HDFS and HBase. This matters for users who already run Hadoop clusters: they can use Spark's processing power without any data migration.

Chapter 2: Spark Running Modes

2.1 Spark Download Address
Spark releases can be downloaded from the official Apache Spark website (the downloads.html page); this course uses version 2.1.1, whose documentation lives under docs/2.1.1/.

2.2 Local Mode

2.2.1 Overview
Local mode runs Spark on a single machine and is normally used for local practice and testing. The master can be set in the following ways:
local: all computation runs in a single thread, with no parallelism; typically used to run small test or practice code on the local machine.
local[K]: run with K worker threads, e.g. local[4] runs 4 worker threads. You usually set K to the number of CPU cores to make full use of the CPU.
local[*]: sets the number of threads to the number of CPU cores automatically.

2.2.2 Installation and Usage
(1) Upload and extract the Spark tarball:
[atguigu@hadoop102 sorfware]$ tar -zxvf spark-2.1.1-bin-hadoop2.7.tgz -C /opt/module/
[atguigu@hadoop102 module]$ mv spark-2.1.1-bin-hadoop2.7 spark
(2) Run the official Pi example:
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
(3) Basic spark-submit syntax:
bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # other options
<application-jar> \
[application-arguments]
(4) Parameter descriptions:
--master: the master URL to connect to; defaults to local.
--class: the entry class of your application (e.g. org.apache.spark.examples.SparkPi).
--deploy-mode: whether to launch the driver on a worker node (cluster) or locally as a client (client); default: client.
--conf: an arbitrary Spark configuration property in key=value format; if the value contains spaces, quote it as "key=value".
application-jar: the packaged application jar including dependencies. The URL must be globally visible inside the cluster, for example an hdfs:// path on shared storage; with file://path, every node must have the same jar at that path.
application-arguments: arguments passed to the main() method.
--executor-memory 1G: give each executor 1 GB of memory.
--total-executor-cores 2: give the executors 2 CPU cores in total.
(5) The Pi example estimates Pi with a Monte Carlo method.
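To make the Monte Carlo idea concrete, here is a minimal, self-contained sketch of how Pi can be estimated with Spark. It is not the source of the bundled SparkPi example; the object name MonteCarloPi and the number of sample points are illustrative choices.

import org.apache.spark.{SparkConf, SparkContext}
import scala.math.random

object MonteCarloPi {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MonteCarloPi")
    val sc = new SparkContext(conf)
    val n = 1000000                       // number of random points (illustrative)
    // Throw random points into the square [-1, 1] x [-1, 1]; the fraction that
    // lands inside the unit circle approximates Pi / 4.
    val inside = sc.parallelize(1 to n).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}

The bundled SparkPi example follows the same idea; the command-line argument (100 above) controls how many slices of random points are generated.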
(6) Prepare test data: create an input directory and put two files in it.
[atguigu@hadoop102 spark]$ mkdir input
Create 1.txt and 2.txt under input, each containing:
hello atguigu
hello spark
(7) Start spark-shell:
[atguigu@hadoop102 spark]$ bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/09/29 08:50:52 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/09/29 08:50:58 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
Spark context Web UI available at hadoop102:4040
Spark context available as 'sc' (master = local[*], app id = local-1538182253312).
Spark session available as 'spark'.
(Welcome banner: Spark version 2.1.1, Scala version 2.11.8, Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144.)
Type in expressions to have them evaluated. Type :help for more information.
scala>
Open another terminal window (e.g. a second CRT session) and check the processes:
[atguigu@hadoop102 spark]$ jps
3627 SparkSubmit
4047 Jps
You can open hadoop102:4040 in a browser to watch the application run.
(8) Run WordCount in spark-shell:
scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
scala>

2.2.3 Submission Flow
1) Analysis of task submission:
(Figure: simplified general Spark execution flow. The client submits the task; the Driver initializes sc and registers the application with the resource manager; the resource manager starts Executors; the Executors reverse-register with the Driver; the Driver then dispatches tasks such as textFile(), flatMap(), map(), ... to the Executors for execution.)

Driver: the Spark driver is the process that runs the main method of your program. It executes the user code that creates the SparkContext, creates RDDs, and performs RDD transformations and actions. If you use the Spark shell, a driver program is started for you in the background when the shell launches; it is the preloaded SparkContext object called sc. The driver is mainly responsible for:
1) turning the user program into jobs;
2) tracking the status of the Executors;
3) scheduling tasks onto executor nodes;
4) showing the running state of the application in the UI.

Executor: a Spark Executor is a worker process responsible for running the tasks of a Spark job; tasks are independent of each other. Executors are started when the Spark application starts and live for the whole lifetime of the application. If an Executor fails or crashes, the application can keep running: tasks on the failed node are rescheduled onto other Executors. Executors are mainly responsible for:
1) running the tasks that make up the Spark application and returning results to the driver;
2) providing in-memory storage, through their Block Manager, for RDDs the user program asks to cache. RDDs are cached directly inside the Executor processes.

2.2.4 Data Flow
textFile("input"): reads the data in the local input directory;
flatMap(_.split(" ")): flattens the data, splitting each line into words on spaces;
map((_, 1)): maps each word to a tuple (word, 1);
reduceByKey(_ + _): aggregates the values by key, adding them up.

WordCount case analysis: sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
(Figure: WordCount case analysis. textFile() reads 1.txt and 2.txt, each containing "hello atguigu" and "hello spark"; flatMap() splits the lines into words; map() turns each word into (word, 1); reduceByKey() sums the counts per key; collect() returns Array((hello,4), (atguigu,2), (spark,2)).)
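To relate the case analysis to runnable code, here is the same WordCount broken into named steps so each intermediate RDD can be inspected in spark-shell (where sc is already available); the inline comments show the values expected for the 1.txt/2.txt created above.

val lines  = sc.textFile("input")           // "hello atguigu", "hello spark", ... (one element per line)
val words  = lines.flatMap(_.split(" "))    // "hello", "atguigu", "hello", "spark", ...
val pairs  = words.map((_, 1))              // (hello,1), (atguigu,1), (hello,1), (spark,1), ...
val counts = pairs.reduceByKey(_ + _)       // (hello,4), (atguigu,2), (spark,2)
counts.collect().foreach(println)           // collect() is the action that triggers the computation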
2.3 Standalone Mode

2.3.1 Overview
Build a Spark cluster consisting of a Master plus Slaves (Workers), with Spark applications running on that cluster.
(Figure: Standalone run mode. 1. The client registers the application with the Master. 2. The Master requests resources and the Workers start ExecutorBackend processes. 3. The Executors report their status to the Master. 4. The Executors register with the SparkContext (Driver). 5. The Driver assigns tasks to the Executors. 6. The Executors report task status until the job finishes. 7. The application unregisters.)

2.3.2 Installation and Usage
(1) Enter the conf directory of the Spark installation and rename the configuration templates:
[atguigu@hadoop102 module]$ cd spark/conf/
[atguigu@hadoop102 conf]$ mv slaves.template slaves
[atguigu@hadoop102 conf]$ mv spark-env.sh.template spark-env.sh
(2) Edit the slaves file and add the worker nodes:
[atguigu@hadoop102 conf]$ vim slaves
hadoop102
hadoop103
hadoop104
(3) Edit spark-env.sh and add the following configuration:
[atguigu@hadoop102 conf]$ vim spark-env.sh
SPARK_MASTER_HOST=hadoop101
SPARK_MASTER_PORT=7077
(4) Distribute the spark directory to the other nodes:
[atguigu@hadoop102 module]$ xsync spark/
(5) Start the cluster and check the processes:
[atguigu@hadoop102 spark]$ sbin/start-all.sh
[atguigu@hadoop102 spark]$ util.sh
================ atguigu@hadoop102 ================
3330 Jps
3238 Worker
3163 Master
================ atguigu@hadoop103 ================
2966 Jps
2908 Worker
================ atguigu@hadoop104 ================
2978 Worker
3036 Jps
Check the web UI at hadoop102:8080.
Note: if you hit a "JAVA_HOME not set" exception, add the following to the spark-config.sh file in the sbin directory:
export JAVA_HOME=XXXX
(6) Run the official Pi example against the cluster:
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
(7) Start spark-shell against the cluster:
/opt/module/spark/bin/spark-shell \
--master spark://hadoop101:7077 \
--executor-memory 1g \
--total-executor-cores 2
Parameter: --master spark://hadoop102:7077 specifies the master of the cluster to connect to.
(8) Run WordCount:
scala> sc.textFile("input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res0: Array[(String, Int)] = Array((hadoop,6), (oozie,3), (spark,3), (hive,3), (atguigu,3), (hbase,6))
scala>

2.3.3 JobHistoryServer Configuration
(1) Rename spark-defaults.conf.template to spark-defaults.conf:
[atguigu@hadoop102 conf]$ mv spark-defaults.conf.template spark-defaults.conf
(2) Edit spark-defaults.conf to enable event logging:
[atguigu@hadoop102 conf]$ vi spark-defaults.conf
spark.eventLog.enabled true
spark.eventLog.dir     hdfs://hadoop102:9000/directory
Note: the directory on HDFS must exist in advance.
(3) Edit spark-env.sh and add the following configuration:
[atguigu@hadoop102 conf]$ vi spark-env.sh
export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080
-Dspark.history.retainedApplications=30
-Dspark.history.fs.logDirectory=hdfs://hadoop101:9000/directory"
Parameter descriptions:
spark.eventLog.dir: all information produced while an application runs is recorded under the path this property points to.
spark.history.ui.port=18080: the history server's web UI listens on port 18080.
spark.history.fs.logDirectory=hdfs://hadoop102:9000/directory: with this property set, you no longer need to specify the path explicitly when running start-history-server.sh; the Spark History Server page only shows information under this path.
spark.history.retainedApplications=30: the number of applications whose history is kept; once the limit is exceeded, the oldest application information is dropped. This is the number of applications held in memory, not the number shown on the page.
(4) Distribute the configuration files:
[atguigu@hadoop102 conf]$ xsync spark-defaults.conf
[atguigu@hadoop102 conf]$ xsync spark-env.sh
(5) Start the history server:
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
(6) Run a job again:
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop101:7077 \
--executor-memory 1G \
--total-executor-cores 2 \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
(7) View the job history at hadoop102:18080.
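As a side note, the two event-log properties above are ordinary per-application settings, so they can also be supplied when building the SparkConf (or with --conf on spark-submit) instead of editing spark-defaults.conf. A minimal sketch, assuming the same HDFS directory used in this document:

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("EventLogDemo")
  .set("spark.eventLog.enabled", "true")                        // same switch as in spark-defaults.conf
  .set("spark.eventLog.dir", "hdfs://hadoop102:9000/directory") // directory must already exist on HDFS
val sc = new SparkContext(conf)
// ... job code ...
sc.stop()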

2.3.4 HA Configuration
(0) Zookeeper must already be installed and started.
(Figure 1: HA architecture, with multiple Masters coordinated through Zookeeper.)
(1) Edit spark-env.sh on hadoop102:
[atguigu@hadoop102 conf]$ vi spark-env.sh
Comment out the following:
#SPARK_MASTER_HOST=hadoop102
#SPARK_MASTER_PORT=7077
Add the following:
export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop101,hadoop102,hadoop103
-Dspark.deploy.zookeeper.dir=/spark"
(2) Distribute the configuration file:
[atguigu@hadoop102 conf]$ xsync spark-env.sh
(3) Start all nodes on hadoop102:
[atguigu@hadoop102 spark]$ sbin/start-all.sh
(4) Start a standby master on hadoop103:
[atguigu@hadoop103 spark]$ sbin/start-master.sh
(5) Start spark-shell against the HA cluster:
/opt/module/spark/bin/spark-shell \
--master spark://hadoop101:7077,hadoop102:7077 \
--executor-memory 2g \
--total-executor-cores 2

2.4 Yarn Mode

2.4.1 Overview
In Yarn mode the Spark client connects directly to Yarn; no extra Spark cluster needs to be built. There are two sub-modes, yarn-client and yarn-cluster, and the main difference is where the Driver runs:
yarn-client: the Driver runs on the client; suitable for interactive work and debugging, where you want to see the application's output immediately.
yarn-cluster: the Driver runs inside the ApplicationMaster started by the ResourceManager; suitable for production.
(Figure: Yarn run mode. 1. The client submits the application to the ResourceManager. 2. The RM picks a NodeManager to start the ApplicationMaster, and the AM starts the Driver, i.e. initializes sc. 3. The AM starts Executors and assigns tasks. 4. Application status is reported until the job finishes. 5. The application unregisters.)

2.4.2 Installation and Usage
(1) Edit Hadoop's yarn-site.xml:
[atguigu@hadoop102 hadoop]$ vi yarn-site.xml
<!-- Whether to check the amount of physical memory each task uses and kill tasks that exceed their allocation; default is true -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>
<!-- Whether to check the amount of virtual memory each task uses and kill tasks that exceed their allocation; default is true -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
(2) Edit spark-env.sh and add the Yarn configuration directory:
[atguigu@hadoop102 conf]$ vi spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop-2.7.2/etc/hadoop
(3) Distribute the configuration files:
[atguigu@hadoop102 conf]$ xsync /opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml
[atguigu@hadoop102 conf]$ xsync spark-env.sh
(4) Submit a job:
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100
Note: HDFS and the YARN cluster must be started before submitting a job.

2.4.3 Log Viewing
(1) Edit spark-defaults.conf and add the following:
spark.yarn.historyServer.address=hadoop101:18080
spark.history.ui.port=4000
(2) Restart the Spark history server:
[atguigu@hadoop102 spark]$ sbin/stop-history-server.sh
stopping org.apache.spark.deploy.history.HistoryServer
[atguigu@hadoop102 spark]$ sbin/start-history-server.sh
starting org.apache.spark.deploy.history.HistoryServer, logging to /opt/module/spark/logs/spark-atguigu-org.apache.spark.deploy.history.HistoryServer-1-hadoop102.out
(3) Submit a job:
[atguigu@hadoop102 spark]$ bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.11-2.1.1.jar \
100

2.5 Mesos Mode
In Mesos mode the Spark client connects directly to Mesos, and no extra Spark cluster is needed. This mode is rarely used in practice; Yarn is far more commonly used for scheduling.

2.6 Comparison of the Modes

Mode        Machines with Spark installed   Processes to start    Owned by
Local       1                               none                  Spark
Standalone  3                               Master and Worker     Spark
Yarn        1                               Yarn and HDFS         Hadoop
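One practical consequence of the comparison above: the same application jar runs unchanged in every mode, because the master is supplied by spark-submit (--master local[*], spark://..., yarn) rather than hard-coded in the program. A minimal sketch (the object name is illustrative); setMaster is only added temporarily for IDE debugging, as the local-debugging section in Chapter 3 shows:

import org.apache.spark.{SparkConf, SparkContext}

object ModeAgnosticApp {
  def main(args: Array[String]): Unit = {
    // No setMaster here: the master URL comes from spark-submit at launch time.
    val conf = new SparkConf().setAppName("ModeAgnosticApp")
    val sc = new SparkContext(conf)
    println(s"running with parallelism ${sc.defaultParallelism}")
    sc.stop()
  }
}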
Chapter 3: A Hands-on Case

Spark Shell is mainly used for testing and verifying code. In production you normally write the program in an IDE, package it into a jar, and submit it to the cluster. The most common approach is to create a Maven project and let Maven manage the jar dependencies.

3.1 Writing the WordCount Program
(1) Create a Maven project WordCount and import the dependencies:
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.1.1</version>
    </dependency>
</dependencies>
<build>
    <finalName>WordCount</finalName>
    <plugins>
        <plugin>
            <groupId>net.alchim31.maven</groupId>
            <artifactId>scala-maven-plugin</artifactId>
            <version>3.2.2</version>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>testCompile</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
(2) Write the code:
package com.atguigu

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // 1. Create a SparkConf and set the application name
    val conf = new SparkConf().setAppName("WC")
    // 2. Create the SparkContext, the entry point for submitting a Spark application
    val sc = new SparkContext(conf)
    // 3. Use sc to create an RDD and run the transformations and the action
    sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _, 1).sortBy(_._2, false).saveAsTextFile(args(1))
    // 4. Close the connection
    sc.stop()
  }
}
(3) Add the packaging plugin:
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-assembly-plugin</artifactId>
    <version>3.0.0</version>
    <configuration>
        <archive>
            <manifest>
                <mainClass>com.atguigu.WordCount</mainClass>
            </manifest>
        </archive>
        <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
    </configuration>
    <executions>
        <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
                <goal>single</goal>
            </goals>
        </execution>
    </executions>
</plugin>
(4) Package the project and submit it to the cluster:
bin/spark-submit \
--class com.atguigu.WordCount \
--master spark://hadoop102:7077 \
WordCount.jar \
/word.txt \
/out

3.2 Local Debugging
To debug a Spark program locally, use the local submit mode: the local machine acts as the runtime environment, serving as both Master and Worker. You can then set breakpoints and debug directly. Set an extra property when creating the SparkConf to indicate local execution:
val conf = new SparkConf().setAppName("WC").setMaster("local[*]")
If your local OS is Windows and the program uses anything Hadoop-related (for example writing files to HDFS), you will run into an exception caused by the missing local Hadoop binaries. This is not a bug in your program. The fix is to extract the provided hadoop-common-bin-2.7.3-x64.zip to any directory and then add a HADOOP_HOME variable in the IDEA Run Configuration.

(Author: Atguigu Big Data R&D Department; Version: V1.2)

Chapter 1: RDD Overview

1.1 What Is an RDD
An RDD (Resilient Distributed Dataset) is Spark's most basic data abstraction: an immutable, partitioned collection whose elements can be computed on in parallel.

1.2 Properties of an RDD
An RDD is described by:
1) a list of partitions;
2) a function for computing each partition (compute);
3) a list of dependencies on other RDDs;
4) optionally, a Partitioner for key-value RDDs;
5) optionally, a list of preferred locations for computing each partition (for example, the block locations of an HDFS file).

1.3 RDD Characteristics
An RDD represents a read-only, partitioned dataset. To change an RDD you can only apply transformations to an existing RDD, producing a new one. A new RDD is derived either from a data source or from other RDDs, and it records the dependency information on its parents.

Elastic:
- Storage elasticity: automatic switching between memory and disk;
- Fault-tolerance elasticity: lost data can be recovered automatically;
- Computation elasticity: failed computations are retried;
- Partitioning elasticity: data can be repartitioned as needed.

Partitioned: an RDD is logically partitioned, and each partition's data is produced by a compute function. If the RDD is built from an existing data source, compute reads that partition's data from the source; if the RDD is derived from another RDD, compute applies the transformation logic to the parent RDD's data.

Read-only: RDDs are read-only; to change the data in an RDD you can only create a new RDD on top of existing ones. Converting one RDD into another is done through a rich set of operators, no longer only map and reduce as in MapReduce. RDD operations come in two kinds: transformations, which build a new RDD from an existing one, and actions, which trigger computation and either return a result to the application or save the RDD to a storage system.

Dependencies: RDDs are converted into new RDDs through operators; a new RDD records the information needed to derive it from its parents, and this lineage relationship between RDDs is called its dependencies.

Caching: if the same RDD is used multiple times in an application, it can be cached. The RDD is then computed from its lineage only the first time; afterwards, the cached data is read directly instead of being recomputed. For example, if RDD-1 is derived from RDD-0 and later transformed into RDD-m, caching RDD-1 means the step from RDD-0 does not have to be recomputed, which greatly speeds up later reuse.

Checkpointing: although lineage provides natural fault tolerance, recovery becomes expensive when lineage chains grow long (for example in iterative applications). RDDs therefore support checkpointing, which saves the data to persistent storage and cuts the earlier lineage: an RDD that has been checkpointed no longer needs to know its parent RDDs, because it can read its data from the checkpoint.
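A minimal spark-shell style sketch of the caching and checkpointing just described (sc is assumed to be available; the checkpoint directory is an illustrative path):

sc.setCheckpointDir("hdfs://hadoop102:9000/checkpoint")  // illustrative directory; must be reachable by all executors

val rdd = sc.parallelize(1 to 100).map(_ * 2)
rdd.cache()        // keep the computed data in memory after the first action
rdd.checkpoint()   // also write it to the checkpoint directory and cut the lineage

rdd.count()        // first action: computes the RDD, fills the cache, materializes the checkpoint
rdd.count()        // later actions reuse the cached/checkpointed data instead of recomputing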
Chapter 2: RDD Programming

2.1 Programming Model
In Spark, RDDs are represented as objects, and transformations on these objects define new RDDs. After defining RDDs through transformations, you call actions on them. An action either returns a result to the application (count, collect, ...) or saves data to a storage system (saveAsTextFile, ...). In Spark, RDD computation only happens when an action is encountered (lazy evaluation), which allows multiple transformations to be pipelined at runtime.
To use Spark, a developer writes a driver program that is submitted to the cluster. The driver defines one or more RDDs and invokes actions on them, and the workers execute the RDD partition computations.

2.2 Creating RDDs
There are three ways to create an RDD in Spark: from a collection, from external storage, or from another RDD.

2.2.1 From a Collection
To create an RDD from a collection, Spark provides two functions: parallelize and makeRDD.
1) Using parallelize():
scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
2) Using makeRDD():
scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

2.2.2 From a Dataset in External Storage
This includes the local file system and any dataset Hadoop supports, such as HDFS, Cassandra, and HBase.
scala> val rdd2 = sc.textFile("hdfs://hadoop102:9000/RELEASE")
rdd2: org.apache.spark.rdd.RDD[String] = hdfs://hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

2.2.3 From Another RDD
See section 2.3, the RDD transformations.

2.3 RDD Transformations (important for interviews and development)
RDD transformations fall into two broad groups: Value types and Key-Value types.

2.3.1 Value Types

map(func)
1. Purpose: returns a new RDD obtained by applying func to every element of the source RDD.
2. Requirement: create an RDD of the numbers 1-10, then build a new RDD in which every element is multiplied by 2.
(1) Create the RDD:
scala> var source = sc.parallelize(1 to 10)
source: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
(2) Print it:
scala> source.collect()
res7: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Multiply every element by 2 to form a new RDD:
scala> val mapadd = source.map(_ * 2)
mapadd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[9] at map at <console>:26
(4) Print the new RDD:
scala> mapadd.collect()
res8: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)

mapPartitions(func)
1. Purpose: similar to map, but it runs independently on each partition (slice) of the RDD, so for an RDD of type T, func must be of type Iterator[T] => Iterator[U]. If there are N elements and M partitions, map's function is called N times while mapPartitions' function is called M times, one call processing a whole partition.
2. Requirement: create an RDD and build a new RDD in which every element is multiplied by 2.
(1) Create the RDD:
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Multiply each element by 2 to form a new RDD:
scala> rdd.mapPartitions(x => x.map(_ * 2))
res3: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at mapPartitions at <console>:27
(3) Print the new RDD:
scala> res3.collect
res4: Array[Int] = Array(2, 4, 6, 8)
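The per-partition calling pattern is what makes mapPartitions useful when some setup is expensive. A minimal sketch (spark-shell style): a random number generator stands in for a costly resource such as a database connection, and is created once per partition rather than once per element:

val rdd = sc.parallelize(1 to 8, 2)
val tagged = rdd.mapPartitions { iter =>
  val rng = new scala.util.Random(42)     // built once per partition, not once per element
  iter.map(x => (x, rng.nextInt(100)))    // the iterator is consumed lazily, element by element
}
tagged.collect()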
mapPartitionsWithIndex(func)
1. Purpose: similar to mapPartitions, but func takes an additional integer parameter representing the partition index, so for an RDD of type T, func must be of type (Int, Iterator[T]) => Iterator[U].
2. Requirement: create an RDD and pair each element with the index of the partition it belongs to, forming a new RDD.
(1) Create the RDD:
scala> val rdd = sc.parallelize(Array(1,2,3,4))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[4] at parallelize at <console>:24
(2) Pair each element with its partition index:
scala> val indexRdd = rdd.mapPartitionsWithIndex((index, items) => (items.map((index, _))))
indexRdd: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[5] at mapPartitionsWithIndex at <console>:26
(3) Print the new RDD:
scala> indexRdd.collect
res2: Array[(Int, Int)] = Array((0,1), (0,2), (1,3), (1,4))

flatMap(func)
1. Purpose: similar to map, but each input element can be mapped to 0 or more output elements (so func should return a sequence rather than a single element).
2. Requirement: create an RDD of the numbers 1-5 and use flatMap to build a new RDD in which each element x of the source RDD is expanded into 1, 2, ..., x (1 -> 1; 2 -> 1, 2; ...; 5 -> 1, 2, 3, 4, 5).
(1) Create the RDD:
scala> val sourceFlat = sc.parallelize(1 to 5)
sourceFlat: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:24
(2) Print it:
scala> sourceFlat.collect()
res11: Array[Int] = Array(1, 2, 3, 4, 5)
(3) Create the new RDD from the source RDD:
scala> val flatMap = sourceFlat.flatMap(1 to _)
flatMap: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[13] at flatMap at <console>:26
(4) Print the new RDD:
scala> flatMap.collect()
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5)

The difference between map() and mapPartitions()
1. map(): processes one element at a time.
2. mapPartitions(): processes one partition's data at a time; the partition's data in the original RDD can only be released after the whole partition has been processed, which may lead to OOM.
3. Guideline: when memory is plentiful, prefer mapPartitions() for better efficiency.

glom
1. Purpose: turns each partition into an array, forming a new RDD of type RDD[Array[T]].
2. Requirement: create an RDD with 4 partitions and put the data of each partition into an array.
(1) Create the RDD:
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2) Collect each partition's array to the driver and print:
scala> rdd.glom().collect()
res25: Array[Array[Int]] = Array(Array(1, 2, 3, 4), Array(5, 6, 7, 8), Array(9, 10, 11, 12), Array(13, 14, 15, 16))
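A typical use of glom is computing a per-partition statistic once each partition has been turned into an array. A minimal sketch (spark-shell style), reusing the 4-partition RDD from the glom example above:

val rdd = sc.parallelize(1 to 16, 4)
val maxPerPartition = rdd.glom().map(_.max)  // one maximum per partition
maxPerPartition.collect()                    // Array(4, 8, 12, 16)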
groupBy(func)
1. Purpose: groups the elements by the result of the given function; elements with the same key go into the same group.
2. Requirement: create an RDD and group its elements by their remainder modulo 2.
(1) Create the RDD:
scala> val rdd = sc.parallelize(1 to 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[65] at parallelize at <console>:24
(2) Group by the remainder modulo 2:
scala> val group = rdd.groupBy(_ % 2)
group: org.apache.spark.rdd.RDD[(Int, Iterable[Int])] = ShuffledRDD[2] at groupBy at <console>:26
(3) Print:
scala> group.collect
res0: Array[(Int, Iterable[Int])] = Array((0,CompactBuffer(2, 4)), (1,CompactBuffer(1, 3)))

filter(func)
1. Purpose: returns a new RDD consisting of the elements of the source RDD for which func returns true.
2. Requirement: create an RDD of strings and filter out a new RDD containing only the strings with the substring "xiao".
(1) Create the RDD:
scala> var sourceFilter = sc.parallelize(Array("xiaoming","xiaojiang","xiaohe","dazhi"))
sourceFilter: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
(2) Print it:
scala> sourceFilter.collect()
res9: Array[String] = Array(xiaoming, xiaojiang, xiaohe, dazhi)
(3) Keep only the elements containing "xiao" to form a new RDD:
scala> val filter = sourceFilter.filter(_.contains("xiao"))
filter: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[11] at filter at <console>:26
(4) Print the new RDD:
scala> filter.collect()
res10: Array[String] = Array(xiaoming, xiaojiang, xiaohe)

sample(withReplacement, fraction, seed)
1. Purpose: randomly samples the data with the given fraction. withReplacement indicates whether sampling is done with replacement (true) or without (false); seed is the seed of the random number generator.
2. Requirement: create an RDD of 1-10 and sample from it with and without replacement.
(1) Create the RDD:
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:24
(2) Print it:
scala> rdd.collect()
res15: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
(3) Sample with replacement:
scala> var sample1 = rdd.sample(true, 0.4, 2)
sample1: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[21] at sample at <console>:26
scala> sample1.collect()
res16: Array[Int] = Array(1, 2, 2, 7, 7, 8, 9)
(4) Sample without replacement:
scala> var sample2 = rdd.sample(false, 0.2, 3)
sample2: org.apache.spark.rdd.RDD[Int] = PartitionwiseSampledRDD[22] at sample at <console>:26
scala> sample2.collect()
res17: Array[Int] = Array(1, 9)

distinct([numTasks])
1. Purpose: returns a new RDD containing the distinct elements of the source RDD. By default 8 parallel tasks are used, but this can be changed with the optional numTasks parameter.
2. Requirement: create an RDD and use distinct() to deduplicate it.
(1) Create the RDD:
scala> val distinctRdd = sc.parallelize(List(1,2,1,5,2,9,6,1))
distinctRdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[34] at parallelize at <console>:24
(2) Deduplicate with the default parallelism:
scala> val unionRDD = distinctRdd.distinct()
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[37] at distinct at <console>:26
scala> unionRDD.collect()
res20: Array[Int] = Array(1, 9, 5, 6, 2)
(3) Deduplicate with parallelism 2:
scala> val unionRDD = distinctRdd.distinct(2)
unionRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[40] at distinct at <console>:26
scala> unionRDD.collect()
res21: Array[Int] = Array(6, 2, 1, 9, 5)

coalesce(numPartitions)
1. Purpose: reduces the number of partitions to numPartitions.
2. Requirement: create an RDD with 4 partitions and coalesce it.
(1) Create the RDD:
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[54] at parallelize at <console>:24
(2) Check the number of partitions:
scala> rdd.partitions.size
res20: Int = 4
(3) Coalesce the RDD:
scala> val coalesceRDD = rdd.coalesce(3)
coalesceRDD: org.apache.spark.rdd.RDD[Int] = CoalescedRDD[55] at coalesce at <console>:26
(4) Check the new number of partitions:
scala> coalesceRDD.partitions.size
res21: Int = 3

repartition(numPartitions)
1. Purpose: shuffles all the data over the network and repartitions it into numPartitions partitions.
2. Requirement: create an RDD with 4 partitions and repartition it.
(1) Create the RDD:
scala> val rdd = sc.parallelize(1 to 16, 4)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[56] at parallelize at <console>:24
(2) Check the number of partitions:
scala> rdd.partitions.size
res22: Int = 4
(3) Repartition the RDD:
scala> val rerdd = rdd.repartition(2)
rerdd: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[60] at repartition at <console>:26
(4) Check the new number of partitions:
scala> rerdd.partitions.size
res23: Int = 2

The difference between coalesce and repartition
1. coalesce repartitions without a shuffle by default; whether a shuffle happens is controlled by its shuffle: Boolean = false/true parameter.
2. repartition is actually a call to coalesce with shuffle = true. The source code is:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
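A minimal sketch (spark-shell style) of the practical difference: without a shuffle, coalesce can only reduce the number of partitions, while repartition (coalesce with shuffle = true) can both reduce and increase it:

val rdd = sc.parallelize(1 to 16, 4)

rdd.coalesce(2).partitions.size                   // 2: reduced without a shuffle
rdd.coalesce(8).partitions.size                   // still 4: cannot grow without a shuffle
rdd.coalesce(8, shuffle = true).partitions.size   // 8: growing requires a shuffle
rdd.repartition(8).partitions.size                // 8: repartition always shuffles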
