it教程网spark专-亚太研究院共享完毕与mpi作者张天元_第1页
it教程网spark专-亚太研究院共享完毕与mpi作者张天元_第2页
it教程网spark专-亚太研究院共享完毕与mpi作者张天元_第3页
it教程网spark专-亚太研究院共享完毕与mpi作者张天元_第4页
it教程网spark专-亚太研究院共享完毕与mpi作者张天元_第5页
免费预览已结束,剩余27页可下载查看

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、Life is short, you need spark!前言世界上第一本由网络社区多人协作完成的系统性的 Spark 专刊在您的面前!-Spark 亚太Hadoop 和 Spark 联合A winning combination !,而了的大数据世界基石和,Spark + Hadoop =正在悄悄发生变化,这种变化趋势和最终状态是由 Hadoop 负责数据和资源管理,由 Spark 负责多元化的不同规模的数据计算,而正如您所知计算是大数据的精髓之所在!Spark 是大数据领域最活跃最热门的高效的大数据通用计算,基于 RDD,Spark 成功的构建起了、多元化的大数据处理体系,在“One S

2、tack to rule themall”的引领下,Spark 成功的使用 Spark SQL、Spark Streaming、MLLib、GraphX近乎完美的解决了大数据中 Batch Prosing、Streaming Prosing、Ad-hocQuery 等三大问题,更为美妙的是在 Spark 中 Spark SQL、Spark Streaming、MLLib、GraphX 四大子框架和库之间可以无缝的共享数据和操作,这是据都无可匹敌的优势。任何大数在实际的生产环境中世界上已经出现很多一千个以上节点的 Spark 集群,以 eBay为例,eBay 的 Spark 集群节点已经超过 2

3、000 个,Yahoo!等公司也在大规模的使用Spark,国内的淘宝、腾讯、网易、京东、大众点评、优酷土豆等也在生产环境下深度使用 Spark。2014 Spark Summit 上的信息,Spark 已经获得世界 20 家顶级公司的支持,这些公司中包括el、IBM 等,同时更重要的是包括了最大的四个Hadoop商都提供了对非常强有力的支持 Spark 的支持。与 Spark 火爆程度形成鲜明对比的是 Spark的严重稀缺,这一情况尤其严重,这种的稀缺一方面是由于 Spark 技术在 2013、2014 年才被大陆这边的IT 实际接触,另一方面是由于匮乏 Spark 相关的足够出色的中文资料和

4、系统化的培训。为此,Spark 亚太期公益大讲堂”,具体ht和 51CTO 联合推出了“Spark 亚太信息请参考:/course/course_id-1659.html决胜大数据时代 1002 / 33: 4006-998-758Life is short, you need spark!与此同时,由 Spark 亚太提出,结合网络社区的力量,构建起了 Spark 专刊编写团队,成员们历经 2 个月左右的艰苦努力和反复修改,Spark 专刊 V1.0 终于完成。在此,我谨代表 Spark 亚太致以深刻的敬意!向编写团队所有成员热情而专业的工作当然,作为世界上第一本相对系统 Spark 专刊,

5、议或者意见都可以发邮件到 marketinSpark 专刊编写团队,也请发邮件到 marketin之处在所难免,大家有任何建;同时如果您想加入进行申请。Spark 专刊的编写是一个持续新的、不断版本迭代的过程,每一次家提供更高质量的 Spark 专刊书籍。都会尽全力给大最后,也是最重要的,请允许我荣幸的介绍一下的 Spark 专刊书籍第一个版本编写的团队成员,不分先后,他们(括号内为每个人负责编写的)分别是:王家虎(Scala 语言)(Spark 内核)(Spark on Yarn)(sparkSQL)张宇(Spark 多语言编程)张天元(Spark 与 MPI)陈李黄伟(Tachyon)军(

6、Spark MLlib 机器学习)忠(Spark 学习最佳路径)王家林(Spark 架构设计) 王家林(Spark Streaming)王家林(Spark 生态系统)王家林(图计算,请参考书籍Spark+GraphX 大规模图计算和图挖掘)Life is short, You need Spark!Spark 亚太院长王家林2014 年 10 月3 / 33Life is short, you need spark!Spark 亚太决胜大数据时代 100 期公益大讲堂简介作为下一代云计算的技术,Spark 性能超 Hadoop 百倍,算法实现仅有其 1/10或 1/100,是可以Hadoop

7、的目前唯一替代者,能够做 Hadoop 做的一切事情,同时速度比Hadoop 快了 100 倍以上。目前 Spark 已经构建了自己的整个大数据处理生态系统,国外一些大型互联网公司已经部署了 Spark。甚至连 Hadoop 的早期主要贡献者Yahoo 现在也在多个项目中部署使用 Spark;国内的淘宝、优酷土豆、网易、Baidu、腾讯、网等已经使用 Spark 技术用于自己的商业生产系统中,国内外的应用开始越来越广泛。Spark 正在逐渐成熟,并在这个领域扮演更加重要的角色, 刚刚结束的2014 Spark Summit 上的信息,Spark 已经获得世界 20 家顶级公司的支持,这些公司中

