




版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
大数据导论第十二章大数据导论第十二章1CONTENTS目录PART01SPARKSQL简介PART02SPARKSQL执行流程PART03基础数据模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL数据源PART06SPARKSQLCLI介绍PART07在Pyspark中使用SparkSQLPART08在Java中连接SparkSQLPART09习题CONTENTS目录PART01SPARKSQL简介2PART01SparkSQL简介SparkSQL是一个用来处理结构化数据的Spark组件,为Spark提供了查询结构化数据的能力。SparkSQL可被视为一个分布式的SQL查询引擎,可以实现对多种数据格式和数据源进行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL简介SparkSQL3SparkSQL简介SparkSQL介绍:SparkSQL是为了处理结构化数据的一个Spark模块。不同于SparkRDD的基本API,SparkSQL接口拥有更多关于数据结构本身与执行计划等更多信息。在Spark内部,SparkSQL可以利用这些信息更好地对操作进行优化。SparkSQL提供了三种访问接口:SQL,DataFrameAPI和DatasetAPI。当计算引擎被用来执行一个计算时,有不同的API和语言种类可供选择。这种统一性意味着开发人员可以来回轻松切换各种最熟悉的API来完成同一个计算工作。SparkSQL简介SparkSQL介绍:4SparkSQL简介SparkSQL具有如下特点数据兼容方面:能加载和查询来自各种来源的数据。性能优化方面:除了采取内存列存储、代码生成等优化技术外,还引进成本模型对查询进行动态评估、获取最佳物理计划等;组件扩展方面:无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。标准连接:SparkSQL包括具有行业标准JDBC和ODBC连接的服务器模式。SparkSQL简介SparkSQL具有如下特点5SparkSQL简介SparkSQL具有如下特点集成:无缝地将SQL查询与Spark程序混合。SparkSQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得SQL查询以及复杂的分析算法可以轻松地运行。可扩展性:对于交互式查询和长查询使用相同的引擎。SparkSQL利用RDD模型来支持查询容错,使其能够扩展到大型作业,不需担心为历史数据使用不同的引擎。SparkSQL简介SparkSQL具有如下特点6PART02SparkSQL执行流程PART02SparkSQL执行流程7SparkSQL执行流程类似于关系型数据库,SparkSQL语句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分组成,分别对应SQL查询过程中的Result、DataSource、Operation,也就是说SQL语句按Result-->DataSource-->Operation的次序来描述的。SparkSQL执行流程类似于关系型数据库,SparkS8SparkSQL执行流程解析(Parse)对读入的SQL语句进行解析,分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是DataSource等,从而判断SQL语句是否规范;绑定(Bind)将SQL语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的Projection、DataSource等都存在,则这个SQL语句是可以执行的;SparkSQL执行流程解析(Parse)9SparkSQL执行流程优化(Optimize)一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划;执行(Execute)按Operation-->DataSource-->Result的次序来执行计划。在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。SparkSQL执行流程优化(Optimize)10PART03基础数据模型DataFrameDataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合,可以把它看成是一个关系型数据库的表。PART03基础数据模型DataFrameDataFra11基础数据模型DataFrameDataFrame是SparkSQL的核心,它将数据保存为行构成的集合,行对应列有相应的列名。DataFrame与RDD的主要区别在于,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得SparkSQL可以掌握更多的结构信息,从而能够对DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。基础数据模型DataFrameDataFrame是Spark12基础数据模型DataFrameDataFrame与RDD的对比:基础数据模型DataFrameDataFrame与RDD的对13PART04使用SparkSQL的方式PART04使用SparkSQL的方式14使用SparkSQL的方式使用SparkSQL,首先利用sqlContext从外部数据源加载数据为DataFrame;然后,利用DataFrame上丰富的API进行查询、转换;最后,将结果进行展现或存储为各种外部数据形式。Spark
SQL
为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用DataFrame
API(RDD)。通过ThriftServer,Spark
SQL支持多语言编程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用15使用SparkSQL的方式使用SparkSQL的方式16使用SparkSQL的方式加载数据①.从Hive中的users表构造DataFrame:users=sqlContext.table("users")②.加载S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加载HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加载数据①.从Hive中的us17使用SparkSQL的方式加载数据④.通过JDBC访问MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.将普通RDD转变为DataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加载数据④.通过JDBC访问M18使用SparkSQL的方式加载数据⑥.将本地数据容器转变为DataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.将Pandas
DataFrame转变为Spark
DataFrame(Python
API特有功能):sparkDF
=
sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加载数据⑥.将本地数据容器转变19使用SparkSQL的方式使用DataFrame①.创建一个只包含"年轻"用户的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas风格的语法:young=users[users.age<21]③.将所有人的年龄加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.创建20使用SparkSQL的方式使用DataFrame④.统计年轻用户中各性别人数:young.groupBy("gender").count()⑤.将所有年轻用户与另一个名为logs的DataFrame联接起来:young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.统计21使用SparkSQL的方式保存结果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆写S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存结果①.追加至HDFS上的22使用SparkSQL的方式保存结果③.保存为SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.转换为PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印输出:young.show()使用SparkSQL的方式保存结果③.保存为SQL表:23SparkSQL通过DataFrame接口使用多种数据源。应用程序可以直接使用关系型转换对DataFrame进行操作,也可以用来创建临时表。把DataFrame注册为临时表后就可以使用SQL对其数据进行查询PART05SparkSQL数据源SparkSQL通过DataFrame接口使用多种数据源。24通用Load方法SparkSQL使用数据源的最简单模式就是对缺省数据源类型来进行各种操作。缺省数据源类型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通过load()函数的参数来说明数据源的类型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用数据源的最简单模式就是25通用Load方法也可以不把文件上载到DataFrame,然后再对其进行查询操作,而是直接对文件进行SQL查询:df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上载到DataFrame,然后26通用Save方法Save方法Save()操作的可选参数SaveMode用来说明如何处理已有数据。需要提醒的是这些保存操作不是原子操作,如果选择的是Overwrite模式,Save()操作会首先删除已有的数据,然后才写入新的数据。SaveMode的可选值:Scala/Java任何语言解释SaveMode.ErrorIfExists"error"(缺省值)如果有数据存在,就抛异常。SaveMode.Append"append"如果有数据/表存在,就添加新数据到老数据后面。SaveMode.Overwrite"overwrite"如果有数据/表存在,就用新数据覆盖老数据。SaveMode.Ignore"ignore"如果有数据存在,就不写入新数据。通用Save方法Save方法Save()操作的可选参数Sav27通用Save方法基于文件的数据源,可以用path可选参数定义一个数据库表的路径,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使数据库表删除后,该数据库表路径和数据也不会被删除。如果程序不指定数据库表路径,Spark会把数据写到一个缺省的数据库表路径下。但是在这种情况下,当数据库被删除的时候,缺省数据库表的路径将会被删除。通用Save方法基于文件的数据源,可以用path可选参数定义28通用Save方法对基于文件的数据源还可以对其输出进行分桶、排序和分割,分桶和排序只能用于当输出方式为持久数据库表的时候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")当进行分割时,可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法对基于文件的数据源还可以对其输出进行分桶、排29通用Save方法还可以对同一张表同时进行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法还可以对同一张表同时进行分割和分桶:df=30Parquet文件数据源Parquet文件数据源SparkSQL支持对Parquet文件的读写操作,并自动保持源数据的模式。为了兼容性,在写Parquet文件的时候,所有的列会自动转换为可为空的列。把DataFrame保存为Parquet文件,格式信息会自动保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件数据源Parquet文件数据源Spark31读取上面存储的Parquet文件为DataFrameParquet文件也可以用来创建临时表供SQL查询使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件数据源读取上面存储的Parquet文件为DataFrameParq32JSONDataSets数据源JSONDataSets数据源SparkSQL可以自动根据JSONDataSet的格式把其上载为DataFrame。用路径指定JSONdataset;路径下可以是一个文件,也可以是多个文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的结构可以调用printSchema()方法打印:peopleDF.printSchema()JSONDataSets数据源JSONDataSets33利用DataFrame创建一个临时表:使用Spark的sql方法进行SQL查询:peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets数据源利用DataFrame创建一个临时表:使用Spark的sql34JSONdataset的DataFrame也可以是RDD[String]格式,每个JSON对象为一个string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets数据源JSONdataset的DataFrame也可以是RDD[35Hive表数据源Hive表数据源SparkSQL支持对Hive中的数据进行读写。首先创建一个支持Hive的SparkSession对象,包括与Hivemetastore的连接,支持Hive的序列化和反序列化操作,支持用户定义的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定数据库和表的缺省位置:Hive表数据源Hive表数据源SparkSQL支持对Hi36Hive表数据源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新创建的SparkSession创建表和上载数据到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL进行查询:Hive表数据源spark.sql("CREATETABL37Hive表数据源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查询的结果也是DataFrames,可以对结果进行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdarow:"Key:%d,Value:%s"%(row.key,row.value))forrecordinstringsDS.collect():print(record)DataFrames中的元素的类型是Row,所以可以按次序访问列:Hive表数据源sqlDF=spark.sql("SEL38Hive表数据源Record=Row("key","value")recordsDF=spark.createDataFrame([Record(i,"val_"+str(i))foriinrange(1,101)])recordsDF.createOrReplaceTempView("records")也可以在同一个SparkSession内用DataFrames来创建临时表:Hive表数据源Record=Row("key","v39Hive表数据源创建Hive表的时候需要说明表通过哪种方式读写文件系统,以及如何序列化和反序列化数据。属性名解释fileFormat文件格式定义了存储的格式,包括serde,inputformat和outputformat。目前支持6种文件格式:sequencefile,rcfile,orc,parquet,textfile和avro。inputFormat,outputFormat必须按对出现,分别定义存储的inputformat和outputformat。不能与fileFormat同时使用。serde说明serde类的名字。当文件格式本身没有serde时候使用。Hive表数据源创建Hive表的时候需要说明表通过哪种方式读40SparkCLI指使用命令界面直接输入SQL命令,然后发送到Spark集群进行执行,在界面中显示运行过程和最终的结果。PART06SparkSQLCLI介绍SparkCLI指使用命令界面直接输入SQL命令,然后发送41SparkSQLCLI介绍SparkCLI指的是使用命令界面直接输入SQL命令,然后发送到Spark集群进行执行,在界面中显示运行过程和最终的结果。SparkSQL已经集成在SparkShell中,因此,只要启动SparkShell,就可以使用SparkSQL的Shell交互接口:bin/spark-shell--masterspark://hdp-node-01:7077或者,可以启动SparkSQL界面,使用起来更方便:bin/spark-sql--masterspark://hdp-node-01:7077SparkSQLCLI介绍SparkCLI指的是使用命42SparkSQLCLI介绍SparkSQL所有功能的入口是SQLContext类及其子类。为了创建一个基本的SQLContext,需要一个SparkContext。scala>valsqlContext=neworg.apache.spark.sql.SQLContext(sc)sqlContext:org.apache.spark.sql.SQLContext=org.apache.spark.sql.SQLContext@1943a343scala>importsqlContext.implicits._importsqlContext.implicits._SQLContextSparkSQLCLI介绍SparkSQL所有功能的入43SparkSQLCLI介绍下面的操作基于一个简单的数据文件people.json,文件的内容如下:{"name":"Michael"}{"name":"Andy","age":30}{"name":"Justin","age":19}数据文件下面语句从本地文件people.json读取数据创建DataFrame:{valdf=sqlContext.read.json("file:///data/people.json")………df:org.apache.spark.sql.DataFrame=[age:bigint,name:string]创建DataFramesSparkSQLCLI介绍下面的操作基于一个简单的数据文44Pyspark是针对Spark的PythonAPI。Spark使用py4j来实现Python与Java的互操作,从而实现使用Python编写Spark程序。Spark也同样提供了Pyspark,一个Spark的PythonShell,可以以交互的方式使用Python编写Spark程序。PART07在Pyspark中使用SparkSQLPyspark是针对Spark的PythonAPI。Spa45在Pyspark中使用SparkSQL在终端上启动Python
Spark
Shell:./bin/pyspark使用JSON文件作为数据源,创建JSON文件/home/sparksql/courses.json,并输入下面的内容:实例描述{"name":"Linux","type":"basic","length":10}{"name":"TCPIP","type":"project","length":15}{"name":"Python","type":"project","length":8}{"name":"GO","type":"basic","length":2}{"name":"Ruby","type":"basic","length":5}在Pyspark中使用SparkSQL在终端上启动Pyth46在Pyspark中使用SparkSQL首先使用SQLContext模块,其作用是提供SparkSQL处理的功能。在PysparkShell中逐步输入下面步骤的内容:引入pyspark.sql中的SQLContext:frompyspark.sqlimportSQLContext创建SQLContext对象使用pyspark的SparkContext对象,创建SQLContext对象:sqlContext=SQLContext(sc)在Pyspark中使用SparkSQL首先使用SQLCon47在Pyspark中使用SparkSQLDataFrame对象可以由RDD创建,也可以从Hive表或JSON文件等数据源创建。创建DataFrame,指明来源自JSON文件:df=sqlContext.read.json("/home/shiyanlou/courses.json")创建DataFrame对象在Pyspark中使用SparkSQLDataFrame对48在Pyspark中使用SparkSQL首先打印当前DataFrame里的内容和数据表的格式:df.select("name").show()#展示了所有的课程名df.select("name","length").show()#展示了所有的课程名及课程长度对DataFrame进行操作show()函数将打印出JSON文件中存储的数据表;使用printSchema()函数打印数据表的格式。然后对DataFrame的数据进行各种操作:df.show()df.printSchema()在Pyspark中使用SparkSQL首先打印当前Data49在Pyspark中使用SparkSQLdf.filter(df['type']=='basic').select('name','type').show()#展示了课程类型为基础课(basic)的课程名和课程类型df.groupBy("type").count().show()#计算所有基础课和项目课的数量。首先需要将DataFrame注册为Table才可以在该表上执行SQL语句:df.registerTempTable('courses')coursesRDD=sqlContext.sql("SELECTnameFROMcoursesWHERElength>=5andlength<=10")names=coursesRDD.rdd.map(lambdap:"Name:"+)fornameinnames.collect():printname执行SQL语句在Pyspark中使用SparkSQLdf.filter(50在Pyspark中使用SparkSQLParquet是SparkSQL读取的默认数据文件格式,把从JSON中读取的DataFrame保存为Parquet格式,只保存课程名称和长度两项数据:df.select("name","length").write.save("/tmp/courses.parquet",format="parquet")保存DataFrame为其他格式将创建hdfs://master:9000/tmp/courses.parquet文件夹并存入课程名称和长度数据。在Pyspark中使用SparkSQLParquet是Sp51SparkSQL实现了ThriftJDBC/ODBCserver,所以Java程序可以通过JDBC远程连接SparkSQL发送SQL语句并执行。PART08在Java中连接SparkSQLSparkSQL实现了ThriftJDBC/ODBCs52在Java中连接SparkSQL首先将${HIVE_HOME}/conf/hive-site.xml拷贝到${SPARK_HOME}/conf目录下。另外,因为Hive元数据信息存储在MySQL中,所以Spark在访问这些元数据信息时需要MySQL连接驱动的支持。添加驱动的方式有三种:在${SPARK_HOME}/conf目录下的spark-defaults.conf中添加:spark.jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar
;可以实现添加多个依赖jar比较方便:spark.driver.extraClassPath/opt/lib2/mysql-connector-java-5.1.26-bin.jar;设置配置在Java中连接SparkSQL首先将${HIVE_HOM53在运行时添加--jars
/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的准备工作后,SparkSQL和Hive就继承在一起了,SparkSQL可以读取Hive中的数据。设置配置启动Thrift在Spark根目录下执行:./sbin/start-thriftserver.sh开启thrift服务器,它可以接受所有spark-submit的参数,并且还可以接受--hiveconf参数。不添加任何参数表示以local方式运行,默认的监听端口为10000在Java中连接SparkSQL在运行时添加--jars
/opt/lib2/mysql-54添加依赖打开Eclipse用JDBC连接HiveServer2。新建一个Maven项目,在pom.xml添加以下依赖:<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.4.1</version>
</dependency>
在Java中连接SparkSQL添加依赖打开Eclipse用JDBC连接HiveServe55添加依赖<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.6</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
在Java中连接SparkSQL添加依赖<dependency>
在Java中连接Spar56JDBC连接HiveServer2的相关参数:驱动:org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2://31:10000/default用户名:hadoop(启动thriftserver的linux用户名)密码:“”(默认密码为空)JDBC连接参数在Java中连接SparkSQLJDBC连接HiveServer2的相关参数:JDBC连接57import
java.sql.Connection;
import
java.sql.DriverManager;
import
java.sql.ResultSet;
import
java.sql.SQLException;
import
java.sql.Statement;
public
class
Test1
{
public
static
void
main(String[]
args)
throws
SQLException
{
String
url
=
"jdbc:hive2://31:10000/default";
try
{
Class.forName("org.apache.hive.jdbc.HiveDriver");
}
catch
(ClassNotFoundException
e)
{
e.printStackTrace();
}
Connection
conn
=
DriverManager.getConnection(url,"hadoop","");
Statement
stmt
=
conn.createStatement();
String
sql
=
"SELECT
*
FROM
doc1
limit
10";
System.out.println("Running"+sql);
ResultSet
res
=
stmt.executeQuery(sql);
while(res.next()){
System.out.println("id:
"+res.getInt(1)+"\ttype:
"+res.getString(2)+"\tauthors:
"+res.getString(3)+"\ttitle:
"+res.getString(4)+"\tyear:"
+
res.getInt(5));
}
}
}
JDBC连接参数在Java中连接SparkSQLimport
java.sql.Connection;
J58PART09作业
PART09作业59作业作业:什么是SparkSQL?其主要目的是什么?SparkSQL的执行流程有哪几个步骤?在SparkSQL中,什么是DataFrame?使用DataFrame的优势是什么?DataFrame与RDD的主要区别是什么?使用SparkSQL的方式有哪几种?使用SparkSQL的步骤是什么?常用的SparkSQL的数据源有哪些?Parquet文件格式是什么?它的主要特点是什么?为了使Java程序可以通过JDBC远程连接SparkSQL,需要做哪些准备工作?连接数据库的语句是什么?有哪些参数?作业作业:什么是SparkSQL?其主要目的是什么?60作业作业:请按下述要求写出相应的SparkSQL语句:从一个本地JSON文件创建DataFrame;打印DataFrame元数据;按照列属性过滤DataFrame的数据;返回某列满足条件的数据;把DataFrame注册成数据库表。作业作业:请按下述要求写出相应的SparkSQL语句:61谢谢FORYOURLISTENINGHandgeCO.LTD.2016.12.09谢谢FORYOURLISTENINGHandgeCO.62
大数据导论第十二章大数据导论第十二章63CONTENTS目录PART01SPARKSQL简介PART02SPARKSQL执行流程PART03基础数据模型DATAFRAMEPART04使用SparkSQL的方式PART05SPARKSQL数据源PART06SPARKSQLCLI介绍PART07在Pyspark中使用SparkSQLPART08在Java中连接SparkSQLPART09习题CONTENTS目录PART01SPARKSQL简介64PART01SparkSQL简介SparkSQL是一个用来处理结构化数据的Spark组件,为Spark提供了查询结构化数据的能力。SparkSQL可被视为一个分布式的SQL查询引擎,可以实现对多种数据格式和数据源进行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。PART01SparkSQL简介SparkSQL65SparkSQL简介SparkSQL介绍:SparkSQL是为了处理结构化数据的一个Spark模块。不同于SparkRDD的基本API,SparkSQL接口拥有更多关于数据结构本身与执行计划等更多信息。在Spark内部,SparkSQL可以利用这些信息更好地对操作进行优化。SparkSQL提供了三种访问接口:SQL,DataFrameAPI和DatasetAPI。当计算引擎被用来执行一个计算时,有不同的API和语言种类可供选择。这种统一性意味着开发人员可以来回轻松切换各种最熟悉的API来完成同一个计算工作。SparkSQL简介SparkSQL介绍:66SparkSQL简介SparkSQL具有如下特点数据兼容方面:能加载和查询来自各种来源的数据。性能优化方面:除了采取内存列存储、代码生成等优化技术外,还引进成本模型对查询进行动态评估、获取最佳物理计划等;组件扩展方面:无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。标准连接:SparkSQL包括具有行业标准JDBC和ODBC连接的服务器模式。SparkSQL简介SparkSQL具有如下特点67SparkSQL简介SparkSQL具有如下特点集成:无缝地将SQL查询与Spark程序混合。SparkSQL允许将结构化数据作为Spark中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得SQL查询以及复杂的分析算法可以轻松地运行。可扩展性:对于交互式查询和长查询使用相同的引擎。SparkSQL利用RDD模型来支持查询容错,使其能够扩展到大型作业,不需担心为历史数据使用不同的引擎。SparkSQL简介SparkSQL具有如下特点68PART02SparkSQL执行流程PART02SparkSQL执行流程69SparkSQL执行流程类似于关系型数据库,SparkSQL语句也是由Projection(a1,a2,a3)、DataSource(tableA)、Filter(condition)三部分组成,分别对应SQL查询过程中的Result、DataSource、Operation,也就是说SQL语句按Result-->DataSource-->Operation的次序来描述的。SparkSQL执行流程类似于关系型数据库,SparkS70SparkSQL执行流程解析(Parse)对读入的SQL语句进行解析,分辨出SQL语句中哪些词是关键词(如SELECT、FROM、WHERE),哪些是表达式、哪些是Projection、哪些是DataSource等,从而判断SQL语句是否规范;绑定(Bind)将SQL语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的Projection、DataSource等都存在,则这个SQL语句是可以执行的;SparkSQL执行流程解析(Parse)71SparkSQL执行流程优化(Optimize)一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划;执行(Execute)按Operation-->DataSource-->Result的次序来执行计划。在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。SparkSQL执行流程优化(Optimize)72PART03基础数据模型DataFrameDataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合,可以把它看成是一个关系型数据库的表。PART03基础数据模型DataFrameDataFra73基础数据模型DataFrameDataFrame是SparkSQL的核心,它将数据保存为行构成的集合,行对应列有相应的列名。DataFrame与RDD的主要区别在于,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得SparkSQL可以掌握更多的结构信息,从而能够对DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。基础数据模型DataFrameDataFrame是Spark74基础数据模型DataFrameDataFrame与RDD的对比:基础数据模型DataFrameDataFrame与RDD的对75PART04使用SparkSQL的方式PART04使用SparkSQL的方式76使用SparkSQL的方式使用SparkSQL,首先利用sqlContext从外部数据源加载数据为DataFrame;然后,利用DataFrame上丰富的API进行查询、转换;最后,将结果进行展现或存储为各种外部数据形式。Spark
SQL
为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用DataFrame
API(RDD)。通过ThriftServer,Spark
SQL支持多语言编程包括Java、Scala、Python及R。使用SparkSQL的方式使用SparkSQL,首先利用77使用SparkSQL的方式使用SparkSQL的方式78使用SparkSQL的方式加载数据①.从Hive中的users表构造DataFrame:users=sqlContext.table("users")②.加载S3上的JSON文件:logs=sqlContext.load("s3n://path/to/data.json","json")③.加载HDFS上的Parquet文件:clicks=sqlContext.load("hdfs://path/to/data.parquet","parquet")使用SparkSQL的方式加载数据①.从Hive中的us79使用SparkSQL的方式加载数据④.通过JDBC访问MySQL:comments=sqlContext.jdbc("jdbc:mysql://localhost/comments","user")⑤.将普通RDD转变为DataFrame:rdd=sparkContext.textFile(“article.txt”).flatMap(_.split("")).map((_,1)).reduceByKey(_+_)wordCounts=sqlContext.createDataFrame(rdd,["word","count"])使用SparkSQL的方式加载数据④.通过JDBC访问M80使用SparkSQL的方式加载数据⑥.将本地数据容器转变为DataFrame:data=[("Alice",21),("Bob",24)]people=sqlContext.createDataFrame(data,["name","age"])⑦.将Pandas
DataFrame转变为Spark
DataFrame(Python
API特有功能):sparkDF
=
sqlContext.createDataFrame(pandasDF)使用SparkSQL的方式加载数据⑥.将本地数据容器转变81使用SparkSQL的方式使用DataFrame①.创建一个只包含"年轻"用户的DataFrame:young=users.filter(users.age<21)②.也可以使用Pandas风格的语法:young=users[users.age<21]③.将所有人的年龄加1:young.select(,young.age+1)使用SparkSQL的方式使用DataFrame①.创建82使用SparkSQL的方式使用DataFrame④.统计年轻用户中各性别人数:young.groupBy("gender").count()⑤.将所有年轻用户与另一个名为logs的DataFrame联接起来:young.join(logs,logs.userId==users.userId,"left_outer")使用SparkSQL的方式使用DataFrame④.统计83使用SparkSQL的方式保存结果①.追加至HDFS上的Parquet文件:young.save(path="hdfs://path/to/data.parquet",source="parquet",mode="append")②.覆写S3上的JSON文件:young.save(path="s3n://path/to/data.json",source="json",mode="append")使用SparkSQL的方式保存结果①.追加至HDFS上的84使用SparkSQL的方式保存结果③.保存为SQL表:young.saveAsTable(tableName="young",source="parquet"mode="overwrite")④.转换为PandasDataFrame(PythonAPI特有功能):pandasDF=young.toPandas()⑤.以表格形式打印输出:young.show()使用SparkSQL的方式保存结果③.保存为SQL表:85SparkSQL通过DataFrame接口使用多种数据源。应用程序可以直接使用关系型转换对DataFrame进行操作,也可以用来创建临时表。把DataFrame注册为临时表后就可以使用SQL对其数据进行查询PART05SparkSQL数据源SparkSQL通过DataFrame接口使用多种数据源。86通用Load方法SparkSQL使用数据源的最简单模式就是对缺省数据源类型来进行各种操作。缺省数据源类型是parquet文件df=spark.read.load("examples/src/main/resources/users.parquet")df.select("name","favorite_color").write.save("namesAndFavColors.parquet")也可以通过load()函数的参数来说明数据源的类型:df=spark.read.load("examples/src/main/resources/people.json",format="json")df.select("name","age").write.save("namesAndAges.parquet",format="parquet")通用Load方法SparkSQL使用数据源的最简单模式就是87通用Load方法也可以不把文件上载到DataFrame,然后再对其进行查询操作,而是直接对文件进行SQL查询:df=spark.sql("SELECT*FROMparquet.examples/src/main/resources/users.parquet")通用Load方法也可以不把文件上载到DataFrame,然后88通用Save方法Save方法Save()操作的可选参数SaveMode用来说明如何处理已有数据。需要提醒的是这些保存操作不是原子操作,如果选择的是Overwrite模式,Save()操作会首先删除已有的数据,然后才写入新的数据。SaveMode的可选值:Scala/Java任何语言解释SaveMode.ErrorIfExists"error"(缺省值)如果有数据存在,就抛异常。SaveMode.Append"append"如果有数据/表存在,就添加新数据到老数据后面。SaveMode.Overwrite"overwrite"如果有数据/表存在,就用新数据覆盖老数据。SaveMode.Ignore"ignore"如果有数据存在,就不写入新数据。通用Save方法Save方法Save()操作的可选参数Sav89通用Save方法基于文件的数据源,可以用path可选参数定义一个数据库表的路径,比如:df.write.option(“path”,“/some/path”).saveAsTable(“t”)即使数据库表删除后,该数据库表路径和数据也不会被删除。如果程序不指定数据库表路径,Spark会把数据写到一个缺省的数据库表路径下。但是在这种情况下,当数据库被删除的时候,缺省数据库表的路径将会被删除。通用Save方法基于文件的数据源,可以用path可选参数定义90通用Save方法对基于文件的数据源还可以对其输出进行分桶、排序和分割,分桶和排序只能用于当输出方式为持久数据库表的时候。df.write.bucketBy(42,"name").sortBy("age").saveAsTable("people_bucketed")当进行分割时,可以使用DatasetAPI接口的save()和saveAsTable()方法:df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")通用Save方法对基于文件的数据源还可以对其输出进行分桶、排91通用Save方法还可以对同一张表同时进行分割和分桶:df=spark.read.parquet("examples/src/main/resources/users.parquet")(df.write.partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("people_partitioned_bucketed"))通用Save方法还可以对同一张表同时进行分割和分桶:df=92Parquet文件数据源Parquet文件数据源SparkSQL支持对Parquet文件的读写操作,并自动保持源数据的模式。为了兼容性,在写Parquet文件的时候,所有的列会自动转换为可为空的列。把DataFrame保存为Parquet文件,格式信息会自动保留。peopleDF=spark.read.json("examples/src/main/resources/people.json")df.write.partitionBy("favor_color").format("parquet").save("namesPartByColor.parquet")Parquet文件数据源Parquet文件数据源Spark93读取上面存储的Parquet文件为DataFrameParquet文件也可以用来创建临时表供SQL查询使用parquetFile=spark.read.parquet("people.parquet")parquetFile.createOrReplaceTempView("parquetFile")teenagers=spark.sql("SELECTnameFROMparquetFileWHEREage>=13ANDage<=19")teenagers.show()Parquet文件数据源读取上面存储的Parquet文件为DataFrameParq94JSONDataSets数据源JSONDataSets数据源SparkSQL可以自动根据JSONDataSet的格式把其上载为DataFrame。用路径指定JSONdataset;路径下可以是一个文件,也可以是多个文件:sc=spark.sparkContextpath="examples/src/main/resources/people.json"peopleDF=spark.read.json(path)使用的结构可以调用printSchema()方法打印:peopleDF.printSchema()JSONDataSets数据源JSONDataSets95利用DataFrame创建一个临时表:使用Spark的sql方法进行SQL查询:peopleDF.createOrReplaceTempView("people")teenagerNamesDF=spark.sql("SELECTnameFROMpeopleWHEREageBETWEEN13AND19")teenagerNamesDF.show()JSONDataSets数据源利用DataFrame创建一个临时表:使用Spark的sql96JSONdataset的DataFrame也可以是RDD[String]格式,每个JSON对象为一个string:jsonStrings=['{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}']otherPeopleRDD=sc.parallelize(jsonStrings)otherPeople=spark.read.json(otherPeopleRDD)otherPeople.show()JSONDataSets数据源JSONdataset的DataFrame也可以是RDD[97Hive表数据源Hive表数据源SparkSQL支持对Hive中的数据进行读写。首先创建一个支持Hive的SparkSession对象,包括与Hivemetastore的连接,支持Hive的序列化和反序列化操作,支持用户定义的Hive操作等。warehouse_location=abspath('spark-warehouse')spark=SparkSession.builder\.appName("PythonSparkSQLHiveintegrationexample")\.config("spark.sql.warehouse.dir",warehouse_location)\.enableHiveSupport().getOrCreate()warehouse_location指定数据库和表的缺省位置:Hive表数据源Hive表数据源SparkSQL支持对Hi98Hive表数据源spark.sql("CREATETABLEIFNOTEXISTSsrc(keyINT,valueSTRING)USINGhive")spark.sql("LOADDATALOCALINPATH'examples/src/main/resources/kv1.txt'INTOTABLEsrc")基于新创建的SparkSession创建表和上载数据到表中:spark.sql("SELECT*FROMsrc").show()spark.sql("SELECTCOUNT(*)FROMsrc").show()使用HiveQL进行查询:Hive表数据源spark.sql("CREATETABL99Hive表数据源sqlDF=spark.sql("SELECTkey,valueFROMsrcWHEREkey<10ORDERBYkey")SQL查询的结果也是DataFrames,可以对结果进行所有DataFrame的操作:stringsDS=sqlDF.rdd.map(lambdaro
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 农场出租带猪圈合同范例
- 危房申请合同范例
- 公司餐饮配送合同范例
- 上海郊区房屋租赁合同范例
- 公司水果采购合同范例
- 厂区安保合同范例
- 书画加盟合同范例
- 个人房子买卖合同范例
- 厨房装潢合同范例
- 厂区修路合同范例
- GB/T 7588.2-2020电梯制造与安装安全规范第2部分:电梯部件的设计原则、计算和检验
- GB/T 17457-2019球墨铸铁管和管件水泥砂浆内衬
- 紧急采购申请单
- 小学道德与法治学科高级(一级)教师职称考试试题(有答案)
- 复旦大学英语水平测试大纲9300词汇表讲义
- 最新青岛版科学四年级上册《温度计的秘密》优质课件
- DB63-T 1675-2018+建筑消防设施维护保养技术规范
- 四年级上册美术课件-第12课 精美的邮票 ▏人教新课标 ( ) (共21张PPT)
- 炉内水处理磷酸盐处理(PT)、磷酸盐隐藏汇总课件
- 边坡变形观测报告
- 零星材料明细单
评论
0/150
提交评论