大数据导论思维、技术与应用第12章SPARKSQL课件_第1页
大数据导论思维、技术与应用第12章SPARKSQL课件_第2页
大数据导论思维、技术与应用第12章SPARKSQL课件_第3页
大数据导论思维、技术与应用第12章SPARKSQL课件_第4页
大数据导论思维、技术与应用第12章SPARKSQL课件_第5页
已阅读5页,还剩57页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

1、 大数据导论第十二章CONTENTS目录PART 01 SPARK SQL简介PART 02 SPARK SQL执行流程PART 03 基础数据模型DATAFRAMEPART 04 使用Spark SQL的方式PART 05 SPARK SQL数据源PART 06 SPARK SQL CLI介绍PART 07在Pyspark中使用Spark SQLPART 08 在Java中连接Spark SQLPART 09 习题PART 01 Spark SQL简介Spark SQL是一个用来处理结构化数据的Spark组件,为Spark提供了查询结构化数据的能力。Spark SQL可被视为一个分布式的SQ

2、L查询引擎,可以实现对多种数据格式和数据源进行SQL操作,包括Parquet,Hive,MongoDB,JSON、HDFS、JDBC、S3和RDD等。Spark SQL简介Spark SQL介绍:Spark SQL是为了处理结构化数据的一个Spark 模块。不同于Spark RDD的基本API,Spark SQL接口拥有更多关于数据结构本身与执行计划等更多信息。在Spark内部,Spark SQL可以利用这些信息更好地对操作进行优化。Spark SQL提供了三种访问接口:SQL,DataFrame API和Dataset API。当计算引擎被用来执行一个计算时,有不同的API和语言种类可供选择

3、。这种统一性意味着开发人员可以来回轻松切换各种最熟悉的API来完成同一个计算工作。Spark SQL简介Spark SQL具有如下特点数据兼容方面:能加载和查询来自各种来源的数据。 性能优化方面:除了采取内存列存储、代码生成等优化技术外,还引进成本模型对查询进行动态评估、获取最佳物理计划等; 组件扩展方面:无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展。标准连接:Spark SQL包括具有行业标准JDBC和ODBC连接的服务器模式。Spark SQL简介Spark SQL具有如下特点集成:无缝地将SQL查询与Spark程序混合。 Spark SQL允许将结构化数据作为Spa

4、rk中的分布式数据集(RDD)进行查询,在Python,Scala和Java中集成了API。这种紧密的集成使得SQL查询以及复杂的分析算法可以轻松地运行。可扩展性:对于交互式查询和长查询使用相同的引擎。Spark SQL利用RDD模型来支持查询容错,使其能够扩展到大型作业,不需担心为历史数据使用不同的引擎。PART 02 Spark SQL执行流程Spark SQL执行流程类似于关系型数据库,Spark SQL语句也是由Projection(a1,a2,a3)、 Data Source (tableA)、 Filter(condition)三部分组成,分别对应SQL查询过程中的Result、D

5、ata Source、 Operation,也就是说SQL语句按Result-Data Source-Operation的次序来描述的。Spark SQL执行流程解析(Parse)对读入的SQL语句进行解析,分辨出SQL语句中哪些词是关键词(如SELECT、 FROM、WHERE),哪些是表达式、哪些是 Projection、哪些是 Data Source 等,从而判断SQL语句是否规范; 绑定(Bind)将SQL语句和数据库的数据字典(列、表、视图等)进行绑定,如果相关的Projection、Data Source等都存在,则这个SQL语句是可以执行的; Spark SQL执行流程优化(Op

6、timize)一般的数据库会提供几个执行计划,这些计划一般都有运行统计数据,数据库会在这些计划中选择一个最优计划; 执行(Execute)按Operation-Data Source-Result 的次序来执行计划。在执行过程有时候甚至不需要读取物理表就可以返回结果,比如重新运行刚运行过的SQL语句,可能直接从数据库的缓冲池中获取返回结果。PART 03 基础数据模型DataFrameDataFrame是由“命名列”(类似关系表的字段定义)所组织起来的一个分布式数据集合,可以把它看成是一个关系型数据库的表。基础数据模型DataFrameDataFrame是Spark SQL的核心,它将数据保存

7、为行构成的集合,行对应列有相应的列名。DataFrame与RDD的主要区别在于,DataFrame带有Schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL可以掌握更多的结构信息,从而能够对DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。基础数据模型DataFrameDataFrame与RDD的对比:PART 04 使用Spark SQL的方式使用Spark SQL的方式使用Spark SQL,首先利用sqlContext从外部数据源加载数据为DataFrame;然

