大数据挖掘导论与案例课件-第8章 大数据挖掘关键技术_第1页
大数据挖掘导论与案例课件-第8章 大数据挖掘关键技术_第2页
大数据挖掘导论与案例课件-第8章 大数据挖掘关键技术_第3页
大数据挖掘导论与案例课件-第8章 大数据挖掘关键技术_第4页
大数据挖掘导论与案例课件-第8章 大数据挖掘关键技术_第5页
已阅读5页,还剩83页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

第1章绪论第2章数据分析与可视化技术第3章认识数据第4章数据预处理第5章分类概念与方法第6章关联分析概念与方法第7章聚类分析概念与方法第8章大数据挖掘关键技术第9章案例分析第8章大数据挖掘关键技术大数据挖掘导论与案例学习目标/Target掌握Spark的安装、了解运行原理、掌握RDD编程,了解SparkSQL、Streaming和ML掌握Hadoop分布计算架构的安装、部署,了解HDFS原理、理解MapReduce框架和计算模型的原理和优缺点目录/Contents01大规模并行处理02Spark内存计算大规模并行处理8.18.1.1Hadoop安装Hadoop安装方式(1)单机模式

在一台运行Linux操作系统的物理机,或者在Windows操作系统中架设虚拟化平台,虚拟出运行Linux操作系统的虚拟机,在虚拟机上安装Hadoop系统。该模式常用于大数据应用程序的前期开发和测试。(2)单机伪分布模式

在一台运行Linux操作系统的物理机或虚拟机上,用不同的进程模拟Hadoop系统中分分布式运行中的NameNode、DataNode、JobTracker、TaskTracker等节点,模拟Hadoop集群的运行模式,该模式常用于大数据应用程序的测试。(3)分布式集群模式

在集群环境中安装运行Hadoop系统,集群中的每个计算机运行Linux操作系统,该模式常用于大数据应用程序的实际运行,完成大数据分析和计算任务。8.1.1Hadoop安装Hadoop安装环境

在Windows操作系统中,使用VirtualBox6.1.18虚拟化平台,在虚拟机中安装Ubuntu20.04.3版本的Linux操作系统,安装构建单机伪分布式模式Hadoop系统的基本步骤如下:(1)创建用户

在Ubuntu操作系统以root用的身份,创建hadoop用户,紧接着创建一个专门的用户组,命名为hadoop,并将hadoop用户加入hadoop用户组中,基本的命令如下:[root@ubuntu~]#sudouseradd-mhadoop–d/home/hadoop。其中hadoop是用户名,-d指明hadoop用户的home目录为/home/hadoop,该目录为hadoop用户在Ubuntu系统中的根目录。[root@ubuntu~]#passwdhadoop[密码],设置hadoop用户的密码。[root@ubuntu~]#sudogroupaddhadoop。创建hadoop用户组。[root@ubuntu~]#sudousermod-a-Ghadoophadoop。将hadoop用户加入hadoop用户组。

使用vim/etc/sudoers命令打开文件,在文件末尾加入hadoopALL=(ALL:ALL)ALL语句,使hadoop用户与root用户具有系统管理权限。8.1.1Hadoop安装Hadoop安装环境(2)配置SSH

在伪分布模式和分布式集群模式中,为了实现Hadoop集群中,所有节点可以免密码登录,需要配置SSH。在root用户中,使用如下命令安装Openssh。[root@ubuntu~]#sudoapt-getinstallopenssh-server-y[root@ubuntu~]$ssh-keygen-trsa#

使用该命令后,系统会提示多次确定,完成后将在/home/hadoop/.ssh目录中生成id_rsa认证文件,将该文件复制成名为authorized_keys的文件,并执行sshlocalhost命令测试。如果出现如上图所示的提示,即不需要数据用户密码,则配置正确,如果仍需要输入密码或提示错误,则删除.ssh/文件夹重新进行认证配置。[hadoop@ubuntu~]$catid_rsa.pub>>authorized_keys[hadoop@ubuntu~]$sshlocalhost8.1.1Hadoop安装配置Hadoop

切换至root用户,下载JDK,使用命令tar-zxvfjdk-8u161-linux-x64.tar将JDK解压未/usr/local/目录中,将JDBK文件夹重命名文件夹为jdk1.8.0。

