大数据导论-思维、技术与应用-第9章-大数据实时处理SPARK_第1页
大数据导论-思维、技术与应用-第9章-大数据实时处理SPARK_第2页
大数据导论-思维、技术与应用-第9章-大数据实时处理SPARK_第3页
大数据导论-思维、技术与应用-第9章-大数据实时处理SPARK_第4页
大数据导论-思维、技术与应用-第9章-大数据实时处理SPARK_第5页
已阅读5页,还剩35页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

大数据导论第九章CONTENTS目录PART01Spark简介PART02RDD概念PART03Spark运行架构和机制PART04Spark生态系统PART05Spark编程实践PART06作业PART01Spark简介Spark是加州大学伯克利分校AMP实验室开发的通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目。Spark以其先进的设计理念,迅速成为社区的热门项目,围绕着Spark推出了SparkSQL、SparkStreaming、MLlib和GraphX等组件,逐渐形成大数据处理一站式解决平台。Spark简介Spark优势:1.Spark提供了内存计算,减少写入磁盘的需求,提高了处理效率,更适合交互式分析2.Spark提供了一个统一的框架管理不同来源的数据以及支持更多的计算类型3.Spark操作比Hadoop更容易,而且提供的数据集类型更加丰富4.Spark基于DAG的调度机制比Hadoop更优越,可以使用更多管道数据,控制中间结果行为

Spark简介Spark简介Spark适用场景1.多次操作特定数据集的应用场合

解释:Spark是基于内存的迭代计算框架,需要反复操作的次数越多,所需读取的数据量越大,受益越大;数据量小但是计算密集度较大的场合,受益就相对较小2.数据量不大,但是需要实时统计分析需求3.由于RDD特性,Spark不适用于增量修改的场景

例如:Web服务的存储或者是增量的Web爬虫和索引PART02RDD概念RDD是Spark提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的节点上,以函数式操作集合的方式进行各种并行操作。RDD概念RDD的基本概念1.RDD是一个分布式对象集合,本质上是一个只读的分区记录集合。2.每个RDD可以分成多个分区,每个分区就是一个数据集片段。3.一个RDD的不同分区可以保存到集群中的不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD概念RDD属性:是只读的,不能修改,只能通过转换操作生成新的RDD。是分布式的,可以分布在多台机器上,进行并行处理。是弹性的,计算过程中内存不够时它会和磁盘进行数据交换。是基于内存的,可以全部或部分缓存在内存中,在多次计算间重用。RDD实质上是一种更为通用的迭代并行计算框架,用户可以显示控制计算的中间结果,然后将其自由运用于之后的计算。RDD概念RDD基本操作1.构建操作第一类方式从内存里构造RDD,使用makeRDD和parallelize方法例:valrdd01=sc.makeRDD(List(1,2,3,4,5,6))第二类方式是通过文件系统构造RDD例:valrdd:RDD[String]=sc.textFile("file:///D:/sparkdata.txt",1)2.转换操作RDD的转换操作是返回新RDD的操作。转换出来的RDD是惰性求值的,只有在行动操作中用到这些RDD时才会被计算RDD概念