8、包括el、IBM 等,同时更重要的是包括了最大的四个 Hadoop商都提供了对非常强有力的支持 Spark 的支持.鉴于 Spark 的巨大价值和潜力,同时由于国内极度缺乏Spark,Spark 亚太在完成了对 Spark 源码的彻底研究的同时,不断在实际环境中使用 Spark 的各种特性的基础之上,推出了 Spark 亚太决胜大数据时代 100 期公益大讲堂,希望能够帮助大家了解Spark 的技术。同时,对Spark培养有近一步需求的企业和个人,以公开课和企业内训的方式,来帮助大家进行 Spark 技能的。同样,也为企业提供Spark 亚太的顾问式服务及Spark 一站式项目解决方案和实施方

9、案。决胜大数据时代 100 期公益大讲堂是国内第一个 Spark 课程免费线上讲座,每周一期,从 7 月份起,每周四晚 20:00-21:30,与大家不见不散!老师将就Spark 内核剖析、源码解读、性能优化及商业实战案例等货不容错过!内容与大家,干时间:从 7 月份起,每周一期,每周四晚 20:00-21:30形式:腾讯课堂学习条件:对云计算大数据感的技术课程学习地址:ht/course/course_id-1659.html4 / 33: 4006-998-758Life is short, you need spark!Spark 与MPI()目录第 1 章1.11.2绪论6计算科学界需

10、要大规模并行计算6现行计算科学并行标准:MPI71.2.11.2.21.2.3MPI 代码模版7MPI 程序的编译运行9MPI 数据通讯101.3新的并行计算的兴起:Spark111.3.11.3.21.3.3Spark 的:RDD11Spark 程序的编译运行13Spark 生态环境15第 2 章Spark 的优势162.1简易的编程模型162.1.12.1.22.1.32.22.3隐式的计算资源管理和分配16简洁的方法调用16面象的编程. 17稳定的运行.18计算资源的弹性扩展19第 3 章3.13.23.3从MPI 到Spark19Spark 编程.19MPI 中并行数据处理的Spark

11、 实现20MPI 中常用消息传递函数的Spark 实现20聚合通讯21点对点通讯23同步243.3.13.3.23.3.3第 4 章4.14.2样例比较:圆周率的积分计算24计算方法24代码比较25MPI 源代码25Spark 源代码27运行效率比较29第 5 章总结与展望30参考资料:315 / 33Spark 亚太群Life is short, you need spark!第 1 章绪论1.1计算科学界需要大规模并行计算随着计算机科学技术的发展,基于计算机数值计算的计算物理、计算化学等计算科学也日益成为了与理论科学、实验科学并列的一大科学领域。计算科学在逐渐扩大其的同BLAS、时,也催生

12、了一系列数学工具如 R 语言、Mathematica,数学的推动作用,使得LAPACK、ARPACK 等。它们对计算科学的发展起了人可以很容易的应用计算机的运算能力解决一些理论和实验无法解决。图 1.1. 计算科学实例:左:苯的最低未占分子轨道;右:X-43A 试验机於 7算流体力学(CFD)等值线图速度时的计自从计算机诞生之日起,它的计算能力就以摩尔定律所认为,计算机的计算能力能够保持这样的发展速度。然而,单个的那样飞速发展。人们一度的运算速度已经非常缓慢,逐渐接近于单个运算的极限。与此同时,随着计算科学的发展,计算科学对于计算机的计算能力需求不断攀升。单个的计算能力已经远远不能满足现代计算

13、科学的需求。比如想要计算一个蛋白质分子的折叠,计算量极大,如果仅仅使用一个算不完。,可能一辈子也为了提高计算能力,并行计算应运而生。让成千上万的计算并行得完成一个任务,已经成为科学计算的必然选择。为了支持并行计算,以 MPI、OpenMP 为代表的一系列应运而生,而前者更是成为了科学计算的大规模并行业界标准。例如世界最快的天河二号由 16000 个节点组成,由MPICH 3.0.4 提供并行计算支持。MPI 是 1990 年代的产物,至今已经有 20 年历史了。这是在计算科学界最成并行计算。然而,进入 21 世纪之后,并行计算技术开始有了新的发展。2007 年左右,以6 / 33: 4006-

14、998-758Life is short, you need spark!CUDA 和OpenCL 为代表的通用GPU 并行计算技术开始兴起。每个GPU 上可以有数百甚至上千的计算,因此装配有高性能GPU 的工作站也可以作为一个小型的超级计算机来使用。由于达到同样的计算能力,GPU 比 CPU 价格低数倍,使得计算成本大幅度降低。另外,理器el 公司开始推出hi 协处理器,将 60-70 个计算集成在一起,可以在处进行小规模的并行计算。这两类技术在先前介绍的天河二号以及其他众多超级计算机中已经有所使用,也是天河二号得以登上世界超级计算机榜的重要原因。在2005 年了MapReduce 大规模数