在Hadoop网站中下载hadoop-3.3.1.tar.gz安装包文件,将其解压在/usr/local/目录中,将解压后的Haoop文件夹重命名文件夹为hadoop-3.3.1。使用chown-Rhadoop:hadoop/usr/local/hadoop-3.3.1命令,将hadoop-3.3.1文件夹的所属用户修改为hadoop。8.1.1Hadoop安装配置HadoopHadoop系统中的配置文件,集中存放在hadoop-3.3.1文件夹的etc/hadoop目录中,主要涉及以下几个配置文件。(1)etc/hadoop/hadoop-env.sh,在该文件中配置JDK安装路径,方法为在该文件的最后加上以下语句:exportJAVA_HOME=/usr/local/jdk1.8.0(2)~/.bashrc,在hadoop用户的根目录,即root用户的/home/hadoop目录中,找到hadoop用户所属的系统环境变量配置文件~/.bashrc,将JDK和hadoop环境变量加入该配置文件,保存后使用source~/.bashrc命令,使其立即生效。加入内容如下:PATH=$PATH:$HOME/binexportJAVA_HOME=/usr/local/jdk1.8.0exportHADOOP_HOME=/usr/local//hadoop-3.3.1exportPATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbinexportCLASSPATH=$JAVA_HOME/lib:.=8.1.1Hadoop安装配置Hadoop(3)etc/hadoop/core-site.xml,该配置文件主要完成HDFS中管理节点(NameNode)的IP和端口配置,该文件的文件结构为XML,在<configuration></configuration>节点内部加入HDFS中NameNode的IP地址、端口,缓冲区的大小以及存放临时文件的目录等,core-site.xml的基本配置如下:<property><name>fs.defaultFS</name><value>hdfs://localhost:9000</value></property><property><name>hadoop.tmp.dir</name><value>/home/hadoop/tmp</value></property>8.1.1Hadoop安装配置Hadoop(4)etc/hadoop/hdfs-site.xml,该配置文件主要完成HDFS分布式文件系统的数据备份数量、文件服务器地址和端口、Namenode和Datanode的数据存放路径、文件存储块的大小等配置,hdfs-site.xml的基本配置如下:<property><name>dfs.replication</name><value>2</value></property><property><name>dfs.permissions</name><value>false</value></property><property> <name>dfs.http.address</name> <value>:50070</value></property>8.1.1Hadoop安装配置Hadoop(5)etc/hadoop/mapred-site.xml,该文件主要完成Hadoop的MapReduce框架设定、配置MapReduce任务运行的内存大小、最大可用CPU核数等,使用yarn作为伪分布式Hadoop集群框架时的配置如下:<property><name></name><value>yarn</value></property>8.1.1Hadoop安装配置Hadoop(6)etc/hadoop/yarn-site.xml,该文件主要配置资源管理器ResourceManager的地址,和其它分布式节点NodeManager资源配置、日志级别、任务调度器等。如资源管理器使用yarn管理伪分布式集群的yarn-site.xml配置如下:<property><name>yarn.nodemanager.aux-services</name><value>mapreduce_shuffle</value></property><property><name>yarn.resourcemanager.hostname</name><value>localhost</value></property>8.1.1Hadoop安装配置Hadoop

如果在运行Hadoop系统的MapReduce框架时,提示“找不到或无法加载主类org.apache.hadoop.mapreduce.v2.app.MRAppMaster”错误,则需要在命令行中执行hadoopclasspath命令,将该命令的执行结果作为值,加入yarn-site.xml配置文件yarn.application.classpath属性中,配置示例如下。<property><name>yarn.application.classpath</name><value>[执行hadoopclasspath命令的返回结果]</value></property>8.1.1Hadoop安装格式化HDFS文件系统

完成以上Hadoop的配置后,执行如下format命令,格式化HDFS分布式文件系统,分布式文件系统的格式化是测试Hadoop配置的第一步,根据格式化过程中的提示,可检查配置是否成功。[hadoop@ubuntu~]$hadoopnamenode–format

如果格式化成功,该命令将返回NameNode的信息,其中会有“…hasbeensuccessfullyformatted.”和“Exittingwithstatus0”的提示。如果提出错误,则需要根据提示修改配置文件,并删除配置中设定的hadoop.tmp.dir目录中的数据后,重新执行格式化。8.1.1Hadoop安装启动Hadoop

