《通信数据分析与实战》课件-第三章 Spark RDD_第1页
《通信数据分析与实战》课件-第三章 Spark RDD_第2页
《通信数据分析与实战》课件-第三章 Spark RDD_第3页
《通信数据分析与实战》课件-第三章 Spark RDD_第4页
《通信数据分析与实战》课件-第三章 Spark RDD_第5页
已阅读5页,还剩71页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

通信数据分析与实战SparkRDD第三章第1节2知道RDD的作用理解RDD的五大特征学习目标TARGETRDD的概述传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式要进行大量的磁盘IO操作。Spark中的RDD可以很好的解决这一缺点。RDD(ResilientDistributedDataset),即弹性分布式数据集,是一个容错的、并行的数据结构,可以让用户显式地将数据存储到磁盘和内存中,并且还能控制数据的分区。对于迭代式计算和交互式数据挖掘,RDD可以将中间计算的数据结果保存在内存中,若是后面需要中间结果参与计算时,则可以直接从内存中读取,从而可以极大地提高计算速度。RDD是Spark提供的最重要的抽象概念,我们可以将RDD理解为一个分布式存储在集群中的大型数据集合,不同RDD之间可以通过转换操作形成依赖关系实现管道化,从而避免了中间结果的I/O操作,提高数据处理的速度和性能。RDD的五大特征

分区列表计算函数依赖其他RDDKV类型分区器优先位置列表RDD的五大特征分区列表每个RDD被分为多个分区(Partitions),这些分区运行在集群中的不同节点,每个分区都会被一个计算任务处理,分区数决定了并行计算的数量,创建RDD时可以指定RDD分区的个数。如果不指定分区数量,当RDD从集合创建时,默认分区数量为该程序所分配到的资源的CPU核数(每个Core可以承载2~4个Partition),如果是从HDFS文件创建,默认为文件的Block数。RDD的五大特征每个分区都有一个计算函数

Spark的RDD的计算函数是以分片为基本单位的,每个RDD都会实现compute函数,对具体的分片进行计算。RDD的五大特征依赖于其他RDD

RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。RDD的五大特征K-V数据类型的RDD分区器当前Spark中实现了两种类型的分区函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于(Key,Value)的RDD,才会有Partitioner(分区),非(Key,Value)的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分区数量,也决定了parentRDDShuffle输出时的分区数量。RDD的五大特征每个分区都有一个优先位置列表优先位置列表会存储每个Partition的优先位置,对于一个HDFS文件来说,就是每个Partition块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。10小结知道RDD的作用理解RDD的五大特征通信数据分析与实战SparkRDD第三章第2节12掌握从本地系统创建RDD掌握并行集合创建RDD学习目标TARGETRDD的创建方式从本地-HDFS文件系统加载数据创建通过并行集合创建RDD从文件系统加载数据创建RDDSpark可以从Hadoop支持的任何存储源中加载数据去创建RDD,包括本地文件系统和HDFS等文件系统。我们通过Spark中的SparkContext对象调用textFile()方法加载数据创建RDD。scala>valtest=sc.textFile("file:///export/data/test.txt")test:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[1]attextFileat<console>:241、从本地文件中加载数据创建RDD从文件系统加载数据创建RDDscala>valtestRDD=sc.textFile("/data/test.txt")testRDD:org.apache.spark.rdd.RDD[String]=/data/test.txtMapPartitionsRDD[1]attextFileat<console>:242、从HDFS中加载数据创建RDD通过并行集合创建RDDscala>valarray=Array(1,2,3,4,5)array:Array[Int]=Array(1,2,3,4,5)scala>valarrRDD=sc.parallelize(array)arrRDD:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[6]atparallelizeat<console>:26Spark可以通过并行集合创建RDD。即从一个已经存在的集合、数组上,通过SparkContext对象调用parallelize()方法创建RDD。17小结掌握从本地系统创建RDD掌握并行集合创建RDD通信数据分析与实战SparkRDD第三章第3节19知道RDD的处理过程熟悉RDD的转换算子熟悉RDD的行动算子学习目标TARGETRDD的处理过程Spark用Scala语言实现了RDD的API,程序开发者可以通过调用API对RDD进行操作处理。RDD经过一系列的“转换”操作,每一次转换都会产生不同的RDD,以供给下一次“转换”操作使用,直到最后一个RDD经过“行动”操作才会被真正计算处理,并输出到外部数据源中,若是中间的数据结果需要复用,则可以进行缓存处理,将数据缓存到内存中。RDD的转换算子RDD处理过程中的“转换”操作主要用于根据已有RDD创建新的RDD,每一次通过Transformation算子计算后都会返回一个新RDD,供给下一个转换算子使用。下面,通过一张表来列举一些常用转换算子操作的API,具体如下。RDD的转换算子下面,我们通过结合具体的示例对这些转换算子API进行详细讲解。filter(func)操作会筛选出满足函数func的元素,并返回一个新的数据集。假设,有一个文件test.txt,下面,通过一张图来描述如何通过filter算子操作,筛选出包含单词“spark”的元素。test.txtRDD(lines)RDD(linesWithSpark)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparksparkyuminyuanlongpingsparksc.textFile()lines.filter()RDD的转换算子