8、后,利用DataFrame上丰富的API进行查询、转换;最后,将结果进行展现或存储为各种外部数据形式。SparkSQL为Spark提供了查询结构化数据的能力,查询时既可以使用SQL也可以使用DataFrameAPI(RDD)。通过Thrift Server,SparkSQL支持多语言编程包括Java、Scala、Python及R。使用Spark SQL的方式使用Spark SQL的方式加载数据. 从Hive中的users表构造DataFrame:users = sqlContext.table(users). 加载S3上的JSON文件:logs = sqlContext.load(s3n:/p

9、ath/to/data.json, json). 加载HDFS上的Parquet文件:clicks = sqlContext.load(hdfs:/path/to/data.parquet, parquet)使用Spark SQL的方式加载数据. 通过JDBC访问MySQL:comments = sqlContext.jdbc(jdbc:mysql:/localhost/comments, user). 将普通RDD转变为DataFrame:rdd = sparkContext.textFile(“article.txt”) .flatMap(_.split( ) .map(_, 1) .re

10、duceByKey(_+_) wordCounts = sqlContext.createDataFrame(rdd, word, count)使用Spark SQL的方式加载数据. 将本地数据容器转变为DataFrame:data = (Alice, 21), (Bob, 24)people = sqlContext.createDataFrame(data, name, age). 将PandasDataFrame转变为SparkDataFrame(PythonAPI特有功能):sparkDF=sqlContext.createDataFrame(pandasDF)使用Spark SQL的

11、方式使用DataFrame. 创建一个只包含年轻用户的DataFrame :young = users.filter(users.age 21) . 也可以使用Pandas风格的语法: young = usersusers.age = 13 AND age = 19)teenagers.show()Parquet文件数据源JSON DataSets 数据源JSON DataSets 数据源Spark SQL可以自动根据JSON DataSet的格式把其上载为DataFrame。用路径指定JSON dataset;路径下可以是一个文件,也可以是多个文件:sc = spark.sparkConte

12、xtpath = examples/src/main/resources/people.jsonpeopleDF = spark.read.json(path)使用的结构可以调用printSchema()方法打印:peopleDF.printSchema()利用DataFrame创建一个临时表:使用Spark的sql方法进行SQL查询:peopleDF.createOrReplaceTempView(people)teenagerNamesDF = spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19)teenagerNam

13、esDF.show()JSON DataSets 数据源JSON dataset的DataFrame也可以是RDDString 格式,每个JSON对象为一个string:jsonStrings = name:Yin,address:city:Columbus,state:OhiootherPeopleRDD = sc.parallelize(jsonStrings)otherPeople = spark.read.json(otherPeopleRDD)otherPeople.show()JSON DataSets 数据源Hive表数据源Hive表数据源Spark SQL支持对Hive中的数据

14、进行读写。首先创建一个支持Hive的SparkSession对象,包括与Hive metastore的连接,支持Hive的序列化和反序列化操作,支持用户定义的Hive操作等。warehouse_location = abspath(spark-warehouse)spark = SparkSession .builder .appName(Python Spark SQL Hive integration example) .config(spark.sql.warehouse.dir, warehouse_location) .enableHiveSupport() .getOrCreate