启动Hadoop使用start-all.sh命令。启动后,使用jps命令可查看如图8.2所示的5个Hadoop相关进程是否运行正常。如果进程均存在,则Hadoop配置完成。停止运行Hadoop可使用stop-all.sh图8.2jps命令查看Hadoop进程8.1.1Hadoop安装运行程序测试使用vim命令在Hadoop系统管理节点中创建两个文件,文件内容如下:file1:hellohadoophelloworldfile2:hellosparkhellostreaming将创建的两个文件上传到HDFS对应的目录中,HDFS的使用命令如下:[hadoop@ubuntu~]$hdfsdfs-mkdir/input,创建目录存放输入数据,其中“/”不能省。[hadoop@ubuntu~]$hdfsdfs-putfile*/input,将文件上传到HDFS文件系统中的/input目录中。切换到hadoop安装路径的/share/hadoop/mapreduce目录,找到Hadoop自带的hadoop-mapreduce-examples-3.3.1.jar文件,其中包含wordcount简单的词频统计程序。执行[hadoop@ubuntu~]$hadoopjarhadoop-mapreduce-examples-3.3.1.jarwordcount/input/output命令,该命令执行wordcount程序,读取/input目录中的输入文件,自动在HDFS中创建输出目录/output,将运行结果保存在/output目录中,查看运行结果的方法如图8.3所示。图8.3Hadoop测试实例运行结果8.1.1Hadoop安装查看集群状态通过浏览器可查看Hadoop集群的运行情况,HDFS管理端的Web地址为http://localhost:50070,其中localhost是NameNode节点的地址,如图8.4(a)所示;Yarn的管理界面的Web地址为http://localhost:8088,可以看到节点数目、提交任务的执行情况,输出错误提示等,如图8.6(b)所示。

(a)HDFS图形界面

(b)Yarn图形界面图8.4Hadoop集群图形管理界面8.1.2HDFS分布式文件系统HadoopDistributedFileSystem是一个分布式文件系统,简称HDFS。HDFS有高容错性,设计用来部署在低廉的硬件上,以流式数据访问模式来存储超大文件,提供高吞吐量来访问应用程序的数据,适合使用于具有超大数据集的应用程序中。

HDFS上的文件被划分为块,作为独立的存储单元,称为数据块(block),典型的大小是64MB。按照块划分数据存储有诸多好处,一个文件的大小可以大于网络中任意一个磁盘的容量,文件的所有块不需要存储在同一个磁盘上,利用集群上的任意一个磁盘进行存储,简化了存储管理,同时元数据不需要和块一起存储,用一个独立的功能模块管理块的元数据。数据块更加适合数据备份,进而提供数据容错能力和提高可用性。8.1.2HDFS分布式文件系统HDFS集群主要由一个NameNode和多个DataNode组成

NameNode提供元数据、命名空间、数据备份、数据块管理的服务,

DataNode存储实际的数据,客户端访问NameNode以获取文件的元数据和属性,而文件内容的输入输出操作直接和DataNode进行交互。HDFS采用Master/Slave的架构来存储数据HDFS体系结构主要由四个部分组成,分别为HDFSClient、NameNode、DataNode和SecondaryNameNode,NameNode是一个中心服务器,负责管理文件系统的命名空间及客户端对文件的访问,集群中的DataNode是运行一个进程与NameNode交互,且进行数据块读写的节点,负责管理它所在节点上的数据块,SecondaryNameNode辅助NameNode完成数据备份和编辑日志文件等8.1.2HDFS分布式文件系统图8.5HDFS体系结构8.1.2HDFS分布式文件系统HDFS文件系统的相关操作使用文件执行命令来实现,命令的书写方式有hadoopfs、hadoopdfs或hdfsdfs,三种写法的作用相同,以常用的hdfsdfs为例,最常用的形式如下:(1)hdfsdfs-ls,显示当前目录结构,-ls-R递归显示目录结构;(2)hdfsdfs-mkdir,创建目录;(3)hdfsdfs-rm,删除文件,-rm-R递归删除目录和文件;(4)hdfsdfs-put[localsrc][dst],从本地加载文件到HDFS;(5)hdfsdfs-get[dst][localsrc],从HDFS导出文件到本地;(6)hdfsdfs-copyFromLocal[localsrc][dst],从本地加载文件到HDFS,与put一致;(7)hdfsdfs-copyToLocal[dst][localsrc],从HDFS导出文件到本地,与get一致;(8)hdfsdfs-cat,查看文件内容;(9)hdfsdfs-du,统计指定目录下各文件的大小,单位是字节。-du-s汇总文件大小,-du-h指定显示单位;(10)hdfsdfs-tail,显示文件末尾;(11)hdfsdfs-cp[src][dst],从源目录复制文件到目标目录;(12)hdfsdfs-mv[src][dst],从源目录移动文件到目标目录。图8.6MapReduce工作流程MapReduce工作流程8.1.3MapReduce计算模型MapReduce基于HDFS文件系统,进行大规模分布式文件处理的核心技术,是Hadoop生态系统中的核心技术。MapReduce工作原理8.1.3MapReduce计算模型图8.7MapReduce的工作原理MapReduce工作原理8.1.3MapReduce计算模型从逻辑角度来看MapReduce的作业运行过程,主要分为5个阶段,分别是数据分片(inputsplit)、Map、Shuffle、Reduce和输出五个阶段。(1)输入数据(inputsplit)