函数名作用示例结果map()将函数应用于RDD每个元素,返回值是新的RDDrdd1.map(x=>x+1){2,3,4,4}flatMap()将函数应用于RDD每个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDDrdd1.flatmap(x=>x.to(3)){1,2,3,2,3,3,3}filter()函数会过滤掉不符合条件的元素,返回值是新的RDDrdd1.filter(x=>x!=1){2,3,3}distinct()将RDD里的元素进行去重操作rdd1.distinct()(1,2,3)union()生成包含两个RDD所有元素的新RDDrdd1.union(rdd2){1,2,3,3,3,4,5}intersection()求出两个RDD的共同元素ersection(rdd2){3}subtract()将原RDD里和参数RDD里相同的元素去掉rdd1.subtract(rdd2){1,2}cartesian()求两个RDD的笛卡儿积rdd1.cartesian(rdd2){(1,3),(1,4),…,(3,5)}RDD概念3.行动操作行动操作用于执行计算并按指定的方式输出结果。行动操作接受RDD,但是返回非RDD,即输出一个值或者结果。在RDD执行过程中,真正的计算发生在行动操作RDD概念函数名作用示例结果collect()返回RDD所有元素rdd.collect(){1,2,3,3}count()RDD里元素个数rdd.count()4countByValue()各元素在RDD中出现次数rdd.countByValu(){(1,1),(2,1),(3,2})}take(num)从RDD中返回num个元素rdd.take(2){1,2}top(num)从RDD中,按照默认(降序)或者指定的排序返回最前面num个元素rdd.top(2){3,3}reduce()并行整合所有RDD数据,例如求和操作rdd.reduce((x,y)=>x+y)9fold(zero)(func)和reduce功能一样,需要提供初始值rdd.fold(0)((x,y)=>x+y)9foreach(func)对RDD每个元素都使用特定函数rdd1.foreach(x=>println(x))打印每一个元素saveAsTextFile(path)将数据集的元素,以文本的形式保存到文件系统。rdd1.saveAsTextFile(file://home/test)

saveAsSequenceFile(path)将数据集的元素,以顺序文件格式保存到指定的目录下。saveAsSequenceFile(hdfs://home/test)

RDD概念RDD血缘关系:RDD的最重要的特性之一就是血缘关系(Lineage),血缘关系描述了一个RDD是如何从父RDD计算得来的。如果某个RDD丢失了,则可以根据血缘关系,从父RDD计算得来。RDD概念RDD依赖类型RDD在血缘关系的依赖分为:窄依赖宽依赖窄依赖是指父RDD的每个分区都只被子RDD的一个分区所使用。宽依赖就是指父RDD的分区被多个子RDD的分区所依赖。RDD概念阶段划分从Action开始,遇到窄依赖操作,则划分到同一个执行阶段;遇到宽依赖操作,则划分一个新的执行阶段。后面的Stage需要等待所有的前面的Stage执行完之后才可以执行RDD概念RDD缓存Spark可以使用persist和cache方法将任意RDD缓存到内存、磁盘文件系统中。缓存是容错的,如果一个RDD分片丢失,可以通过构建它的转换自动重构。被缓存的RDD被使用时,存取速度会被大大加速。一般Executor内存的60%做cache,剩下的40%用来执行任务。cache是persist的特例,将该RDD缓存到内存中。而persist可以让用户根据需求指定一个持久化级别级别使用空间CPU时间是否在内存是否在磁盘MEMORY_ONLY高低是否MEMORY_ONLY_SER低高是否MEMORY_AND_DISK高中部分部分MEMORY_AND_DISK_SER低高部分部分DISK_ONLY低高否是RDD概念

spark持久化级别的选择:1.如果RDD可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDD的操作尽可能的快。2.如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。RDD概念

spark持久化级别的选择:3.尽可能不要存储到硬盘上,除非计算数据集函数的计算量特别大,或它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。4.如果想有快速故障恢复能力,使用复制存储级别。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让任务在RDD上持续运行,而不需要等待丢失的分区被重新计算。5.在不使用cachedRDD的时候,及时使用unpersist方法来释放它PART03Spark运行架构和机制Spark运行架构和机制Spark总体架构Spark运行架构包括集群资源管理器(ClusterManager)、多个运行作业任务的工作节点(WorkerNode)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。Spark运行架构和机制

Driver(应用控制器)是运行SparkApplication(应用)的main()函数,它会创建SparkContext。SparkContext负责和ClusterManager(集群管理器)通信,进行资源的申请、任务的分配和监控等。资源管理器负责申请和管理在WorkerNode(工作节点)上运行应用所需的资源。目前有Spark原生的资源管理器、Mesos资源管理器和HadoopYARN资源管理器三种。Spark运行架构和机制

Executor(执行进程)是Application运行在WorkerNode上的一个进程,负责运行Task(任务),并且负责将数据存在内存或者磁盘上,每个Application都有各自独立的一批Executor。每个Executor则包含了一定数量的资源来运行分配给它的任务。每个WorkerNode上的Executor服务于不同。的Application,它们之间是不可以共享数据的Spark运行架构和机制SparkApplication的基本概念一个SparkApplication就是用户编写的Spark应用程序,由一个Driver和若干个Job组成。每个Job都被分隔成多个彼此依赖称之为Stage的Task。一个Task就是一个工作单元,可以发送给一个Executor执行。Task是用来执行应用的实际计算工作。Spark运行架构和机制Spark运行流程Spark运行架构和机制RDD运行过程:构建DAG阶段DAG调度阶段Task调度阶段执行阶段Spark生态圈是伯克利APMLab实验室打造的,力图在算法(Algorithms)、机器(Machines)、人(People)之间通过大规模集成来展现大数据应用的一个平台PART04Spark生态系统Spark生态系统SparkCoreSparkStreamingSparkSQLSparkMllibSparkGraphXPART05Spark编程实践Spark编程实践SparkShell使用1.启动SparkShell:./spark-shell2.利用本地文件系统的一个文本文件创建一个新RDD3.执行动作操作,计算文档中有多少行

textFile.count()4.执行动作操作,获取文档中第一行内容

textFile.first()5.转换操作会将一个RDD转换成一个新的RDD

varnewRDD=textFile.filter(line=>line.contains(""))

vartextFile=sc.textFile("");Spark编程实践SparkShell的WordCount实现valfile=sc.textFile("")valcount=file.flatMap(line=>line.split("")).map(word=>(word,1)).reduceByKey(_+_)count.collect()Spark编程实践importorg.apache.spark.SparkContextimportorg.apache.spark.SparkContext._importorg.apache.spark.SparkConfobjectSimpleApp{defmain(args:Array[String]){vallogFile="file:///usr/local/spark/README.md"//Shouldbesomefileonyoursystemvalconf=newSparkConf().setAppName("SimpleApplication")valsc=newSparkContext(conf)vallogData=sc.textFile(logFile,2).cache()valnumAs=logData.filter(line=>line.contains("a")).count()valnumBs=logData.filter(line=>line.contains("b")).count()println("Lineswitha:%s,Lineswithb:%s".format(numAs,numBs))}}编写Scala应用程序Spark编程实践编写Java应用程序importorg.apache.spark.api.java.*;importorg.apache.spark.api.java.function.Function;publicclassSimpleApp{publicstaticvoidmain(String[]args){StringlogFile="file:///usr/local/spark/README.md";//ShouldbesomefileonyoursystemJavaSparkContextsc=newJavaSparkContext("local","SimpleApp","file:///usr/local/spark/",newString[]{"target/simple-project-1.0.jar"});JavaRDD<String>logData=sc.textFile(logFile).cache();longnumAs=logData.filter(newFunction<String,Boolean>(){publicBooleancall(Strings){returns.contains("a");}}).count();longnumBs=logData.filter(newFunction<String,Boolean>(){publicBooleancall(Strings){returns.contains("b");}}).count();System.out.println("Lineswitha:"+numAs+",lineswithb:"+numBs);}} PART06作业

作业作业:1.Hadoop的主要缺陷是什么?Spark的主要优势是什么?2.Spark的主要使用场景有哪些?3.什么是RDD?它有哪些主要属性?产生RDD的主要原因是什么?4.RDD有哪几类操作?各自的主要作用是什么?5.什么是Spark的惰性计算?有什么优势?6.什么是RDD的血缘关系?Spark保存RDD血缘关系的用途是什么?7.什么是DAG?请举例说明。8.RDD的依赖关系有宽依赖和窄依赖两大类,请解释什么是宽依赖,什么是窄依赖?把依赖关系分解为宽依赖和窄依赖的目的是什么?作业作业:9.图9.13展示的转换中,哪些是宽依赖?哪些是窄依赖?并解释原因作业作业:10.Spark是如何把DAG划分成多个阶段的?划分成多个阶段的依据是什么?划分成多个阶段的目的是什么?11.请对图9.14DAG进行阶段划分,并解释阶段划分的原因。作业

12.RDD的缓存的主要目的是什么?有哪几类缓存级别?13.请画图描述Spark的运行架构,并解释每一个模块的主要作用。14.请画图描述Spark应用的运行流程,并简单描述每一步的任务。15.假定rdd1={2,3,3,5},rdd2={3,4,5},请写出下面转换操作的结果函数名作用示例结果map()将函数应用于RDD每个元素,返回值是新的R

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论