第7章 内存计算框架Spark_第1页
第7章 内存计算框架Spark_第2页
第7章 内存计算框架Spark_第3页
第7章 内存计算框架Spark_第4页
第7章 内存计算框架Spark_第5页
已阅读5页,还剩62页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第7章内存计算框架Spark7.1.1Spark简介嵌入式系统基本概念

Spark最初由UCBerkeleyAMPlab(加州大学伯克利分校的AMP实验室)于2009年开发,可用于构建大型的、低延迟的数据分析应用程序,是基于内存计算的大数据并行计算框架。7.1.1Spark简介嵌入式系统基本概念

Spark具有如下几个主要特点:1.运行速度快与MapReduce相比,Spark可以支持包括Map和Reduce在内的更多操作,这些操作相互连接形成一个有向无环图(DirectedAcyclicGraph,DAG),各个操作的中间结果被保存在内存中,因此,Spark的处理速度比MapReduce快得多。2.容易使用Spark可以使用Scala、Java、Python、R和SOL进行编程,还可以通过SparkShell进行交互式编程。7.1.1Spark简介嵌入式系统基本概念

3.通用性Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法等组件,开发者可以在同一个应用程序中无缝组合地使用这些组件。4.运行模式多样Spark支持多种运行模式:本地运行模式、分布式运行模式。Spark集群的底层资源可以借助于外部的框架进行管理,比如,Spark可以使用Hadoop的YARN和AppacheMesos作为它的资源管理和调度器,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。7.1.2Spark生态系统Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,Spark生态系统如图7-1所示。

Spark的设计遵循“一个软件栈满足不同应用场景”的理念,逐渐形成了一套完整的生态系统,Spark生态系统如图7-1所示。图7-1Spark生态系统7.1.3Spark运行架构嵌入式系统基本概念

图7-2Spark运行架构7.2Scala基础嵌入式系统基本概念由于Spark是由Scala语言编写的,Scala语言是Spark编程的首选语言,为了后续更好地学习Spark,需要首先学习Scala语言。7.2.1Scala简介嵌入式系统基本概念

Scala是一种将面向对象和函数式编程结合在一起的高级语言,旨在以简洁、优雅和类型安全的方式表达通用编程模式。Spark支持使用Scala、Java、Python和R语言进行编程。Spark编程首选语言是Scala,因为Spark这个软件本身就是使用Scala语言开发的,Scala具有强大的并发性,支持函数式编程,可以更好地支持分布式系统,并且支持高效的交互式编程。采用Scala语言编写Spark应用程序,可以获得很好的性能。7.2.1Scala简介嵌入式系统基本概念

Scala是一种基于JVM的跨平台编程语言,Scala编译器可以将Scala源码编译成符合JVM虚拟机规范的中间字节码文件,在JVM平台上解释和运行;Scala语言的API可无缝兼容Java的API。7.2.2Scala编程嵌入式系统基本概念

Scala中使用关键字val和var声明变量。val类似Java中的final变量,也就是常量,一旦初始化将不能修改;var类似Java中的非final变量,可以被多次赋值,多次修改。Scala语言使用灵活,支持自动类型推测,非常简洁高效。如定义一个初值为5的整型常量,下面两种写法是等效的:vala=5vala:Int=57.2.2Scala编程嵌入式系统基本概念【例7-1】从控制台读入数据:使用readInt、readDouble、readLine输入整数、实数、字符串,使用print、println输出已读入的数据。scala>importio.StdIn._scala>varx=readInt()scala>vary=readDouble()scala>varstr=readLine("pleaseinputyourname:")scala>print(x)scala>println(y)scala>println(str)7.2.2Scala编程嵌入式系统基本概念

【例7-2】创建一个列表list1,包含10个元素,这10个元素是乱序的,然后这些元素进行以下操作并生成新的列表,list1保持不变:遍历,排序,反转,求和(多种方法,包括用循环结构),对所有元素乘2,对所有元素加1,过滤出只包含偶数的列表,拆分成大小为3的子列表。7.2.2Scala编程嵌入式系统基本概念

//创建一个列表list1scala>vallist1=List(3,9,2,5,7,6,1,8,0,4)//遍历scala>list1.foreach(println)//排序scala>vallist2=list1.sorted//反转scala>vallist3=list1.reverse7.2.2Scala编程嵌入式系统基本概念