对数据进行分片,即将输入数据切分为大小相等的数据块。输入分片和HDFS的block关系密切,每片数据作为单个MapTask的输入,分片完成后多个MapTask便可以同时工作。(2)Map

每个MapTask在读入各自的数据后进行计算处理,并将中间结果保存在HDFS中,数据(如文本文件中的行,或数据表中的行)将以键值对<key,value>的形式传入map方法,本阶段计算完成后,同样以键值对形式保存中间结果。键值对中的key决定了中间结果发送到哪个ReduceTask进行合并,且key和ReduceTask是多对一的关系,具有相同key的数据会被发送给同一个ReduceTask,单个ReduceTask可能会接收到多个key值的数据。MapReduce工作原理8.1.3MapReduce计算模型(3)Shuffle

在进入Reduce阶段之前,MapReduce框架会对数据按照key值排序,使具有相同key的数据彼此相邻。如果指定了合并操作(Combiner),框架会调用Combiner将具有相同key的数据进行合并,Combiner的输入、输出的参数必须与Reduce保持一致,通常也叫做洗牌(Shuffle)。分区的意义则在于尽量减少每次写入磁盘的数据量和复制到下一阶段节点之间传输的数据量。(4)ReduceReduce阶段,相同key的键值对被发送至同一个ReduceTask,同一个ReduceTask会接收来自多个MapTask的键值对,接收数据后数据的格式为键值对<key,[valuelist]>,其中[valuelist]为具有相同键的值列表,每个ReduceTask对key相同的[valuelist]进行Reduce变成一个值输出。(5)输出

输出阶段定义输出文件格式,以格式化方式将分析和计算的结果输出到HDFS中。常用的方法是将数据转换为字符串格式输出。MapReduce工作原理8.1.3MapReduce计算模型MapReduce实例#!/usr/bin/envpython

importsys

forlineinsys.stdin:

line=line.strip()

words=line.split()

forwordinwords:

print("%s\t%s"%(word,1))

用Python实现第一个MapReduce应用程序单词计数WordCount,该程序也被称为MapReduce计算模型的“HelloWorld”程序,分为mapper.py文件和reducer.py文件两个子程序的设计,将两个文件存储在本地磁盘的/home/hadoop/code目录中。mapper.py通过sys.stdin读取标准文件流,将文件中的行用strip()方法去除两边空格,split()方法将文本行中的字符串按照空格分割为单词,并对每个单词组装成(word,1)形式的<key,value>,print()函数输出到标准输出流。MapReduce工作原理8.1.3MapReduce计算模型MapReduce实例#!/usr/bin/envpython

importsys

current_word=None

current_count=0

word=None

forlineinsys.stdin:

line=line.strip()

word,count=line.split("\t",1)

try:

count=int(count)

exceptValueError:

continue

ifcurrent_word==word:

current_count+=count

else:

ifcurrent_word:

print("%s\t%s"%(current_word,current_count))

current_count=count

current_word=word

ifword==current_word:

print("%s\t%s"%(current_word,current_count))reducer.py通过sys.stdin读取标准文件流,从文件流中读取行,用strip()方法去除两边空格,split("\t",1)方法将文本行中的字符串按照制表符分割为单词,并将每个单词组装成(word,1)形式的word和count,为了统计计算,将count中的字符强制转化为整数类型,如果当前的单词等于已经读入的单词,则累加count,否则将读入的单词作为键值key,生成新的键值对,最后通过print()函数输出到标准输出流。MapReduce工作原理8.1.3MapReduce计算模型图8.8Map和Reduce程序测试

保存mapper.py和reducer.py文件后,需要在vim命令编辑窗口,使用setff命令检查两个文件的格式。在Ubuntu系统中需要设置文件格式为unix,可以先采用如图8.8所示的方法,在本地验证mapper.py和reducer.py代码的正确性。MapReduce工作原理8.1.3MapReduce计算模型

在Hadoop平台上执行Python版本的MapReduce程序的方法有两种:

一种与Java编写的程序运行方法一样,用Jython把Python程序打包为jar文件,该jar包可以像Hadoop执行Java编写的程序包一样执行;

另一种方式是使用Hadoop中自带的流接口HadoopAPI运行,使用HadoopAPI执行示例为,在Hadoop安装目录的share/hadoop/tools/lib目录中包含了hadoop-streaming-3.3.1.jar流处理文件,即使用如下命令执行代码,设定HadoopAPI、map、reduce、数据和输出路径,运行成功后结果存储在HDFS的/output目录中。hadoopjarshare/hadoop/tools/lib/hadoop-streaming-3.3.1.jar\-file/home/hadoop/code/mapper.py\-mapper/home/hadoop/code/mapper.py\-file/home/hadoop/code/reducer.py\-reducer/home/hadoop/code/reducer.py\-input/input-output/outputSpark内存计算8.28.2.1Spark安装

