版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
1、Spark SQL数据加载和保存实战一:前置知识详解:Spark SQL 重要是操作 DataFrame ,DataFrame 本身提供了 save 和 load 的操作, Load :可以创建 DataFrame ,Save :把 DataFrame中的数据保存到文件或者说与具体的格式来指明我们要读取的文件的类型以及与具体的格式来指出我们要输出的文件是什么类型。二: Spark SQL读写数据代码实战:import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spa
2、rk.api.java.JavaSparkContext;import org.apache.spark.api.java.function.Function;import org.apache.spark.sql.*;import org.apache.spark.sql.types.DataTypes;import org.apache.spark.sql.types.StructField;import org.apache.spark.sql.types.StructType;import java.util.ArrayList;import java.util.List;public
3、 class SparkSQLLoadSaveOps public static void main(String args) SparkConfconfSparkConf().setMaster(local).setAppName(SparkSQLLoadSaveOps);JavaSparkContext sc = new JavaSparkContext(conf);SQLContext = new SQLContext(sc);/* read() 是 DataFrameReader类型, load 可以将数据读取出来*/=newDataFrame peopleDF = sqlContex
4、t.read().format(json).load(E:SparkSparkinstanll_packageBig_Data_Softwarespa rk-1.6.0-bin-hadoop2.6examplessrcmainresourcespeople.json);/* 直接对 DataFrame进行操作* Json:是一种自解释的格式,读取Json 的时候怎么判断其是什么格式?通过扫描整个 Json 。扫描之后才会知道元数据*/通过 mode 来指定输出文件的是 append 。创建新文件来追加文件peopleDF.select(name).write().mode(SaveMode.A
5、ppend).save(E:personNames);读取过程源码分析如下:1. read方法返回 DataFrameReader,用于读取数据。/*: Experimental :Returns a DataFrameReader that can be used to read data in as a DataFrame.sqlContext.read.parquet(/path/to/file.parquet)sqlContext.read.schema(schema).json(/path/to/file.json)* group genericdata* since 1.4.0*/
6、Experimental/ 创建 DataFrameReader实例,获得了DataFrameReader引用def read: DataFrameReader = new DataFrameReader(this)然后再调用 DataFrameReader 类中的 format ,指出读取文件的格式。/*Specifies the input data source format.since 1.4.0*/def format(source: String): DataFrameReader = this.source = sourcethis3.通过 DtaFrameReader中 loa
7、d 方法通过路径把传入过来的输入变成DataFrame。/*Loads input in as a DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()至此,数据的读取工作就完成了,下面就对DataFrame进行操作。下面
8、就是写操作!1. 调用 DataFrame中 select函数进行对列筛选/*Selects a set of columns. This is a variant of select that can only selectexisting columns using column names (i.e. cannot construct expressions).*/ The following two are equivalent:df.select(colA, colB)df.select($colA, $colB)group dfopssince 1.3.0*/scala.annot
9、ation.varargsdef select(col: String, cols: String*): DataFrame = select(col +: cols).map(Column(_) : _*)然后通过 write 将结果写入到外部存储系统中。/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFra
10、meWriter(this)在保持文件的时候 mode 指定追加文件的方式/* Specifies the behavior when data or table already exists. Options include:/ Overwrite是覆盖- SaveMode.Overwrite: overwrite the existing data.创建新的文件,然后追加- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).- SaveMode.ErrorIfExist
11、s: default option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis最后, save() 方法触发 action ,将文件输出到指定文件中。/*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.extraOptions += (path
12、- path)save()三: Spark SQL读写整个流程图如下:这里写图片描述四:对于流程中部分函数源码详解:DataFrameReader.Load()Load ()返回 DataFrame 类型的数据集合,使用的数据是从默认的路径读取。/*Returns the dataset stored at path as a DataFrame,using the default data source configured by spark.sql.sources.default.group genericdatadeprecated As of 1.4.0, replaced by re
13、ad().load(path). This will be removed in Spark 2.0.*/deprecated(Use read.load(path). This will be removed in Spark 2.0., 1.4.0)def load(path: String): DataFrame = / 此时的 read 就是 DataFrameReaderread.load(path)追踪 load 源码进去,源码如下:在 DataFrameReader中的方法。 Load() 通过路径把输入传进来变成一个DataFrame。/*Loads input in as a
14、 DataFrame, for data sources that require a path (e.g. data backed bya local or distributed file system).*since 1.4.0*/TODO: Remove this one in Spark 2.0. def load(path: String): DataFrame = option(path, path).load()追踪 load 源码如下:/*Loads input in as a DataFrame, for data sources that dont require a p
15、ath (e.g. externalkey-value stores).*since 1.4.0*/def load(): DataFrame = 对传入的 Source 进行解析val resolved = ResolvedDataSource(sqlContext,userSpecifiedSchema = userSpecifiedSchema,partitionColumns = Array.emptyString,provider = source,options = extraOptions.toMap)DataFrame(sqlContext, LogicalRelation(r
16、esolved.relation)DataFrameReader.format()1. Format:具体指定文件格式,这就获得一个巨大的启示是:如果是Json文件格式可以保持为Parquet等此类操作。Spark SQL在读取文件的时候可以指定读取文件的类型。例如,Json,Parquet./* Specifies the input data source format.Built-in options include“parquet”,”json ”,etc.* since 1.4.0*/def format(source: String): DataFrameReader = this
17、.source = source /FileTypethisDataFrame.write()1. 创建 DataFrameWriter实例/*: Experimental :Interface for saving the content of the DataFrame out into external storage.group outputsince 1.4.0*/Experimentaldef write: DataFrameWriter = new DataFrameWriter(this)2.追踪 DataFrameWriter源码如下:以 DataFrame的方式向外部存储系
18、统中写入数据。/*: Experimental :Interface used to write a DataFrame to external storage systems (e.g. file systems,key-value stores, etc). Use DataFrame.write to access this.*since 1.4.0*/Experimentalfinal class DataFrameWriter privatesql(df: DataFrame) DataFrameWriter.mode()Overwrite 是覆盖,之前写的数据全都被覆盖了。Appe
19、nd: 是追加,对于普通文件是在一个文件中进行追加,但是对于parquet格式的文件则创建新的文件进行追加。/*Specifies the behavior when data or table already exists. Options include:- SaveMode.Overwrite: overwrite the existing data.- SaveMode.Append: append the data.- SaveMode.Ignore: ignore the operation (i.e. no-op).默认操作- SaveMode.ErrorIfExists: de
20、fault option, throw an exception at runtime.since 1.4.0*/def mode(saveMode: SaveMode): DataFrameWriter = this.mode = saveModethis通过模式匹配接收外部参数/*Specifies the behavior when data or table already exists. Options include:- overwrite: overwrite the existing data.- append: append the data.- ignore: ignore
21、 the operation (i.e. no-op).- error: default option, throw an exception at runtime.*since 1.4.0*/def mode(saveMode: String): DataFrameWriter = this.mode = saveMode.toLowerCase match case overwrite = SaveMode.Overwritecase append = SaveMode.Appendcase ignore = SaveMode.Ignorecase error | default = Sa
22、veMode.ErrorIfExistscase _ = throw new IllegalArgumentException(sUnknown save mode: $saveMode. + Accepted modes are overwrite, append, ignore, error.)thisDataFrameWriter.save()save 将结果保存传入的路径。/*Saves the content of the DataFrame at the specified path.since 1.4.0*/def save(path: String): Unit = this.
23、extraOptions += (path - path)save()追踪 save 方法。/*Saves the content of the DataFrame as the specified table.since 1.4.0*/def save(): Unit = ResolvedDataSource(df.sqlContext,source,partitioningColumns.map(_.toArray).getOrElse(Array.emptyString),mode,extraOptions.toMap,df)其中 source 是 SQLConf 的 defaultDa
24、taSourceNameprivate var source: String = df.sqlContext.conf.defaultDataSourceName11其中 DEFAULT_DATA_SOURCE_NAME默认参数是parquet。/ This is used to set the default data sourceval DEFAULT_DATA_SOURCE_NAME = stringConf(spark.sql.sources.default, defaultValue = Some(pache.spark.sql.parquet), doc = The default
25、 data source to use in input/output.)DataFrame.Scala中部分函数详解:1. toDF函数是将 RDD 转换成 DataFrame/*Returns the object itself.group basicsince 1.3.0*/This is declared with parentheses to prevent the Scala compiler from treatingrdd.toDF(1) as invoking this toDF and then apply on the returned DataFrame. def toDF(): DataFrame = thisshow() 方法:将结果显示出来/*Displays the DataFrame in a tabular form. For example:*yearmonth AVG(Adj Close) MAX(Adj Close)*1980120.5032180.595103*1981010.5232890.570307*1982020.4365040.475256
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 西安明德理工学院《组成原理与系统结构》2023-2024学年第一学期期末试卷
- 2024无锡江阴金融服务合同
- 2024版简单的土石方承包合同范本
- 临时安保服务定制协议:2024年标准版B版
- 二零二五年跨境电商平台合作销售合同3篇
- 个性化制作服务费及销售权合同(2024版)版
- 二零二五年度高端房地产信托借款服务合同3篇
- 2025年度企业社会责任报告编辑服务合同范本3篇
- 天津城市职业学院《铸造工艺》2023-2024学年第一学期期末试卷
- 苏州大学应用技术学院《生物工程单元操作原理》2023-2024学年第一学期期末试卷
- 【阅读提升】部编版语文五年级下册第五单元阅读要素解析 类文阅读课外阅读过关(含答案)
- 挖掘机运输方案
- 民企廉洁培训课件
- 飞书使用培训课件
- 食品生产许可证办理流程详解
- 2023年1月自考07484社会保障学试题及答案含解析
- 餐饮咨询服务合同范本
- 股权投资的基本概念与原理
- 数据交换详细设计说明书
- 最全海外常驻和出差补助管理规定
- 工程质保金返还审批单
评论
0/150
提交评论