15、据并行处理架构。而后,MapReduce的一个开源实现 Hadoop 使得 MapReduce 得到长足发展。这种并行计算在大规模数据处理方面取得了巨大的成功。国内外众多企业开始应用 Hadoop 运行它们的业务,如Amazon 也推出了它们的基于Hadoop 的MapReduce。近日,Spark 进入人们的视野。Spark 并不拘泥于 MapReduce 服务,它提供了的功能如机器学习、图计算等。Spark 采用内存进行数据分布和计算,将 MapReduce 的运行速度提高了约 100 倍。然而,在计算科学界,Spark 仍然没有引起计算科学工作者的注意。本文将对 Spark 进行简单介绍

16、,并且阐述它在未来有可能取代 MPI,成为计算科学进行大规模并行计算的新标准。1.2现行计算科学并行标准:MPI说到并行计算,不得不向大家简要阐述一下现行的计算科学进行并行计算的业界标准:MPI。MPI 全称 Message Passingerface,即消息传递接口。它本身是一个标准,两个流行的实现是 MPICH 和OpenMPI(注意不要与OpenMP 搞混)。MPI 标准定义了库函数的语法和语义,以供在众多编程语言中进行实现,如Fortran,C,C+等。它采用将同一程序运行在众多进程上的方式来实现。这些进程不仅可以处于同一个共享内存架构的计算机中,也可以运行在由网络互联的不同机器上。每

17、个进拥有各自的数据,在不需要通讯时,各个进程异步地运行相同的代码,对不同的数据进行相似的处理;在需要其他进程上的数据时,通过 MPI 标准中定义的数据通讯函数对数据进行迁移和处理,也可对操作进行同步。1.2.1MPI 代码模版进程是 MPI 并行处理的运行单元,进程之间以进程号作为区分。在最初的标准中,进程的数量是固定的,由运行程序时指定。这在后来的版本中有所修改,新版本的 MPI 支持动态产生新的进程。在运行时,每个进程事实上在运行完全相同的程序代码,仅仅在运行时通过检查自身的进程,来对数据进行本进程独立的操作。代码段 1.1 展示了最简单的MPI 并行程序代码模版。7 / 33Spark

18、亚太群Life is short, you need spark!代码段 1.1. MPI C 语言代码模版#include mpi.hmain(argc,char *argv)myid, numprocs;. / other declarationsMPI_Init(&argc,&argv);m_size(M_WORLD,&numprocs);M_WORLD,&myid);m_r(/ initialize and get current pros id and total number of proses. / do some workif (myid = .) . / for the sp

19、el pros, do some work. / do some workif (myid = .) . / for the spe.l pros, do some workMPI_Finalize();return 0;8 / 33: 4006-998-758Life is short, you need spark!1.2.2MPI 程序的编译运行对于C 语言编写的MPI 程序,程序编译指令为:$ mpicc -O2 -o cpi ./cpi.c它与gcc令几乎相同,编译选项也类似,只是将gcc 改为 mpicc 罢了。由于程序被成很多份运行在不同的进程上,在程序开始时,每个进程首先调用M

20、PI_Init 来初始化环境,然后就调用m_size 和m_r来分别获取该进程的进程号,和总的进程数目。总的进程数目是在调用程序$ mpirun -n 4 ./cpi令行中指定的,如:上面这个命令制定了开启 4 个进程来运行这个程序。一般来说,编写 MPI 并行程序要求程序在任何进程数目下都可以正确运行。另外,当进程数目与数目相同时运行速度最快,但进程数目可以低于数目,这样有的派不上用场;进程数目也可以高于数目,这样就有多个进程共用一个的情况。如果在运行程序时采用 top 指令查看进程,就可以看到图 1.2 所示显示。图 1.2. top 指令查看 $ mpirun -n 4 ./cpi 运行

21、时的进程状态图 1.2 中可以看到,有 4 个cpi 进程,由于它们同时被创建,因此它们的进程号是连续的。注意这里的进程号是操作系统的进程号,在 MPI 程序中的进程号是从 0 开始连续的。也可以采用与正常运行程序相同令来运行 mpicc 编译的程序,如:$ ./cpi这样运行时就是按照普通串行程序的方式进行运行的,基本相当于mpirun -n 1 ./cpi。9 / 33Spark 亚太群Life is short, you need spark!由于在编程时保证了不同进程数目情况下程序都能正确运行,因此这样运行理应也可以得到正确的结果。1.2.3MPI 数据通讯在程序定义变量部分,所有的变

22、量数据对于进程来说都是私有的。一般来说,同一个程序的两个进程会有完全相同的变量名称。但是这些变量的值在不同的进程上不同。无法从一个进程上直接另外一个进程的数据,这保证了进程之间相互独立的关系。当需要与其他进程进行数据交流时,有一系列 MPI 定义的库函数来完成运行。这些函数是 MPI 中最常用的函数:数据通讯函数,分为点对点通讯函数和聚合通讯函数两大类。表 1.1 列出了 MPI 中最重要的通讯函数。当然,MPI 有上百个库函数,下面列出来的是最常用的一些函数,其它函数许多都是下面列出来函数的扩展,来实现更加复杂的功能,或者是一些辅助函数、文件读写函数。表 1.1. MPI 中常用的通讯函数从