安装Spark之前需要安装Linux系统、Java环境和Hadoop环境。Ubuntu中自带了Python3.8,在Spark网站中下载spark-3.1.2-bin-hadoop3.2.tgz安装包,使用解压命令tar-zxvfspark-3.1.2-bin-hadoop3.2.tgz解压安装包至路径/usr/local目录中,将文件夹重命名为spark-3.1.2,并使用chown-Rhadoop:hadoop/usr/local/spark-3.1.2命令修改安装目录的所属用户。由于所有例子均使用python语言实现,故安装Spark的python版本pyspark。Pyspark安装8.2.1Spark安装

在Spark安装目录下找到conf文件夹,使用命令cpspark-env.sh.templatespark-env.sh复制一份配置文件,编辑spark-env.sh配置文件,在文件最后面加如下一行内容,Spark才能从HDFS中读写数据,如果不加这项配置,则Spark只能访问本地数据。exportSPARK_DIST_CLASSPATH=$(/usr/local/Hadoop-3.3.1/bin/hadoopclasspath)

在Hadoop用户的环境变量~/.bashrc文件中,配置以下Spark环境变量。exportSPARK_HOME=/usr/local/spark-3.1.2exportPATH=$SPARK_HOME/bin:$PATHexportPYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATHexportPYSPARK_PYTHON=python图8.9pyspark安装测试Pyspark应用的运行8.2.1Spark安装pyspark编辑和运行应用程序的方式有两种,分别是命令行和文件。命令行编写pyspark应用的方式如图8.10所示图8.10pyspark命令行LineCount.py#!/usr/bin/envpython3frompysparkimportSparkContext,SparkConf

conf=SparkConf().setMaster("local").setAppName("WordCount")

sc=SparkContext(conf=conf)

lines=sc.textFile("file:///home/hadoop/file/file1")

num=lines.count()

print(num)Pyspark应用的运行8.2.1Spark安装使用spark-submit提交pyspark应用程序的运行过程如图8.11所示,其中WARN为警告信息。可在Spark安装的conf目录的perties文件中修改log4j.rootCategory=ERROR.console,可定义日志级别为只显示错误信息,在集群中提交pyspark应用程序时,命令中的选项--master,也用于设定集群管理器地址。图8.11spark-submit提交pyspark应用Spark运行框架8.2.2Spark运行原理SparkCore实现Spark最基础和核心的功能

其中包括内存计算、任务调度、部署模式、故障恢复、存储管理。Spark运行架构集群资源管理器(ClusterManager)可以是spark自带的管理器也可以是YARN或Mesos等运行作业任务的工作节点(WorkerNode)每个应用的任务控制节点(Driver)每个工作节点上负责具体任务的执行进程(Executor)Spark运行框架8.2.2Spark运行原理结合图8.12的SparkCore运行框架,Spark应用的实际执行过程可归纳为以下10项图8.12Spark运行框架(1)应用Application(2)驱动程序Driver(3)资源管理器ClusterManager(4)执行器Executor(5)作业Job(6)阶段Stage(7)弹性分布式数据集(ResilientDistributedDataset,RDD)(8)有向无环图(DirectedAcyclicGraph,DAG)(9)DAGScheduler(10)TaskSchedulerSpark运行流程8.2.2Spark运行原理Spark运行流程如图8.13所示