//求和vallist4=list1.sum//求和scala>vallist5=list1.reduce(_+_)//用循环结构求和scala>varh=0for(i<-0tolist1.length-1)h=h+list1(i)scala>println("h="+h)7.2.2Scala编程嵌入式系统基本概念//每个元素乘以2scala>vallist5=list1.map(_*2)//每个元素加1scala>vallist6=list1.map(_+1)//过滤出只包含偶数的列表scala>vallist7=list1.filter(x=>x%2==0)7.2.2Scala编程嵌入式系统基本概念

//拆分成大小为3的子列表scala>valged=list1.grouped(3)scala>ged.nextscala>ged.nextscala>ged.nextscala>ged.next7.4RDD编程基础嵌入式系统基本概念

RDD(ResilientDistributedDataset),弹性分布式数据集,是分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合。RDD是Spark的核心概念。7.4.1SparkShell的启动和退出嵌入式系统基本概念

图7-17SparkShell成功启动后的界面7.4.2RDD创建嵌入式系统基本概念

1.从对象集合创建RDDSpark可以调用SparkContext的parallelize(),以一个已经存在的集合(列表)为数据源,创建RDD。例如,通过一个List列表创建RDD,命令如图7-18所示。图7-18parallelize创建RDD7.4.2RDD创建嵌入式系统基本概念

2.从外部存储创建RDDSpark的textFile()方法可以从文件系统中加载数据创建RDD,下面演示从HDFS和Linux本地加载数据创建RDD。(1)从HDFS中加载数据例如,在HDFS上有一个文件“/test2/f1.txt”,读取该文件创建一个RDD,命令如图7-19所示。图7-19从HDFS中加载数据7.4.2RDD创建嵌入式系统基本概念

(2)从Linux本地加载数据例如,在Linux本地有一个文件“/user/data/f2.txt”,读取该文件创建一个RDD,命令如图7-20所示。图7-20从Linux本地加载数据7.4.3常用的RDD操作嵌入式系统基本概念RDD的操作分为行动操作(Action)和转换操作(Transformation)。1.行动操作Spark程序执行到行动操作时,才会从文件中加载数据,完成一次又一次转换操作,最后完成行动操作。由于转换操作需要行动操作才能查看命令执行的结果,所以先介绍行动操作。7.4.3常用的RDD操作嵌入式系统基本概念

(1)count()返回RDD中的元素个数(2)collect()以数组的形式返回数据集中的所有元素(3)first()返回数据集中的第一个元素(4)take(n)返回一个数组,数组元素由数据集的前n个元素组成(5)reduce(func)通过函数func聚合数据集中的元素(6)foreach(func)对RDD中的每一个元素运行给定的函数func7.4.3常用的RDD操作嵌入式系统基本概念例如,统计rdd1中的元素个数,以数组的形式返回rdd1的所有元素,取rdd1的第一个元素,取rdd1的前3个元素,求rdd1中所有元素之和,输出rdd1的所有元素,代码及执行结果如图7-21所示。RDD的操作如果没有参数,后面的括号是可以省略的。7.4.3常用的RDD操作嵌入式系统基本概念

图7-21RDD的行动操作7.4.3常用的RDD操作嵌入式系统基本概念2.转换操作对于RDD而言,每一次转换操作都会产生新的RDD,供给下一个“转换”使用。转换操作是惰性操作,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会触发“从头到尾”的真正的计算。7.4.3常用的RDD操作嵌入式系统基本概念

(1)filter(func)例如,过滤出rdd1中大于3的元素,并输出结果,可用两种方法实现,代码及执行结果如图7-22所示。7.4.3常用的RDD操作嵌入式系统基本概念图7-22过滤出rdd1中大于3的元素7.4.3常用的RDD操作嵌入式系统基本概念

(2)map(func)例如,将rdd1中每一个元素加5,并输出结果,代码及执行结果如图7-23所示。图7-23将rdd1中每一个元素加57.4.3常用的RDD操作嵌入式系统基本概念

(3)flatMap(func)flatMap(func)操作是将RDD中的每一个元素传递到函数func中,将返回的迭代器(数组、列表)中的所有元素构成新的RDD。例如,用map(func)和flatMap(func)分割字符串,代码及执行结果如图7-24所示。7.4.3常用的RDD操作嵌入式系统基本概念

图7-24用map(func)和flatMap(func)分割字符串7.4.3常用的RDD操作嵌入式系统基本概念(4)groupByKey()groupByKey()应用于(key,value)键值对的RDD,可以将相同key的元素聚集到一起,返回一个新的(key,Iterable)形式的RDD。例如,有两位员工zhangsan和lisi,zhangsan的工资和奖金分别为5000元、1500元,lisi的工资和奖金分别为6000元、2000元,统计zhangsan和lisi的总收入,代码及执行结果如图7-25所示。7.4.3常用的RDD操作嵌入式系统基本概念