通过从test.txt文件中加载数据的方式创建RDD,然后通过filter操作筛选出满足条件的元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:

scala>vallines=sc.textFile("file:///export/data/test.txt")

lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txt

MapPartitionsRDD[1]attextFileat<console>:24

scala>vallinesWithSpark=lines.filter(line=>line.contains("spark"))

linesWithSpark:org.apache.spark.rdd.RDD[String]=MapPartitionsRDD[2]at

filterat<console>:25RDD的转换算子

map(func)操作将每个元素传递到函数func中,并将结果返回为一个新的数据集。有一个文件test.txt,接下来,通过一张图来描述如何通过map算子操作把文件内容拆分成一个个的单词并封装在数组对象中,具体过程如下

test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()RDD的转换算子通过从test.txt文件中加载数据的方式创建RDD,然后通过map操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[4]attextFileat<console>:24

scala>valwords=lines.map(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[13]amapat<console>:25RDD的转换算子flatMap(func)与map(func)相似,但是每个输入的元素都可以映射到0或者多个输出的结果。有一个文件test.txt,接下来,通过一张图来描述如何通过flatMap算子操作,把文件内容拆分成一个个的单词test.txtRDD(lines)RDD(words)hadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshihadoopsparktuyouyouyuanshisparkyuminyuanlongpingsparkzhongnanshanyuanshiArray(“Hadoop”,“spark”)Array(“tuyouyou”,”yuanshi”)Array(“spark”,”yumin”)Array(“yuanlongping”,“spark”)Array(“zhongnanshan”,“yuanshi”)sc.textFile()lines.map()hadoopsparkTuyouyouyuanshiSparkyuminyuanlongpingsparkZhongnanshanyuanshilines.flatmap()lines.flat()RDD的转换算子通过从test.txt文件中加载数据的方式创建RDD,然后通过flatMap操作将文件的每一行内容都拆分成一个个的单词元素,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[5]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split(""))words:org.apache.spark.rdd.RDD[Array[String]]=MapPartitionsRDD[14]atmapat<console>:25RDD的转换算子

groupByKey()主要用于(Key,Value)键值对的数据集,将具有相同Key的Value进行分组,会返回一个新的(Key,Iterable)形式的数据集。同样以文件test.txt为例,接下来,通过一张图来描述如何通过groupByKey算子操作,将文件内容中的所有单词进行分组.

RDD(words)RDD(groupWords)(“Hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,(1,1,1))(“tuyouyou”,1)(“yuanshi”,(1,1))(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.groupByKey()RDD的转换算子通过groupByKey操作把(Key,Value)键值对类型的RDD,按单词将单词出现的次数进行分组,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[6]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))ords:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[15]atmapat<console>:25

scala>valgroupWords=words.groupByKey()groupWords:org.apache.spark.rdd.RDD[(String,Iterable[Int])]=ShuffledRDD[16]atgroupByKeyat<console>:25RDD的转换算子

reduceByKey()主要用于(Key,Value)键值对的数据集,返回的是一个新的(Key,Iterable)形式的数据集,该数据集是每个Key传递给函数func进行聚合运算后得到的结果。同样以文件test.txt,接下来,通过一张图来描述如何通过reduceByKey算子操作统计单词出现的次数。RDD(words)RDD(reduceWords)(“hadoop”,1)(“spark”,1)(“tuyouyou”,1)(“yuanshi”,1)(“spark”,1)(“yumin”,1)(“yuanlongping”,1)(“spark”,1)(“zhongnanshan”,1)(“yuanshi”,1)(“hadoop”,1)(“spark”,3)(“tuyouyou”,1)(“yuanshi”,2)(“yumin”,1)(“yaunlongping”,1)(“zhongnanshan”,1)Words.reduceByKey()RDD的转换算子通过reduceByKey操作把(Key,Value)键值对类型的RDD,按单词Key将单词出现的次数Value进行聚合,这些元素组成的集合是一个新的RDD。接下来,通过代码来进行演示,具体代码如下:scala>vallines=sc.textFile("file:///export/data/test.txt")lines:org.apache.spark.rdd.RDD[String]=file:///export/data/test.txtMapPartitionsRDD[7]attextFileat<console>:24

