第3章 Spark编程基础_第1页
第3章 Spark编程基础_第2页
第3章 Spark编程基础_第3页
第3章 Spark编程基础_第4页
第3章 Spark编程基础_第5页
已阅读5页,还剩60页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Spark编程基础2021年,某公司为了提高员工工作的积极性,将对公司员工进行一次调薪,需要根据员工在2020年的薪资情况及在职表现重新调整薪资,对于爱岗敬业的公司员工,公司拟根据其业绩分析情况予以不同程度涨薪。公司有员工2020年上半年薪资文件(Employee_salary_first_half.csv)和下半年薪资文件(Employee_salary_second_half.csv),两份文件的数据格式和数据字段均相同,以员工2020年上半年的薪资文件Employee_salary_first_half.csv为例,文件共有10个数据字段。任务背景字段名称说明

字段名称说明EmpID员工IDGROSS总薪资Name姓名Net_Pay实际薪资Gender性别Deduction薪资扣除部分Date_of_Birth出生日期Designation职位Age年龄Department部门为了保证较高的数据处理效率,将使用Spark统计每一位员工2020年的薪资情况。本章将介绍SparkRDD的创建方法、RDD和键值对RDD的转换操作和行动操作的基础使用,并通过Spark编程实现员工薪资数据的统计分析。任务背景1查询上半年实际薪资排名前3的员工信息目录读取员工薪资数据创建RDD2查询上半年或下半年实际薪资大于20万元的员工姓名3RDD是一个容错的、只读的、可进行并行操作的数据结构,是一个分布在集群各个节点中的存放元素的集合。RDD的创建有3种不同的方法。第一种是将程序中已存在的Seq集合(如集合、列表、数组)转换成RDD。第二种是对已有RDD进行转换得到新的RDD,这两种方法都是通过内存中已有的集合创建RDD的。第三种是直接读取外部存储系统的数据创建RDD。为方便后续的员工薪资分析,本节的任务是读取员工上半年和下半年的薪资数据创建RDD。任务描述parallelize()方法有两个输入参数,说明如下。要转化的集合,必须是Seq集合。Seq表示序列,指的是一类具有一定长度的、可迭代访问的对象,其中每个数据元素均带有一个从0开始的、固定的索引。分区数。若不设分区数,则RDD的分区数默认为该程序分配到的资源的CPU核心数。从内存中读取数据创建RDD1.parallelize()makeRDD()方法有两种使用方式。第一种方式的使用与parallelize()方法一致;第二种方式是通过接收一个是Seq[(T,Seq[String])]参数类型创建RDD。第二种方式生成的RDD中保存的是T的值,Seq[String]部分的数据会按照Seq[(T,Seq[String])]的顺序存放到各个分区中,一个Seq[String]对应存放至一个分区,并为数据提供位置信息,通过preferredLocations()方法可以根据位置信息查看每一个分区的值。调用makeRDD()时不可以直接指定RDD的分区个数,分区的个数与Seq[String]参数的个数是保持一致的。从内存中读取数据创建RDD2.makeRDD()从外部存储系统中读取数据创建RDD是指直接读取存放在文件系统中的数据文件创建RDD。从内存中读取数据创建RDD的方法常用于测试,从外部存储系统中读取数据创建RDD才是用于实践操作的常用方法。从外部存储系统中读取数据创建RDD可以有很多种数据来源,可通过SparkContext对象的textFile()方法读取数据集,该方法支持多种类型的数据集,如目录、文本文件、压缩文件和通配符匹配的文件等,并且允许设定分区个数。从外部存储系统中读取数据创建RDD分别读取HDFS文件和Linux本地文件的数据并创建RDD,具体操作如下。通过HDFS文件创建RDD直接通过textFile()方法读取HDFS文件的位置即可。通过Linux本地文件创建RDD本地文件的读取也是通过sc.textFile("路径")的方法实现的,在路径前面加上“file://”表示从Linux本地文件系统读取。在IntelliJIDEA开发环境中可以直接读取本地文件;但在spark-shell中,要求在所有节点的相同位置保存该文件才可以读取它.从外部存储系统中读取数据创建RDD读取员工上、下半年薪资数据,并创建RDD。由于数据比较多,因此适合通过读取HDFS上的数据创建。首先需要将数据上传至HDFS的/user/root目录下。在spark-shell中读取HDFS上的员工上、下半年薪资数据,创建RDD。任务实现1查询上半年实际薪资排名前3的员工信息目录读取员工薪资数据创建RDD2查询上半年或下半年实际薪资大于20万元的员工姓名3SparkRDD提供了丰富的操作方法用于操作分布式的数据集合,包括转换操作和行动操作两部分。转换操作可以将一个RDD转换为一个新的RDD,但是转换操作是懒操作,不会立刻执行计算;行动操作是用于触发转换操作的操作,这时才会真正开始进行计算。本节的任务如下。使用RDD的基本操作完成对员工上半年实际薪资的排名。找出薪资排名前3的员工信息。任务描述map()方法是一种基础的RDD转换操作,可以对RDD中的每一个数据元素通过某种函数进行转换并返回新的RDD。map()方法是转换操作,不会立即进行计算。转换操作是创建RDD的第二种方法,通过转换已有RDD生成新的RDD。因为RDD是一个不可变的集合,所以如果对RDD数据进行了某种转换,那么会生成一个新的RDD。使用map()方法转换数据sortBy()方法用于对标准RDD进行排序,有3个可输入参数,说明如下。第1个参数是一个函数f:(T)=>K,左边是要被排序对象中的每一个元素,右边返回的值是元素中要进行排序的值。第2个参数是ascending,决定排序后RDD中的元素是升序的还是降序的,默认是true,即升序排序,如果需要降序排序那么需要将参数的值设置为false。第3个参数是numPartitions,决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的分区个数相等,即this.partitions.size。第一个参数是必须输入的,而后面的两个参数可以不输入。使用sortBy()方法进行排序collect()方法是一种行动操作,可以将RDD中所有元素转换成数组并返回到Driver端,适用于返回处理后的少量数据。因为需要从集群各个节点收集数据到本地,经过网络传输,并且加载到Driver内存中,所以如果数据量比较大,会给网络传输造成很大的压力。因此,数据量较大时,尽量不使用collect()方法,否则可能导致Driver端出现内存溢出问题。使用collect()方法查询数据collect()方法有以下两种操作方式。collect:直接调用collect返回该RDD中的所有元素,返回类型是一个Array[T]数组。collect[U:ClassTag](f:PartialFunction[T,U]):RDD[U]。这种方式需要提供一个标准的偏函数,将元素保存至一个RDD中。首先定义一个函数one,用于将collect方法得到的数组中数值为1的值替换为“one”,将其他值替换为“other”。使用collect()方法查询数据flatMap()方法将函数参数应用于RDD之中的每一个元素,将返回的迭代器(如数组、列表等)中的所有元素构成新的RDD。使用flatMap()方法时先进行map(映射)再进行flat(扁平化)操作,数据会先经过跟map一样的操作,为每一条输入返回一个迭代器(可迭代的数据类型),然后将所得到的不同级别的迭代器中的元素全部当成同级别的元素,返回一个元素级别全部相同的RDD。这个转换操作通常用来切分单词。使用flatMap()方法转换数据take(N)方法用于获取RDD的前N个元素,返回数据为数组。take()与collect()方法的原理相似,collect()方法用于获取全部数据,take()方法获取指定个数的数据。获取RDD的前5个元素使用take()方法查询某几个值查询上半年实际薪资排名前3的员工信息,需要对上半年的实际薪资进行排序,而创建RDD时,textFile()方法是将每一行数据作为一条记录存储的,所以在排序前需要先对数据进行转换,实现步骤如下。读取CSV文件,将第一行字段名称删除。将数据按分隔符“,”分隔,取出第2列员工姓名和第7列实际薪资数据,并将实际薪资数据转换成Int类型数据。通过sortBy()方法根据实际薪资进行降序排列。通过take()方法获取上半年实际薪资排名前3的员工信息。任务实现1查询上半年实际薪资排名前3的员工信息目录读取员工薪资数据创建RDD2查询上半年或下半年实际薪资大于20万元的员工姓名3Spark的转换操作和行动操作,除了可以针对一个RDD进行操作,也可以进行RDD与RDD之间的操作。本节的任务如下。查询上半年或下半年实际薪资大于20万元的员工姓名。将最终结果合并为一个RDD。任务描述union()方法是一种转换操作,用于将两个RDD合并成一个,不进行去重操作,而且两个RDD中每个元素中的值的个数、数据类型需要保持一致。使用union()方法合并两个RDD使用union()方法合并多个RDDfilter()方法是一种转换操作,用于过滤RDD中的元素。filter()方法需要一个参数,这个参数是一个用于过滤的函数,该函数的返回值为Boolean类型。filter()方法将返回值为true的元素保留,将返回值为false的元素过滤掉,最后返回一个存储符合过滤条件的所有元素的新RDD。创建一个RDD,并且过滤掉每个元组第二个值小于等于1的元素。使用filter()方法进行过滤distinct()方法是一种转换操作,用于RDD的数据去重,去除两个完全相同的元素,没有参数。创建一个带有重复数据的RDD,并使用distinct()方法去重。使用distinct()方法进行去重Spark中的集合操作常用方法(转换操作)使用简单的集合操作方法描述union()参数是RDD,合并两个RDD的所有元素intersection()参数是RDD,求出两个RDD的共同元素subtract()参数是RDD,将原RDD里和参数RDD里相同的元素去掉cartesian()参数是RDD,求两个RDD的笛卡儿积intersection()方法用于求出两个RDD的共同元素,即找出两个RDD的交集,参数是另一个RDD,先后顺序与结果无关。创建两个RDD,其中有相同的元素,通过intersection()方法求出两个RDD的交集。使用简单的集合操作intersection()方法subtract()方法用于将前一个RDD中在后一个RDD出现的元素删除,可以认为是求补集的操作,返回值为前一个RDD去除与后一个RDD相同元素后的剩余值所组成的新的RDD。两个RDD的顺序会影响结果。创建两个RDD,分别为rdd1和rdd2,包含相同元素和不同元素,通过subtract()方法求rdd1和rdd2彼此的补集。使用简单的集合操作subtract()方法cartesian()方法可将两个集合的元素两两组合成一组,即求笛卡儿积。创建两个RDD,分别有4个元素,通过cartesian()方法求两个RDD的笛卡儿积。使用简单的集合操作cartesian()方法输出上半年或下半年实际薪资大于20万元的员工姓名。首先需要过滤出两个RDD中实际薪资大于20万元的员工姓名。再将两个RDD得到的员工姓名合并到一个RDD中,对员工姓名进行去重。即可得到上半年或下半年实际薪资大于20万元的员工姓名。任务实现4查询每位员工2020年的月均实际薪资目录输出每位员工2020年的总实际薪资5存储汇总后的员工薪资为文本文件6键值对RDD存储二元组,二元组分为键和值,RDD的基本转换操作对于键值对RDD也同样适用。因为键值对RDD中包含的是二元组,所以需要传递的函数会由原来的操作单个元素的函数改为操作二元组的函数。本节的任务如下。计算每位员工2020年的总实际薪资,要求对上、下半年员工的实际薪资进行相加。任务描述Spark的大部分RDD操作都支持所有种类的单值RDD,但是有少部分特殊的操作只能作用于键值对类型的RDD。顾名思义,键值对RDD由一组组的键值对组成,这些RDD被称为PairRDD。PairRDD提供了并行操作各个键或跨节点重新进行数据分组的操作接口。例如,PairRDD提供了reduceByKey()方法,可以分别规约每个键对应的数据,还有join()方法,可以把两个RDD中键相同的元素组合在一起,合并为一个RDD。了解键值对RDD有很多种创建键值对RDD的方式,很多存储键值对的数据格式会在读取时直接返回由其键值对组成的PairRDD。当需要将一个普通的RDD转化为一个PairRDD时可以使用map函数来进行操作,传递的函数需要返回键值对。创建键值对RDD使用键值对RDD的keys和values方法键值对RDD,包含键和值两个部分。Spark提供了两种方法,分别获取键值对RDD的键和值。keys方法返回一个仅包含键的RDD。values方法返回一个仅包含值的RDD。当数据集以键值对形式展现时,合并统计键相同的值是很常用的操作。reduceByKey()方法用于合并具有相同键的值,作用对象是键值对,并且只对每个键的值进行处理,当RDD中有多个键相同的键值对时,则会对每个键对应的值进行处理。reduceByKey()方法需要接收一个输入函数,键值对RDD相同键的值会根据函数进行合并并且创建一个新的RDD作为返回结果。使用键值对RDD的reduceByKey()方法在进行处理时,reduceByKey()方法将相同键的前两个值传给输入函数,产生一个新的返回值,新产生的返回值与RDD中相同键的下一个值组成两个元素,再传给输入函数,直到最后每个键只有一个对应的值为止。reduceByKey()方法不是一种行动操作,而是一种转换操作。使用键值对RDD的reduceByKey()方法groupByKey()方法用于对具有相同键的值进行分组,可以对同一组的数据进行计数、求和等操作。对于一个由类型K的键和类型V的值组成的RDD,通过groupByKey()方法得到的RDD类型是[K,Iterable[V]]。使用键值对RDD的groupByKey()方法在读取上、下半年员工薪资数据并将其转换为RDD的过程中,已经将数据转换成键值对RDD。统计每位员工2020年的总实际薪资,首先需要将数据合并到一个RDD中,通过相同的键对同一个员工的上半年实际薪资和下半年实际薪资进行累加,实现步骤如下。获取上、下半年员工薪资数据并将其转换为RDD,分别为split_first和split_second。使用union()方法将两个RDD合并成一个新的RDD。通过reduceByKey()方法统计员工总实际薪资并输出结果。任务实现4查询每位员工2020年的月均实际薪资目录输出每位员工2020年的总实际薪资5存储汇总后的员工薪资为文本文件6在Spark中,键值对RDD提供了很多基于多个RDD的键进行操作的方法。本节的任务是输出每位员工2020年的每月平均实际薪资,需要先计算每位员工2020年的总实际薪资,再求出每位员工2020年的月均实际薪资。任务描述将有键的一组数据与另一组有键的数据根据键进行连接,是对键值对数据常用的操作之一。与合并不同,连接会对键相同的值进行合并,连接方式多种多样,包含内连接、右外连接、左外连接、全外连接,不同的连接方式需要使用不同的连接方法。连接方法如下表。使用join()方法连接两个RDD连接方法描述join()对两个RDD进行内连接rightOuterJoin()对两个RDD进行连接操作,确保第二个RDD的键必须存在(右外连接)leftOuterJoin()对两个RDD进行连接操作,确保第一个RDD的键必须存在(左外连接)fullOuterJoin()对两个RDD进行全外连接join()方法用于根据键对两个RDD进行内连接,将两个RDD中键相同的数据的值存放在一个元组中,最后只返回两个RDD中都存在的键的连接结果。例如,在两个RDD中分别有键值对(K,V)和(K,W),通过join()方法连接会返回(K,(V,W))。创建两个RDD,含有相同键和不同的键,通过join()方法进行内连接。使用join()方法连接两个RDD(1)join()方法rightOuterJoin()方法用于根据键对两个RDD进行右外连接,连接结果是右边RDD的所有键的连接结果,不管这些键在左边RDD中是否存在。在rightOuterJoin()方法中,如果在左边RDD中有对应的键,那么连接结果中值显示为Some类型值;如果没有,那么显示为None值。使用join()方法连接两个RDD(2)rightOuterJoin()方法leftOuterJoin()方法用于根据键对两个RDD进行左外连接,与rightOuterJoin()方法相反,返回结果保留左边RDD的所有键。使用join()方法连接两个RDD(3)leftOuterJoin()方法fullOuterJoin()方法用于对两个RDD进行全外连接,保留两个RDD中所有键的连接结果。使用join()方法连接两个RDD(4)fullOuterJoin()方法zip()方法用于将两个RDD组合成键值对RDD,要求两个RDD的分区数量以及元素数量相同,否则会抛出异常。将两个RDD组合成Key/Value形式的RDD,这里要求两个RDD的partition数量以及元素数量都相同,否则会抛出异常。使用zip()方法组合两个RDDcombineByKey()方法是Spark中一个比较核心的高级方法,键值对的其他一些高级方法底层均是使用combineByKey()方法实现的,如groupByKey()方法、reduceByKey()方法等。combineByKey()方法用于将键相同的数据聚合,并且允许返回类型与输入数据的类型不同的返回值。combineByKey()方法的使用方式如下。combineByKey(createCombiner,mergeValue,mergeCombiners,numPartitions=None)使用combineByKey()方法合并相同键的值combineByKey()方法接收3个重要的参数,具体说明如下。createCombiner:V=>C,V是键值对RDD中的值部分,将该值转换为另一种类型的值C,C会作为每一个键的累加器的初始值。mergeValue:(C,V)=>C,该函数将元素V聚合到之前的元素C(createCombiner)上(这个操作在每个分区内进行)。mergeCombiners:(C,C)=>C,该函数将两个元素C进行合并(这个操作在不同分区间进行)。使用combineByKey()方法合并相同键的值由于合并操作会遍历分区中所有的元素,因此每个元素(这里指的是键值对)的键只有两种情况:以前没出现过或以前出现过。对于这两种情况,3个参数的执行情况描述如下。如果以前没出现过,则执行的是createCombiner()方法,createCombiner()方法会在新遇到的键对应的累加器中赋予初始值,否则执行mergeValue()方法。对于已经出现过的键,调用mergeValue()方法进行合并操作,对该键的累加器对应的当前值(C)与新值(V)进行合并。由于每个分区都是独立处理的,因此对于同一个键可以有多个累加器。如果有两个或更多的分区都有对应同一个键的累加器,就需要使用用户提供的mergeCombiners()方法对各个分区的结果(全是C)进行合并。使用combineByKey()方法合并相同键的值lookup(key:K)方法作用于键值对RDD,返回指定键的所有值。使用lookup()方法查找指定键的值查询每位员工2020年的月均实际薪资需要先筛选出上、下半年的员工薪资数据中的员工姓名和实际薪资两个字段数据并创建RDD,然后将筛选后的两个RDD合并,再根据员工姓名对实际薪资求和,最后查询出2020年的每位员工的月均实际薪资,具体实现步骤如下。获取两个RDD,即split_first和split_second,使用union()方法合并两个RDD。使用combineByKey()方法计算每位员工2020年的月均实际薪资。任务实现4查询每位员工2020年的月均实际薪资目录输出每位员工2020年的总实际薪资5存储汇总后的员工薪资为文本文件6在实际生产环境中,需要读取的文本格式不仅包含普通的文本文件,还包含其他格式的文件,如JS

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论