23、上面可以看出,MPI 最重要的函数都是用来处理数据的移动,如果程序运行所需要的数据本地有,就在本地进行处理。如果本地没有相应的数据,就把数据通过库函数传递数10 / 33: 4006-998-758函数名描述点对点通讯MPI_Send数据发送进程 i 将一块数据从发送到进程j,与Recv 配对MPI_Recv数据接收进程 i 进程j 接收一块数据,与 Send 配对聚合通讯一对多MPI_Bcast数据广播将某一进程的一块数据发送到所有进程MPI_Scatter数据散发将某一进程的数据序列分块分发,第 i 块数据发送到进程 i多对一MPI_Gather数据收集将所有进程的数据收集组合到某一进程,

24、数据散发的逆操作MPI_Reduce数据规约将所有进程的数据按照某种运算操作规约到某一进程多对多MPI_Allgather数据全收集将所有进程的数据块收集组合到所有进程,相当于收集广播MPI_Alltoall数据全收集散发对所有 n 个进程中的 n 块数据的数据块,进程 i 将第 j 块数据发送到进程 j 的第 i 数据块,相当于方阵转秩操作同步MPI_Barrier同步运行到该函数暂停,等待所有进程运行到该函数后继续运行Life is short, you need spark!据到本地进行处理。MPI 模型可以总结为“代码不动数据动”,即进程分布于不同的计算机上,进程运行需要其他进程上的数

25、据时,将其他进程上的数据移动到本地继续进行处理。1.3新的并行计算的兴起:Spark2009 年,Spark 作为UC BerkeleyLab 的研究项目立项,并且在 2010 年开源。了一系列研究性文章。Lab 仍然在继续进行优化Spark 和在Spark 基础上进行开发的研究,比如 Spark SQL、Spark Streaming、MLLib、GraphX 等。在 Spark 发布之后,2014 年初,Spark 成为 Apache开发在进行着Spark 的开发工作。的顶级项目。大约有来自 50 个公司的超过 250 个1.3.1Spark 的:RDDSpark 的是RDD,是 resi

26、nt distributed dataset(弹性分布式数据集)的简写。RDD 是可并行的容错数据集,Spark,包括其上层架构都是以 RDD 为基础的。RDD 可以理解为包含许多相同数据类型的分布在集群上的数据元素集合,它隐式地对数据进行了分布式,使得对RDD 的操作就像对一个本地数据集进行操作那样简单,而不需要关心背后众多数据在集群中的分布。RDD 中的数据在集群中不同的节点可以有多份拷贝,这有两大好处:1、在集群中某些节点失效时仍然保持数据的完整性;2、增加在本地可以开销,加快运行速度。到所需数据的概率,减少网络对于数据结构RDD,Spark中定义了两大类方法 1:Transformat

27、ion与Action。其中,常用的Transformation与Action列于表 1.2。表 1.2. Spark 中常用的 RDD 数据操作方法1C,Fortran 使用者可能对“方法”的概念不了解,它指对象的成员函数。可以直接理解为函数即可。11 / 33Spark 亚太群方法名描述Transformationmap(func)对所有元素应用 func 函数,返回新的 RDDflatMap(func)弹性对所有元素应用 func 函数,func 函数可能返回 0多个结果,组合到新的 RDD 返回Life is short, you need spark!从表1.2 列出的RDD 方法可以

28、看出,Spark 中由于数据是隐式分布在分布式环境中的,因此RDD 的操作主要是对于元素的操作,而没有关注元间的通讯,因为分布式环境中的通讯是隐式实现的。由于Spark 基于Scala,而 Scala 是一种函数式编程语言。因此,向RDD 中传递函数对数据操作就显得非常自然。与 MPI 将数据传递给操作数据的进程的模型相反,在 Spark 中,运行的模型是将对数据的操作(函数)传递给数据。因此,Spark 模型可以总结为“数据不动代码动”。值得一提的是,虽然 Spark 是由 Scala 语言编写完成的,它也有两种流行语言:Java与的 API。因此,使用 Spark 的人通常不需要为了使用S

29、park 而多学门语言。而且Spark 安装十分方便,事实上只需要对的包解压,即可进行本地的Spark 操作。从表 1.2 也可以看出,所有的 Transformation 方法都返回新的 RDD。这里有两明:1、所有的 RDD 都是不可变的,也就是说,无法修改RDD 中的元素,要个特性需对元素进行操作,只能生成新的 RDD;2、所有的 Transformation 方法都是懒(lazy)执行的,这是Scala 的一个特性,即Transformation 方法只有在需要它的结果时,它才会被计算。如果一个 Transformation 方法的结果到程序结束也没有被使用,那这个 Transform