15、()warehouse_location 指定数据库和表的缺省位置:Hive表数据源spark.sql(CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive)spark.sql(LOAD DATA LOCAL INPATH examples/src/main/resources/kv1.txt INTO TABLE src)基于新创建的SparkSession创建表和上载数据到表中:spark.sql(SELECT * FROM src).show()spark.sql(SELECT COUNT(*) FROM sr

16、c).show()使用HiveQL进行查询:Hive表数据源sqlDF = spark.sql(SELECT key, value FROM src WHERE key val sqlContext = new org.apache.spark.sql.SQLContext(sc)sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext1943a343scala import sqlContext.implicits._import sqlContext.implicits._SQLContext

17、Spark SQL CLI介绍下面的操作基于一个简单的数据文件people.json,文件的内容如下:name:Michaelname:Andy, age:30name:Justin, age:19数据文件下面语句从本地文件people.json读取数据创建DataFrame:val df = sqlContext.read.json(file:/data/people. json)df: org.apache.spark.sql.DataFrame = age: bigint, name: string创建DataFramesPyspark是针对Spark的Python API。Spark使

18、用py4j来实现Python与Java的互操作,从而实现使用Python编写Spark程序。Spark也同样提供了Pyspark,一个Spark的Python Shell,可以以交互的方式使用Python编写Spark程序。PART 07 在Pyspark中使用Spark SQL在Pyspark中使用Spark SQL在终端上启动PythonSparkShell:./bin/pyspark使用JSON文件作为数据源,创建JSON文件/home/sparksql/courses.json,并输入下面的内容:实例描述name:Linux, type:basic, length:10name:TCP

19、IP, type:project, length:15name:Python, type:project, length:8name:GO, type:basic, length:2name:Ruby, type:basic, length:5在Pyspark中使用Spark SQL首先使用SQLContext模块,其作用是提供Spark SQL处理的功能。在Pyspark Shell中逐步输入下面步骤的内容:引入pyspark.sql中的SQLContext:from pyspark.sql import SQLContext创建SQLContext对象使用pyspark的SparkCont

20、ext对象,创建SQLContext对象:sqlContext = SQLContext(sc)在Pyspark中使用Spark SQLDataFrame对象可以由RDD创建,也可以从Hive表或JSON文件等数据源创建。创建DataFrame,指明来源自JSON文件:df = sqlContext.read.json(/home/shiyanlou/courses.json)创建DataFrame对象在Pyspark中使用Spark SQL首先打印当前DataFrame里的内容和数据表的格式:df.select(name).show()#展示了所有的课程名df.select(name, le

21、ngth).show()#展示了所有的课程名及课程长度对DataFrame进行操作show()函数将打印出JSON文件中存储的数据表;使用printSchema()函数打印数据表的格式。然后对DataFrame的数据进行各种操作:df.show() df.printSchema()在Pyspark中使用Spark SQLdf.filter(dftype = basic).select(name, type).show()#展示了课程类型为基础课(basic)的课程名和课程类型df.groupBy(type).count().show()#计算所有基础课和项目课的数量。首先需要将DataFram

22、e注册为Table才可以在该表上执行SQL语句:df.registerTempTable(courses)coursesRDD = sqlContext.sql(SELECT name FROM courses WHERE length = 5 and length = 10)names = coursesRDD.rdd.map(lambda p: Name: + )for name in names.collect(): print name执行SQL语句在Pyspark中使用Spark SQLParquet是Spark SQL读取的默认数据文件格式,把从JSON中读取的Data

23、Frame保存为Parquet格式,只保存课程名称和长度两项数据:df.select(name, length).write.save(/tmp/courses.parquet, format=parquet)保存 DataFrame为其他格式将创建hdfs:/master:9000/tmp/courses.parquet文件夹并存入课程名称和长度数据。Spark SQL实现了Thrift JDBC/ODBC server,所以Java程序可以通过JDBC远程连接Spark SQL发送SQL语句并执行。PART 08 在Java中连接Spark SQL在Java中连接Spark SQL首先将$

24、HIVE_HOME/conf/hive-site.xml 拷贝到$SPARK_HOME/conf目录下。另外,因为Hive元数据信息存储在MySQL中,所以Spark在访问这些元数据信息时需要MySQL连接驱动的支持。添加驱动的方式有三种:在$SPARK_HOME/conf目录下的spark-defaults.conf中添加:spark.jars /opt/lib2/mysql-connector-java-5.1.26-bin.jar;可以实现添加多个依赖jar比较方便:spark.driver.extraClassPath /opt/lib2/mysql-connector-java-5.

25、1.26-bin.jar;设置配置在运行时添加 -jars/opt/lib2/mysql-connector-java-5.1.26-bin.jar做完上面的准备工作后,Spark SQL和Hive就继承在一起了,Spark SQL可以读取Hive中的数据。设置配置启动Thrift在Spark根目录下执行:./sbin/start-thriftserver.sh开启thrift服务器,它可以接受所有spark-submit的参数,并且还可以接受-hiveconf 参数。不添加任何参数表示以local方式运行,默认的监听端口为10000 在Java中连接Spark SQL添加依赖打开Eclips

26、e用JDBC连接Hive Server2。新建一个Maven项目,在pom.xml添加以下依赖:org.apache.hivehive-jdbc1.2.1org.apache.hadoophadoop-common2.4.1在Java中连接Spark SQL添加依赖jdk.toolsjdk.tools1.6system$JAVA_HOME/lib/tools.jar在Java中连接Spark SQLJDBC连接Hive Server2的相关参数:驱动:org.apache.hive.jdbc.HiveDriverurl:jdbc:hive2:/31:10000/default用户名:hadoop (启动thriftserver的linux用户名)密码:“”(默认密码为空)JDBC连接参数在Java中连接Spark SQLimportjava.sql.Connection;importjava.sql.DriverManager;importjava.sql.ResultSet;importjava.sql.SQLException;importjava.sql.Statement;publicclassTest1publicstaticvoidmain(Stringargs)throwsSQLExceptionStringurl=jdbc:hive2:

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论