图7-25groupByKey()示例7.4.3常用的RDD操作嵌入式系统基本概念(5)reduceByKey(func)reduceByKey(func)应用于(key,value)键值对的RDD时,返回一个新的(key,value)形式的RDD,其中的每个值是将每个key传递到函数func中进行聚合后得到的结果。用reduceByKey(func)重做上例,代码及执行结果如图7-26所示。7.4.3常用的RDD操作嵌入式系统基本概念图7-26reduceByKey(func)示例7.4.3常用的RDD操作嵌入式系统基本概念

(6)sortBy()sortBy()将RDD中的元素按照某一字段进行排序,其中第一个参数为排序字段,第二个参数是一个布尔值,指定升序(默认)或降序。若第二个参数是false则为降序排列。例如,列表中存放了一些(key,value)键值对,用该列表创建RDD,然后对该RDD中的元素按照第二个字段降序排列,代码及执行结果如图7-27所示。7.4.3常用的RDD操作嵌入式系统基本概念图7-27sortBy()示例7.4.3常用的RDD操作嵌入式系统基本概念

(7)union()union()的参数是RDD,它将两个RDD合并为一个新的RDD,两个RDD中的数据类型要保持一致。例如,创建两个RDD,并将这两个RDD合并成一个新的RDD,代码及执行结果如图7-28所示。7.4.3常用的RDD操作嵌入式系统基本概念

图7-28union()示例7.4.3常用的RDD操作嵌入式系统基本概念

(8)mapValues(func)mapValues(func)对键值对RDD中的每个value都应用函数func,但是key不会发生变化。例如,将键值对RDD中的每个value加2,代码及执行结果如图7-29所示。7.4.3常用的RDD操作嵌入式系统基本概念

图7-29mapValues(func)示例7.4.3常用的RDD操作嵌入式系统基本概念

(9)keys键值对RDD中的每个元素都是(key,value)形式,keys操作只会把键值对RDD中的key返回,形成一个新的RDD。(10)values键值对RDD中的每个元素都是(key,value)形式,values操作只会把键值对RDD中的value返回,形成一个新的RDD。例如,分别输出键值对RDD中的每个元素的键和值,代码及执行结果如图7-30所示。7.4.3常用的RDD操作嵌入式系统基本概念

图7-30keys和values示例7.4.4RDD编程实例嵌入式系统基本概念

【例7-3】假设在HDFS的/test2目录下,有多个文本文件,每个文件存储多个英文单词,单词之间用空格分隔。请统计每个单词出现的次数。7.4.4RDD编程实例嵌入式系统基本概念

scala>vallines=sc.textFile("/test2")scala>valrddfalt=lines.flatMap(x=>x.split(""))scala>valwordCount=rddfalt.map(x=>(x,1)).reduceByKey((a,b)=>a+b)scala>wordCount.collect()scala>wordCount.foreach(println)7.4.4RDD编程实例嵌入式系统基本概念

【例7-4】hadoop.txt和spark.txt分别存储了同一个班的《Hadoop基础》和《Spark编程基础》成绩,其中第1列表示学号,第2列表示成绩。请在SparkShell中通过编程来完成以下操作:7.4.4RDD编程实例嵌入式系统基本概念

1.从本地文件系统中加载hadoop.txtscala>valhadoop1=sc.textFile("file:///home/shxx/data/hadoop.txt")2.从本地文件系统中加载spark.txtscala>valspark1=sc.textFile("file:///home/shxx/data/spark.txt")7.4.4RDD编程实例嵌入式系统基本概念

3.查询学生spark成绩中的前3名使用map转换数据,每条数据被分割成2列,表示学号和成绩,分隔符为“,”,存储为二元组格式,成绩要求转化为Int类型。scala>valm_spark=spark1.map{x=>valline=x.split(",");(line(0),line(1).trim.toInt)}通过sortBy对元组中的成绩列降序排序scala>valsort_spark=m_spark.sortBy(x=>x._2,false)通过take()操作取出每个RDD的前3个值scala>sort_spark.take(3).foreach(println)7.4.4RDD编程实例嵌入式系统基本概念4.查询spark成绩90分以上的学生scala>valfilter_spark=m_spark.filter(x=>x._2>90)scala>filter_spark.foreach(println)5.查询spark成绩的最高分和最低分scala>valmax_spark=m_spark.values.maxscala>valmin_spark=m_spark.values.min7.4.4RDD编程实例嵌入式系统基本概念6.求spark课程的平均成绩scala>valp1_spark=m_spark.values.map(x=>(x,1.0)).reduce((x,y)=>(x._1+y._1,x._2+y._2))scala>valp2_spark=p1_spark._1/p1_spark._27.4.4RDD编程实例嵌入式系统基本概念7.输出每位学生所有科目的总成绩scala>valm_hadoop=hadoop1.map{x=>valline=x.split(",");(line(0),line(1).trim.toInt)}scala>valunion_hs=m_spark.union(m_hadoop)scala>valall_score=union_hs.reduceByKey((a,b)=>a+b)scala>all_score.foreach(println)7.4.4RDD编程实例嵌入式系统基本概念