(1)构建Application的运行环境。启动SparkContext,SparkContext向ClusterManager注册并申请运行Executor资源。(2)ClusterManager为Executor分配资源,并启动Executor进程,Executor的运行情况,将随着“心跳”发送到ClusterManager上。(3)SparkContext构建DAG图,将DAG图分解成多个阶段,并把每个阶段的任务集发送给任务调度器。Executor向SparkContext申请任务,任务调度器将任务发送给Executor,同时SparkContext将应用程序代码发送给Executor。(4)任务在Executor上运行。把执行结果反馈给任务调度器,然后再反馈给DAGScheduler。运行结束后写入数据,SparkContext向ClusterManager注销并释放所有资源。图8.13Spark运行流程RDD设计原理8.2.2Spark运行原理弹性分布式数据集RDD是分布式内存中的一个抽象概念,提供了一种高度受限的共享内存模型。本质上,RDD是一个只读的内存分区记录集,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,“行动”用于执行计算并指定输出的形式,“转换”指定RDD之间的相互依赖关系。两类操作的主要区别是转换操作接收RDD并返回RDD,而行动操作(比如count、collect等)接收RDD但返回非RDD(即输出一个值或结果)。RDD提供的转换接口非常简单,都是类似于map、filter、groupBy、join等粗粒度的数据转换操作,而不是针对某个数据项的细粒度修改。8.2.3RDD编程RDD创建Spark创建RDD的方式有两种:(1)使用textFile()方法,通过读取文件创建;(2)使用parallelize()方法,从已存在的集合创建,集合如python中的列表、元组等。图8.14读取HDFS文件创建RDD图8.15通过并行列表创建RDD8.2.3RDD编程RDD操作图8.22RDD转换过程示意图RDD转换操作会产生不同的RDD,以便于下一次转换使用。RDD的转换过程中采用了惰性机制,整个转换过程只是记录转换的逻辑,并不会发生真正的计算(当遇到行动操作时,才会触发真正的计算)。RDD的转换操作主要有5个API,其中每个API都是一个高阶方法,需要传入另一个函数(function)作为参数,该函数function定义了具体的转换规则,function常用lambda表达式进行设计。8.2.3RDD编程RDD操作(1)filter(function)filter()方法筛选满足函数function条件的元素,并返回一个新的RDD。仍使用图8.15中的RDD,筛选出包含spark关键词的行的示例如图8.16所示。后续示例如没有特殊说明,代码均在pyspark命令行中运行。图8.16filter()方法的使用8.2.3RDD编程RDD操作(2)map(function)map()方法将RDD通过function函数映射为一个新的RDD,且该方法将对RDD中的每个元素按照function函数的计算规则进行转换。将每个数据乘以2的数值转换示例如图8.17所示,对文本分割的示例如图8.18所示。图8.17map()方法转换数值图8.18map()方法转换文本8.2.3RDD编程RDD操作(3)flatMap(function)flatMap()方法和map()方法在RDD中转换的作用基本相同,不同之处在于其增加了“拍扁”(flat)功能,“拍扁”后RDD中的每行中仅有一个元素。flatMap()方法的示例如图8.19所示。图8.19flatMap()方法的使用8.2.3RDD编程RDD操作(4)groupByKey()groupByKey()方法应用于键值对(K,V)的数据集时,返回一个新的(K,Iterable)形式的数据集。groupByKey()方法的示例如图8.20所示。在示例中使用flatMap()方法将文本转换为每行只有一个单词,再把每个单词转换为键值对(word,1)的形式。groupByKey()方法会对具相同键的数据进行合并,如示例中“Hello”出现了3次,所以进行合并,且合并后的集合在一个Iterable对象中,形如(’hello’,(1,1,1)),使用map()方法可对Iterable中数据进行聚合。图8.20groupByKey()方法的使用8.2.3RDD编程RDD操作(5)reduceByKey(function)reduceByKey()方法作用于(K,V)键值对的RDD时,返回一个新的(K,V)形式的RDD,键值对将按照键值K值传递到function函数中,并对V进行聚合计算。reduceByKey()方法的示例如图8.21所示。示例中使用flatMap()方法将文本转换为每行只有一个单词,再将每个单词转换为键值对(word,1),reduceByKey()方法中传入的a与b是具有相同键的值。图8.21reduceByKey()方法的使用8.2.3RDD编程RDD操作