30、ation 方法不会被计算。而 Action 方法则不同,Action 方法的返回值都不是新的 RDD,而是值或序列,12 / 33: 4006-998-758filter(func)过滤返回新的包含所有应用func 函数返回值为true 的元素的 RDDreduceByKey(func, numTasks)逐键规约对于元素类型为(K,V)对的 RDD,对所有的 K,对 V按照 func 函数进行规约操作,返回(K,V)对的 RDDjoin(otherDataset, numTasks)连接对于元素类型为(K,V)、(K,W)的两个 RDD,返回元素类型为(K,(V,W),包含所有(V,W)对

31、的 RDDAction()首元素返回 RDD 中第一个元素reduce(func)规约对所有的元素,按照(V,V) = V 类型的函数 func 进行规约,返回 V 类型的值collect()收集返回一个包含有 RDD 中所有元素的 Arraycount()计数返回 RDD 中元素的个数foreach(func)逐元素操作对于每个元素进行 func 操作,不返回takeOrdered(n,ordering)排序选取返回按照默认或自定义排序后的前n 个元素Life is short, you need spark!或是将数据进行等。Action 方法都是被立刻执行的。一般来说,在程序中,Acti

32、on 方法之前一般都有多个懒执行的 Transformation 方法,在调用 Action 方法时, Transformation 方法才真正被执行。综合来看,Spark 的编程模型为先基于原始数据生成RDD,然后对 RDD 进行一系列Transformation 转换,最后对 RDD 进行Action 操作对数据进行汇总得到结果,输出到控制台或到本地或分布式的文件系统中。这里简单提一下 Spark 文件操作。Spark 不仅可以对本地的文件系统进行读写操作,还可以对分布式文件系统或数据源进行操作,如 HDFS、Cassandra、HBase、Amazon S3 等。对于分布式的文件系统,读

33、写操作也可以以高效的方式并行进行,但这不是本文的重点。请参考或其它资料了解相关信息。1.3.2Spark 程序的编译运行Spark 程序执行在 Java 虚拟机(JVM)之上,可以将 Spark 源代码编译成为打包的.jar文件(Scala 或java)或.py 文件(应用程序,最简单的办法是启动Spark SScala:)提交给 Spark进行运行。然而,对于小型进行交互式运行:$ ./bin/spark-s或:$ ./bin/pyspark这样就进入了 Scala S或S环境,同时得到一个预定义的 sc:SparkContext。可以使用这个变量进行Spark 操作。如果是采用源文件编译的

34、方式来编译运行Spark 程序,则需要自行定义SparkContext。一个Scala 语言的Spark 程序模版如下:代码段 1.2. Spark Scala 语言代码模版importimport import.apache.spark.SparkContext.apache.spark.SparkContext._.apache.spark.SparkConfobject . / . name of object13 / 33Spark 亚太群Life is short, you need spark!def main(args: ArrayString) val conf = new S

35、parkConf().setAppName(.) / . name of app val sc = new SparkContext(conf)/ initialize SparkContext. / Spark program codessc.stop()在代码开端,初始化了 Spark 环境,而后就可以基于 Spark最后结束Spark 环境。另外,还需要import 相关的类。对数据进行操作,在Scala 或java 源代码一般使用 sbt 或 mvn 进行编译,sbt 指令为:$ sbt package具体编译细节请参考或相关书籍,此处略去。编译后得到.jar 文件,就可以提交给Spa

36、rk进行运行,示例命令为:$ spark-submit -class Pi ./pi_2.10-1.0.jar源代码不需要编译,可以直接提交给 Spark个.jar 文件参数换为相应的.py 文件即可。,命令只需要把上面的最后一提交后就由Spark 并行地运行程序,它运行在java 虚拟机之上。无论是采用提交文件方式还是在 spark s中运行的方式,使用top 指令查看进程,可以看到如图 1.3 所示显示:图 1.3. top 指令查看 $ spark-submit -class Pi ./pi_2.10-1.0.jar 运行时的进程状态与 MPI 不同,top 指令中仅仅显示了一个 jav

37、a 进程,但从 CPU 使用率上看,它使用了测试机器上的全部,因此Spark 即使在单机上也是并行运行的。14 / 33: 4006-998-758Life is short, you need spark!1.3.3Spark 生态环境Spark 的简单来说就是数据结构 RDD 以及 RDD 相关的一系列操作。在这个架构之上,活跃的 Spark 开发者已经开发出了多个库,使得 Spark 形成了一个丰富的生态系统。图 1.4. Spark 生态环境如图 1.4 所示,基于Spark 发展起来了Spark SQL 用于进行传统关系型数据库操作、Spark Streaming 用于实时流处理、M

38、LLib 以RDD 为基础实现了矩阵并用于机器学习。另外在发展中的项目还有SparkR 用于在 R 语言中并行处理数学计算问题、GraphX 用于进行图计算等。其中,MLLib 与SparkR 对于计算科学最为有用,前者定义了计算科学中具有重要意义的矩阵,后者提供了数学研究者常用的 R 语言支持。这些库使得计算科学工作者不需要完全从RDD 开始编写自己的程序,而可以直接调用相应的类进行自己的工作。当然,这些Spark 库目前还不完善,如 MLLib 矩阵还没有提供求矩阵特征值的相关操作。但 Spark 是一个年轻的项目,相信这些类库会在日后不断完善,供计算科学工作者使用。15 / 33Spar

39、k 亚太群Life is short, you need spark!第 2 章Spark 的优势2.1简易的编程模型2.1.1隐式的计算资源管理和分配在Spark 中,数据是分布式在集群中的,但与 MPI 中不同,用户并不需要了解和指定数据在集群中究竟是如何分配的。所有的数据分布细节都隐藏在 RDD 数据结构背后,由Spark通过它的算法进行分配、平衡以及优化。这对于编程者来说是个大好消息,因为在 MPI 中,程序设计的很大一部分任务是设计数据在节点间的分配问题。这样定制出来的数据分配可能是高效的,也可能是低效的,但无论如何都消耗掉了计算科学工作者的大量时间,使它们不能把自己的工作重心放在发

40、展方法上,而是考虑如何把自己的方法实现上。由于不用关心数据的分配使得 Spark 的使用门槛非常低,只需要在原先串行的程序上稍作修改,将循环改为RDD 操作,即可完成串行程序向并行程序的转变。2.1.2简洁的方法调用用过 MPI 的人都知道,MPI 的库函数普遍具有很多参数需要明确。在表 1.1 列出的常用函数中,除了 MPI_Barrier 这个函数(它不涉及到数据发送接收)只有 1 个参数外,其它函数的参数数目最少为 5 个,通常为 7 个左右。最简单的通讯函数 MPI_Bcast 函数签名(C 语言)如下:MPI_Bcast(void *buf,count, MPI_Daype dayp

41、e,root,m comm)其中各参数的意义如表 2.1 所示表 2.1. MPI_Bcast 函数各参数含义16 / 33: 4006-998-758参数含义明确void *buf通信消息缓冲区的起始地址数据在哪里存?count将广播接收的数据个数数据有多长?MPI_Daype daype广播接收数据的数据类型数据是什么类型?root广播数据的根进程的进程号数据从哪里发?m comm通信器:包含所有参与本次通讯的进程数据谁接收?Life is short, you need spark!事实上,几乎所有的 MPI 通讯函数都需要明确与表 2.1 中类似的一些问题。其它的常用通讯函数的参数数目

42、。这么多参数对于编程者来说总是很不友好的。这是由 MPI 编程的本质决定的。由于 MPI 以进程为中心,进程以进程号为唯一标识,需要进行通讯的进程的集合通信器,在通讯时,总是需要明确数据是从哪些进程发送到哪些进程的。另外,对于所传输的数据,还必须明确数据在哪里?多大?什么类型?这些问题成为了 MPI 使用者编程时花最多时间考虑。另外,由于调用 MPI 程序的进程数目是在调用MPI 程序时指定的,而不是在程序指定的,这就带来很烦:编程者没办法事先知道进程的数量。所以,上面的函数调用参数中大多数在程序中本身就是变量,通常需要在程序运行开始时通过函数调用m_size(M_WORLD,&numproc

43、s);m_r(M_WORLD,&myid);取得进程总数和当前进程号,再通过这些信息计算出每个进程多少数据、数据发送的源进程、目标进程号,才能确定一个数据通讯函数中的参数。这对于编程者来说是巨大的。在Spark 所提供的所有RDD 方法中,除了两个取样方法:sle(withReplacement, fraction, seed)与takeSle(withReplacement, num, seed)需要 3 个参数外,其它的所有方法最多只需要一个参数 2。这使得程序的编写变得非常容易。这是因为RDD是隐式分布的,Spark本身就帮忙处理了大多数进程问题与数据位置、类型、大小问题。因此,RDD方

44、法的调用非常简单。也使得Spark成为了一个非常容易掌握的并行编程工具。2.1.3面象的编程由于的构造越来越复杂,传统的面向过程编程模式越来越难以适应迅速发展的代码2从表 1.2 可以看出,许多方法在其函数签名中了多于一个参数,但是处于方括号中的参数不是必须的。如reduceByKey(func, numTasks)的第二个参数numTasks指定任务个数就不是必须的,事实上一般人调用这些方法也不会去指定任务个数,Spark 会自动确定这个参数,通常比自行指定的参数效率要高。17 / 33Spark 亚太群Life is short, you need spark!规模,面象编程在 1990

45、年代中期成为主流的编程模式。然而,在计算科学界,由于对于科学问题的演算本身是一步一步来解决的,本身似乎用面向过程的来编程解决更加自然。导致直到现在,面向过程编程仍然是计算科学程序的主要编程模式,Fortran 与 C 仍然是主要的编程语言。但是,即使是计算科学程序,经过几十年的发展,现在也变得越来越复杂。已经有的研究组放弃面向过程编程,转向采用面象的方式进行计算科学程序的架构,如由 Emory等大学合作开发的计算化学程序 Psi4。作者认为,计算科学界正在经历由面向过程编程向面象编程的模式转变。MPI 是在面向过程式编程的背景下设计完成的。因此,它的仍然采用的是面向过程的,以进程为中心,数据与

46、操作分离。这样,MPI 编写的程序如果要添加新的功能,扩展起来比较。而对于 Spark 则不同,Spark 采用面象的编程编写,以数据为中心。Spark象特性的语言。因此,采用 Spark所支持的三种语言:Scala、Java、都是具有面的程序可以非常容易地进行扩展。对于计算科学工作者来说,有新的想要向已有的程序里面添加,是十分普遍的需求。因此,采用易扩展的Spark 可以让研究工作更加轻松。2.2稳定的运行由于计算科学所涉及的计算量极大,一个计算科学的程序运行数天甚至数月是非常普遍的现象。然而,如果在这个过程中,有一个计算节点出错,则长时间的计算就可能功亏一篑。这种情况有可能发生,需要强大的

47、并行计算来对异常状况进行容错。然而,MPI 本身几乎没有容错能力,这是因为 MPI 以进程为中心,而进程与节点是在程序运行初期绑定的。如果某个节点出错,极可能导致整个程序的异常退出,这是计算科学工作者极其不愿看到的状况。在 Spark 中则不用担心这个问题,因为 Spark具有强大的容错能力。这些容错能力都隐藏在RDD 这个数据结构之后。其主要机制如下:1、RDD 中的数据在集群的不同节点的内存中有多份拷贝,这样可以在某些节点失效的情况下保持数据的完整性。如果由于某些节点失效引起数据损失,这些损失的数据可以从原始数据的拷贝中重新计算恢复。所有的操作都是容错的。一旦操作完成,保证结果的正确性。2

48、、3、这样,程序在运行时可以自动从节点失效中迅速恢复过来,通过将任务分配给其它节点,将数据恢复并且继续进行计算。其中,检测节点失效是通过心跳包的机制来完成的,Spark 默认 15s 对集群中所有机器进行一次失效检测。如果检测到节点失效,则立即将失效节点未完成的工作分配给其它节点完成。18 / 33: 4006-998-758Life is short, you need spark!这样的容错机制,使得计算科学工作者可以摆脱节点失效而使整个程序瘫痪科学计算保驾护航。为2.3计算资源的弹性扩展在 MPI 中,不仅进程数目是在程序运行任务提交前就确定好的,用于进行计算的节点也是程序运行前就指定好

49、的。在程序运行中,如果在集群中增加一些节点,MPI 程序是无法利用新加入的计算资源的。在运行的而 Spark 程序则不同,Spark 程序是由 Spark进行调度运行的,因此,可以在程序运行时向集群加入新的节点,正在运行中的程序可以及时到扩展的计算资源。这可能对于一个使用私有集群计算的计算科学并不会经常性的更新。然而如果计算运行在云来说似乎并没用,计算资源上,如 Amazon EC2 呢?如果希望程序能够计算快一点,就可以很方便的向云服务商多申请几个节点,直接加入正在运行中的Spark上,加速程序的运行。第 3 章从 MPI 到 Spark3.1Spark 编程Spark 与MPI 是不同的编

50、程模型。因此,Spark 有不同于 MPI 的编程。在 Spark中,数据是隐式分布在集群中,并以RDD 数据结构的形式体现。Spark 编程,其实就是构建 RDD,对 RDD 进行 transformation 转化,不断对元素进行变换形成新的 RDD。在必要时使用action 操作,从RDD 中提取所需的信息,得到结果。Spark 的实现语言 Scala 是一种函数式编程语言,所以 Spark 也有许多函数式编程的特性。比如将函数作为参数传递是很普通的事情。另外,Scala 也是一种面象的语言,面象编程的主要特点是封装、继承、多态。其中,封装是指将数据与对数据的操作封装在一起。在 Spar

51、k 中,数据是以 RDD 的形式体现的,而对数据的操作则是需要通过向 RDD传递函数来实现的。MPI 的实现语言主要是Fortran 与C、C+,虽然 C+都具有面象的特性,但整个 MPI 还是面向过程性质的。因此,MPI 中,进程是主体,而不断在进程之间传递的是数据。这些编程的不同,使得不能按照MPI 的编程来照搬到Spark 上来。因此,从使用MPI 转向使用Spark,需要编程进行转变。因为Spark 简单易懂,在编程上因为不需要处理数据的分配,比MPI 要容易,所以只19 / 33Spark 亚太群Life is short, you need spark!要认真体会,掌握Spark

52、编程并不需要太长时间。3.2MPI 中并行数据处理的Spark 实现在MPI 中,程序源代码本身就是一系列的数据操作。在大部分时候程序的各个进是按照相同的程序代码运行在不同的数据集上。在数据不在本地时,再通过消息传递函数从其它进程得到所需数据。MPI 运行时,首先将源代码到各个进程,之后运行源代码就不再修改,只是数据在不断传递。在 Spark 中则是数据隐式分布在系统中,通过指令向数据传递函数操作数据。Spark 中,也有很大一部分工作是RDD 元间并行进行的。这些并行处理通过一些RDD 的Transformation 方法来实现,最常用的是map(func)、flatMap(func)与fi

53、lter(func)。它们都接收一个函数作为参数。map(func)与 flatMap(func)都并行地对 RDD 中的每一个元素应用func 函数,这就相当于 MPI 中不同进程的并行数据处理。只是 MPI 中对数据的操作是在进程上的,而Spark 中是通过 RDD 方法传递函数进行处理的。map(func)与 flatMap(func)的区别是前者函数 func 应用于一个元素一定返回一个元素,而后者函数 func 应用于一个元素可能返回不定量个元素(一个元素序列)。filter(func)接收一个函数,并并行地对 RDD 中所有元素应用该函数。这个函数被要求返回bool 型变量true

54、false。形成的新 RDD 中只保留func 返回true 的元素。这三个函数基本上完成了类似于 MPI 中并行数据处理部分的工作。然而,当操作涉及到不止一个元素时,则需要通过调用其它 RDD 方法如 reduceByKey(func, numTasks)、 join(otherDataset, numTasks)等。3.3MPI 中常用消息传递函数的Spark 实现Spark 与 MPI 是不同的两种编程模型,前者以数据为中心,后者以进程为中心。它们自然有各自不同的编程风格,因此,完全用Spark 实现 MPI 函数是不合适的,事实上也完全没有必要。本质地,MPI 中的消息传递函数都是从进

55、程传递到进程的,然而在 Spark 中,进程这个概念本身就十分模糊,你可以认为实际的每个处理RDD 中每一个元素都是一个进程。是一个进程,也可以认为你可以直接跳过这一节去看下一章的样例比较。然而作者希望能够为 MPI 使用者提供一个从 MPI 到Spark 的桥梁,让他们在 Spark 中找到那些他们所熟悉的,同时又有些新含义的元素。在接下来几个小节的叙述中,你可能会觉得在寻求 RDD 方法与 MPI 函数之间关系时,进程的概念有些。需要明确的是,作者并没有打算通过 Spark 代码实现 MPI的库函数,而是去寻找它们之间的一些对应关系。20 / 33: 4006-998-758Life is

56、 short, you need spark!3.3.1聚合通讯虽然在MPI 中,点对点通讯更加基本。但在Spark 中,更普遍能找到的是聚合通讯函数的,而找到点对点通讯却比较。因此这里先介绍聚合通讯。一对多:MPI_Bcast(数据广播):这个函数的作用是将一个进程上的一个元素这个值。到所有的进程上,使所有的进拿到在 Spark 中,并没有一个 RDD 方法来对应这种行为,而是存在一个广播变量(Broadcast Variable)的概念。它将一个只读变量缓冲到所有的机器上,以方便使用。广播变量的创建方法为SparkContext.broadcast(v),示例如下:scala val br

57、oadcastVar = sc.broadcast(Array(1, 2, 3)如果需要广播变量的值,可以调用 value 方法:scala broadcastVar.valueres0: Array = Array(1, 2, 3)广播变量可以认为是MPI_Bcast 函数的直接对应,但使用它的目的与 MPI_Bcast 不完全相同。MPI_Bcast 的主要目的是把一个元素传递给所有的进程,供进程操作所使用。而Spark 中广播变量的主要目的是在需要多次数据时避免重复传送大量数据。这是因为MPI 传递的是数据,而Spark 传递的是操作,MPI_Bcast 传递数据是为了供操作而使用,而

58、Spark 将操作传递给数据对数据进行处理。因此,Spark 在传递操作时可以将需要“广播”的数据包含在所传递的操作中。由于 Spark 中对RDD 传递的函数(操作)会应用于 RDD 中的所有元素,因此它所携带的数据也相当于被广播了。在第 4 章代码段4.3 中会看到类似的携带“广播”数据的函数传递,而在对应的 MPI 程序中是调用 MPI_Bcast将变量广播的。MPI_Scatter(数据散发):这个函数将本地的数据切片散发给所有的进程,每个进持有原数据的一部分。它有一个类似的函数 MPI_Scatterv,功能基本相同,只是前者数据切分成长度相同的数据块分发给各个进程,后者可将数据切分

59、成不同长度的数据块。它的本质是对本地数据进行分布式,在 Spark 中实际对应着使用本地数据生成21 / 33Spark 亚太群Life is short, you need spark!RDD。而 RDD 就是分布式的数据集。不过在RDD 中,没有明确的进程归属的概念,即并没有明确哪部分数据是属于哪个进程的。事实上,可以有多个进Spark 中可以由程序中的数据生成RDD,如:val data = Array(1, 2, 3, 4, 5)拥有同一个数据的拷贝。val distData = sc.parallelize(data)也可以由本地的文件生成RDD,如:val distFile = s

60、c.textFile(daxt)这样就完成了本地数据向分布式数据的转换,这也是 MPI_Scatter 函数所实现的。多对一:MPI_Gather(数据收集):这个函数将分布于各个进程的数据收集组合到某一进程。是MPI_Scatter 的逆操作。它有一个类似的函数 MPI_Gatherv,功能基本相同,只是后者从各个进程接收到的数据可以长短不一。它的本质是对分布式的数据本地化,在 Spark 中对应着collect()这个 action,它将 RDD中的所有元素组一个 Array 返回。完成分布式数据到本地数据的转化。MPI_Reduce(数据规约):这个函数将分布于各个进程的数据通过某种操作

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论