8.输出每位学生的平均成绩scala>valpj_score=all_score.mapValues(x=>x/2.0)scala>pj_score.foreach(println)7.5IDEA的安装和使用嵌入式系统基本概念IDEA全称IntelliJIDEA,是一个通用的集成开发环境,IntelliJIDEA在业界被公认为最好的scala和java开发工具之一。我们可以使用IDEA开发Spark独立应用程序,7.5.2IDEA的使用嵌入式系统基本概念在$SPARK_HOME/examples/src/main/scala/org/apache/spark/examples目录下,Spark自带了名为“SparkPi”的例子程序,该例子采用分布式估算的方法求圆周率。计算原理为:假设有一个正方形,边长是x,该正方形里面有一个内切圆,可以算出正方形的面积S等于x²,内切圆的面积C等于Pi×(x/2)²,于是就有Pi=4×C/S。可以利用计算机随机产生大量位于正方形内部的点,通过点的数量去近似表示面积。假设位于正方形中点的数量为Ps,落在圆内的点的数量为Pc,则当随机点的数量趋近于无穷时,4×Pc/Ps将逼近于Pi。7.5.2IDEA的使用嵌入式系统基本概念importscala.math.randomimportorg.apache.spark.sql.SparkSession/**Computesanapproximationtopi*/objectSparkPi{defmain(args:Array[String]):Unit={valspark=SparkSession.builder.appName("SparkPi")/**如果在IDEA本地上运行需要加上.master("local"),如果要提交到Spark集群一定不能加.master("local")*/.master("local").getOrCreate()7.5.2IDEA的使用嵌入式系统基本概念

valslices=if(args.length>0)args(0).toIntelse2valn=math.min(100000L*slices,Int.MaxValue).toInt//avoidoverflowvalcount=spark.sparkContext.parallelize(1untiln,slices).map{i=>valx=random*2-1valy=random*2-1if(x*x+y*y<=1)1else0}.reduce(_+_)println(s"Piisroughly${4.0*count/(n-1)}")spark.stop()}}1.4.3YARN嵌入式系统基本概念

YARN(YetAnotherResourceNegotiator,另一种资源协调者)是一种新的Hadoop资源管理器,它是一个通用资源管理系统,可为上层应用提供统一的资源管理和调度,它的引入为集群在资源利用率、资源统一管理和数据共享等方面带来了很大好处。YARN采用主从架构(Master/Slave),包括:ResourceManager、ApplicationMaster和NodeManager三个核心组件。1.5Hadoop生态系统嵌入式系统基本概念

图1-1hadoop的生态系统1.5.1HBase嵌入式系统基本概念HBase(HadoopDatabase,hadoop数据库)是一个针对非结构化数据的可伸缩、高可靠、高性能、分布式和面向列的开源数据库,一般采用HDFS作为底层数据存储系统。HBase是针对GoogleBigtable的开源实现,二者采用相同数据模型,具有强大的非结构化数据存储能力。HBase使用ZooKeeper进行管理,它保障查询速度的一个关键因素就是RowKey的设计是否合理。HBase中保存的数据可以使用MapReduce来处理。1.5.2Hive嵌入式系统基本概念

Hive是一个基于Hadoop的数据仓库工具,最早由Facebook开发并使用。Hive可以将结构化的数据文件映射为一张数据库表,并提供SQL查询功能,这套SQL称为HiveQL(HiveQueryLanguage)。当采用MapReduce作为执行引擎时,Hive能将HiveQL语句转换成一系列MapReduce作业,并提交到Hadoop集群上运行。Hive大大降低了学习门槛,同时也提升了开发效率,非常适合数据仓库的统计分析。1.5.3Pig嵌入式系统基本概念

Pig是一种数据流语言和运行环境,用于检索大规模的数据集。Pig定义了一种数据流语言—PigLatin,将脚本转换为MapReduce任务在Hadoop上执行,适合于

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论