从文件中加载数据,完成一次又一次的转换操作后,当pyspark程序执行到行动操作时,才会执行真正的计算,完成行动操作并得到结果。常用的RDD行动操作API有以下几种,它们的使用方法如下:(1)count():count()方法返回RDD中元素的个数图8.23count()方法(2)collect():collect()方法以列表的形式返回RDD中元素图8.24collect()方法8.2.3RDD编程RDD操作(3)first()和take(n):first()方法用于获取RDD中的第一个元素,take(n)方法用于获取RDD中的前n个元素得到新的列表,它们常用于数据的观察(4)reduce(function):reduce()也用于聚合,reduce()与reduceByKey()的功能不同,reduce()是行动操作,常用于聚合RDD中的元素,传入的数据是数据集合中的所有元素,而reduceByKey()是转换操作,传入的数据是具有相同键的值图8.25first()和take(n)方法图8.26reduce()方法8.2.3RDD编程键值对转换键值形式的RDD,是指RDD中每个元素都是(K,V)格式的数据,是最常见的RDD数据类型。键值对RDD仍然通过文件加载或集合创建。(1)keys()和values():keys()方法返回所有键值对中的K,values()方法返回所有键值对中的V,分别形成新的RDD图8.27keys()和values()方法8.2.3RDD编程键值对转换(2)sortByKey()和sortBy():sortByKey()方法返回根据键值排序的RDD,sortBy()方法可按照键值对中的指定值排序,两个方法都有升降序排序开关,默认True为升序,False为降序图8.28sortByKey()和sortBy()方法8.2.3RDD编程键值对转换(3)mapValues(function):mapValues()方法是针对键值对中V的转换,它会把所有的V都按照function定义的转换逻辑进行相同的转换操作,不会影响键K图8.29mapValues()方法8.2.3RDD编程键值对转换(4)join():join()方法是对两个RDD中相同键值的数据执行内连接,将(K,V1)和(K,V2)连接成(K,(V1,V2))的键值对形式图8.30join()方法8.2.3RDD编程键值对转换(5))combineByKey():combineByKey()方法是Spark中核心的高阶方法,其他一些高阶方法的底层都是用该方法实现,如groupByKey(),reduceByKey()等。combineByKey()方法的原型为:combineByKey(createCombiner:V=>C,mergeValue:(C,V)=>C,mergeCombiners:(C,C)=>C,partitioner:Partitioner,mapSideCombine:Boolean=true,serializer:Serializer=null图8.31combineByKey()方法8.2.3RDD编程键值对转换(6)saveAsTextFile():saveAsTextFile()方法将RDD以文本文件的格式存储到文件系统中,该方法将按照分区数量生成文件的个数图8.32saveAsTextFile()方法保存结果图8.33浏览saveAsTextFile()保存结果8.2.4SparkSQLDataFrame的创建和保存

执行SparkSQL的方法也有命令行和独立应用两种方式,其中启动pyspark时会自动建立名称为spark的SparkSession实例,采用独立应用的方式时,需要通过以下三行代码自定义SparkSession实例。自定义SparkSession实例#!/usr/bin/envpython3

frompysparkimportSparkContext,SparkConffrompyspark.sqlimportSparkSessionspark=SparkSession.builder.config(conf=SparkConf()).getOrCreate()8.2.4SparkSQLDataFrame的创建和保存图8.37创建DataFrame示例8.2.4SparkSQLDataFrame的常用操作(1)show(numRows:Int,truncate:Boolean)方法(2)printSchema(),用于打印DataFrame的结构(3)select()方法,和SQL语句中的用法一样,可投影DataFrame中的列或在原有列上进行运算(4)filter()或where()方法,相当于SQL中的where关键字(5)sort()或orderBy()方法(6)groupby()方法,类似于SQL中的GroupBy关键字,统计每个分组中的数据(7)distinct()方法,用于去除重复(8)collect()方法(9)withColumn()方法(10)和SQL中对应,DataFrame也有并、交、差运算,分别对应于union()、intersect()、subtract()方法。(11)join()方法,用于连接,生成新的DataFrame8.2.4SparkSQLDataFrame的常用操作图8.38printSchema()图8.39select()、filter()和sort()方法8.2.4SparkSQLRDD转换为DataFrame从RDD转换为DataFrame有两种方式:(1)利用反射机制推断RDD模式(2)使用编程方式定义模式,构造一个DataFrame结构,并编程定义的DataFrame结构与已有的RDD合并形成DataFrame。图8.40反射机制推断RDD结构创建DataFrame8.2.4SparkSQLRDD转换为DataFrameDataFrame结构设计需要字段名称、字段类型和是否为空等信息,SparkSQL提供了StructType来表示结构信息,在其中加入StructField(name,dataType,nullable=True,metedate=None)作为StructType()方法的参数来定义多个字段,DataFrame数据被封装在Row中,最后使用spark.createDataFrame()将结构和数据组装形成DataFrame图8.41编程方式定义RDD结构创建DataFrame8.2.4SparkSQLSparkSQL读写数据库图8.42MySQL中创建数据库和表图8.43SparkSQL读取MySQL数据8.2.4SparkSQLSparkSQL读写数据库图8.44SparkSQL向MySQL数据表中插入数据8.2.5Spark流式计算SparkStreaming

SparkStreaming是构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。SparkStreaming接收实时流数据后,根据一定的时间间隔(通常是0.5~2秒)将数据流拆分成一批批的数据,Spark处理批数据,最终得到处理一批批数据的结果数据。SparkStreaming最主要的抽象数据类型是DStream(离散化数据流,DistcretizedStream),因此对应流数据的DStream可以看成一组RDDs,即RDD的一个序列,对DStream的操作最终转变为相应RDD的操作。SparkStreaming可整合多种输入数据源,如Kafka、Flume、HDFS或TCP套接字,经处理后的数据可存储至文件系统、数据库、或打印输出到控制台。8.2.5Spark流式计算SparkStreaming编写SparkStreaming程序的基本步骤如下:1)创建输入DStream来定义数据源;2)对DStream定义流计算规则;3)调用StreamingContext对象的Start()方法开始接收数据和处理流程;4)调用StreamingContext对象的awaitTermination()方法,等待流计算结束。图8.45命令行创建StreamingContext对象#!/usr/bin/envpython3