scala>valwords=lines.flatMap(line=>line.split("")).map(word=>(word,1))words:org.apache.spark.rdd.RDD[(String,Int)]=MapPartitionsRDD[16]atmapat<console>:25

scala>valreduceWords=words.reduceByKey((a,b)=>a+b)reduceWords:org.apache.spark.rdd.RDD[(String,Int)]=ShuffledRDD[17]atreduceByKeyat<console>:25RDD的行动算子行动算子主要是将在数据集上运行计算后的数值返回到驱动程序,从而触发真正的计算。下面,通过一张表来列举一些常用行动算子操作的API,具体如下。RDD的行动算子count()主要用于返回数据集中的元素个数。假设,现有一个arrRdd,如果要统计arrRdd元素的个数,示例代码如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.count()es0:Long=5RDD的行动算子first()主要用于返回数组的第一个元素。现有一个arrRdd,如果要获取arrRdd中第一个元素,示例代码如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.first()res1:Int=1RDD的行动算子

take()主要用于以数组的形式返回数组集中的前n个元素。现有一个arrRdd,如果要获取arrRdd中的前三个元素,示例代码如下scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.take(3)res2:Array[Int]=Array(1,2,3)RDD的行动算子reduce()主要用于通过函数func(输入两个参数并返回一个值)聚合数据集中的元素。现有一个arrRdd,如果要对arrRdd中的元素进行聚合,示例代码如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.reduce((a,b)=>a+b)res3:Int=15RDD的行动算子collect()主要用于以数组的形式返回数据集中的所有元素。现有一个rdd,如果希望rdd中的元素以数组的形式输出,示例代码如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.collect()res4:Array[Int]=Array(1,2,3,4,5)RDD的行动算子

foreach()主要用于将数据集中的每个元素传递到函数func中运行。现有一个arrRdd,如果希望遍历输出arrRdd中的元素,示例代码如下:scala>valarrRdd=sc.parallelize(Array(1,2,3,4,5))arrRdd:org.apache.spark.rdd.RDD[Int]=ParallelcollectionRDD[0]atparallelizeat<console>:24

scala>arrRdd.foreach(x=>println(x))

1

2

3

4