frompysparkimportSparkContext,SparkConffrompyspark.streamingimportStreamingContext

conf=SparkConf().setMaster("local").setAppName("DStreamApp")

sc=SparkContext(conf=conf)ssc=StreamingContext(sc,1)创建StreamingContext对象的方式8.2.5Spark流式计算文件流图8.46SparkStreaming流式计算端图8.47SparkStreaming流式计算端计算过程8.2.5Spark流式计算TCP套接字图8.48Socket服务器图8.49SparkStreaming处理Socket套接字流8.2.5Spark流式计算TCP套接字转换方法功能说明window(windowLength,slideInterval)基于窗口生成新的DStreamwindowLength为窗口大小,slideInterval为滑动时间间隔CountByWindow(windowLength,slideInterval)窗口中元素计数windowLength为窗口大小,slideInterval为滑动时间间隔reduceByWindow(function,windowLength,slideInterval)利用function对窗口内单一元素进行聚合function为自定义函数reduceByKeyAndWindow(function,windowLength,slideInterval,[numTasks])利用function对窗口内相同键值的数据进行聚合[numTasks]为聚合任务的数量reduceByKeyAndWindow(function,invFunc,windowLength,slideInterval,[numTasks])更加高效的利用function对窗口内相同键值的数据进行增量聚合function函数指定当前窗口中的数据聚合方式,invFunc是从窗口中移除数据的去除函数表8.1常用的滑动转换操作8.2.5Spark流式计算StructuredStreaming

StructuredStreaming是基于DataFrame的结构化流数据处理技术,该技术将SparkSQL和SparkStreaming结合起来,将实时数据流看成一张不断进行添加的数据表,对表中增量部分的数据,按照固定时间间隔获取并进行处理。StructuredStreaming数据流处理模式分为两种,分别是微批处理和持续批处理,默认使用微批处理模式,使用writeStream().trigger(Trigger.Continuous("1second"))可从微处理模式切换到持续批处理模式。

两种处理模式的最大区别在于,持续批处理模式比微批处理模式更具实时性。8.2.6SparkML将SparkML机器学习库的内容概括如下。(1)算法,包含了常用的机器学习算法,如回归、分类、聚类和协同过滤等;(2)特征工程,包含了特征提取、转化、降维和选择工具等;(3)流水线(Pipeline),用于构建、评估和调整机器学习工作流的工具;(4)持久性,保存和加载算法、模型和管道;(5)实用工具,如线性代数、统计、数据处理等工具。Spark机器学习库从1.2版本以后被分为两个包,分别是SparkMLlib和SparkML。SparkMLlib包含基于RDD的原始API,SparkML则提供了基于DataFrame的高层次API,可以用来构建机器学习流水线(MLPipeLine),机器学习流水线弥补了SparkMLlib库的不足。8.2.6SparkML向量、标注点和矩阵等基础数据类型支持着SparkML中机器学习算法。向量分为稠密向量(DenseVector)和稀疏向量(SparseVector),用浮点数存储数据,两种向量分别继承自pyspark.ml.linalg.Vectors类。标注点(LabeledPoint)是带标签的本地向量,标注点的实现在pyspark.ml.regression.LabeledPoint类中,一个标注点由一个浮点类型的标签和一个向量组成。矩阵是由向量组成的集合,在pyspark.ml.linalg.Matrix类中有DenseMatrix和SparseMatrix两种矩阵。基本数据类型8.2.6SparkML数据预处理特征提取SparkML中提供了词频-逆向文件词频(TermFrequency-InverseDocumentFrequercy,TF-IDF)、Word2Vec、CountVectorizer等几种常用的特征提取操作。

(8.1)TF-IDF的度量值的计算如式(8.2)所示。(8.2)8.2.6SparkML数据预处理特征提取图8.50HashingTF提取特征向量图8.51TF-IDF特征提取8.2.6SparkML数据预处理特征转换SparkML的包中实现了几个相关的转换器,如位于pyspark.ml.feature包中的StringIndexer()、IndexToString()、OneHotEncoder()、VectorIndexer()方法等。IndexToString()方法常和StringIndexer()方法配合使用,先用StringIndexer()方法将类别值转化成数值,进行模型训练,在训练结束后,再将数值转化成原有的类别值图8.52StringIndexer()示例图8.53IndexToString()实例8.2.6SparkML数据预处理特征选择SparkML中的特征选择在pyspark.ml.feature包中,有VectorSlicer()、RFormula()和

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论