539小结知道RDD的处理过程熟悉RDD的转换算子熟悉RDD的行动算子通信数据分析与实战SparkRDD第三章第4节41知道RDD的分区作用熟悉RDD的分区方式了解RDD的自定义分区学习目标TARGETRDD的分区在分布式程序中,网络通信的开销是很大的,因此控制数据分布以获得最少的网络传输可以极大的提升程序的整体性能,Spark程序可以通过控制RDD分区方式来减少通信开销。Spark中所有的RDD都可以进行分区,系统会根据一个针对键的函数对元素进行分区。虽然Spark不能控制每个键具体划分到哪个节点上,但是可以确保相同的键出现在同一个分区上。RDD分区的作用RDD的分区RDD分区的默认数目Mesos模式RDD的分区原则是分区的个数尽量等于集群中的CPU核心(Core)数目。对于不同的Spark部署模式而言,都可以通过设置spark.default.parallelism这个参数值来配置默认的分区数目。默认为本地机器的CPU数目,若设置了local[N],则默认为NLocal模式Standalone/yarn模式默认的分区数是8。在“集群中所有CPU核数总和”和“2”这两者中取较大值作为默认值RDD的分区RDD分区的方式Spark框架为RDD提供了两种分区方式,分别是哈希分区(HashPartitioner)和范围分区(RangePartitioner)。哈希分区是根据哈希值进行分区;范围分区是将一定范围的数据映射到一个分区中。这两种分区方式已经可以满足大多数应用场景的需求。与此同时,Spark也支持自定义分区方式,即通过一个自定义的Partitioner对象来控制RDD的分区,从而进一步减少通信开销。RDD的分区RDD分区的方式需要注意的是,RDD的分区函数是针对(Key,Value)类型的RDD,分区函数根据Key对RDD元素进行分区。因此,当需要对一些非(Key,Value)类型的RDD进行自定义分区时,需要先把RDD元素转换为(Key,Value)类型,再通过分区函数进行分区操作。如果想要实现自定义分区,就需要定义一个类,使得这个自定义的类继承org.apache.spark.Partitioner类,并实现其中的3个方法,具体如下:(1).defnumPartitions:Int:用于返回创建的分区个数。(2).defgetPartition(Key:Any):用于对输入的Key做处理,并返回该Key的分区ID,分区ID的范围是0~numPartitions-1。(3).equals(other:Any):用于Spark判断自定义的Partitioner对象和其他的Partitioner对象是否相同,从而判断两个RDD的分区方式是否相同。46小结知道RDD的分区作用熟悉RDD的分区方式了解RDD的自定义分区通信数据分析与实战SparkRDD第三章第5节48知道RDD的窄依赖关系知道RDD的宽依赖关系学习目标TARGETRDD的依赖关系宽依赖窄依赖RDD的依赖关系窄依赖是指父RDD的每一个分区最多被一个子RDD的分区使用,即OneToOneDependencies。窄依赖的表现一般分为两类,第一类表现为一个父RDD的分区对应于一个子RDD的分区;第二类表现为多个父RDD的分区对应于一个子RDD的分区。一个父RDD的一个分区不可能对应一个子RDD的多个分区。为了便于理解,我们通常把窄依赖形象的比喻为独生子女。窄依赖RDD的依赖关系RDD做map、filter和union算子操作时,是属于窄依赖的第一类表现;而RDD做join算子操作(对输入进行协同划分)时,是属于窄依赖表现的第二类。输入协同划分是指多个父RDD的某一个分区的所有Key,被划分到子RDD的同一分区。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,只需要重新对父RDD中对应的分区做算子操作即可恢复数据。窄依赖RDD的依赖关系宽依赖是指子RDD的每一个分区都会使用所有父RDD的所有分区或多个分区,即OneToManyDependecies。为了便于理解,我们通常把宽依赖形象的比喻为超生。宽依赖父RDD做groupByKey和join(输入未协同划分)算子操作时,子RDD的每一个分区都会依赖于所有父RDD的所有分区。当子RDD做算子操作,因为某个分区操作失败导致数据丢失时,则需要重新对父RDD中的所有分区进行算子操作才能恢复数据。RDD的依赖关系Join算子操作既可以属于窄依赖,也可以属于宽依赖.

当join算子操作后,分区数量没有变化则为窄依赖(如joinwithinputsco-partitioned,输入协同划分)当join算子操作后,分区数量发生变化则为宽依赖(如joinwithinputsnotco-partitioned,输入非协同划分)宽窄依赖的注意点54小结知道RDD的窄依赖关系知道RDD的宽依赖关系通信数据分析与实战SparkRDD第三章第6节56熟悉RDD的持久化机制知道RDD的容错机制学习目标TARGETRDD的机制容错机制持久化机制RDD的机制在Spark中,RDD是采用惰性求值,即每次调用行动算子操作,都会从头开始计算,这对迭代计算来说代价很大,因为迭代计算经常需要多次重复的使用同一组数据集,所以为了避免重复计算的开销,让Spark对数据集进行持久化操作。RDD的持久化操作有两种方法,分别是cache()方法和persist()方法。persist()方法的存储级别是通过StorageLevel对象设置的。cache()方法的存储级别是使用默认的存储级别(即StorageLevel.MEMORY_ONLY)。持久化机制RDD的机制持久化机制RDD的机制持久化机制使用persist()方法对RDD进行持久化RDD的机制持久化机制使用cache()方法对RDD进行持久化RDD的机制容错机制持久化机制RDD的机制容错机制当Spark集群中的某一个节点由于宕机导致数据丢失,则可以通过Spark中的RDD进行容错恢复已经丢失的数据。RDD提供了两种故障恢复的方式,分别是血统(Lineage)方式和设置检查点(checkpoint)方式。RDD的机制容错机制血统方式(Lineage)根据RDD之间依赖关系对丢失数据的RDD进行数据恢复。若丢失数据的子RDD进行窄依赖运算,则只需要把丢失数据的父RDD的对应分区进行重新计算,不依赖其他节点,并且在计算过程中不存在冗余计算;若丢失数据的RDD进行宽依赖运算,则需要父RDD所有分区都要进行从头到尾计算,计算过程中存在冗余计算。RDD的机制容错机制设置检查点(checkPoint)方式本质是将RDD写入磁盘存储。当RDD进行宽依赖运算时,只要在中间阶段设置一个进行检查点容错,即Spark中的sparkContext调用setCheckpoint()方法,设置容错文件系统目录作为检查点checkpoint,将checkpoint的数据写入之前设置的容错文件系统中进行持久化存储,若后面有节点宕机导致分区数据丢失,则以从做

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论