版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
基于SparkSQL实现广告流量检测数据预处理知识准备教学目标知识目标掌握DataFrame行列的增、删操作方法。掌握用户自定义函数的创建与使用方法。掌握多种DataFrame表联合的方法。掌握DataFrame保存数据的多种方式。教学目标技能目标能够处理DataFrame中的缺失值。能够实现DataFrame多个表联合。能够创建和使用用户自定义函数。能够按照不同需求采用不同方式保存DataFrame数据。素质目标具有将问题关联起来的系统思维,通过学习DataFrame多表联合,能够掌握DataFrame不同联合方式的原理。具备强大的自我学习能力,通过学习用户自定义函数的创建和使用,可以提高代码的重用性和可读性。具备知行合一的实践精神,通过学习DataFrame的输出操作,能够输出广告流量检测数据的关键特征数据。思维导图项目背景数据预处理是整个数据分析过程中很重要的一个步骤。真实项目中的原始数据很多是不完整、不一致的脏数据,无法直接用于进行数据挖掘,或对这类数据的分析结果质量一般。在广告流量检测案例中广告流量检测数据的特点是数据量包含上万个IP,而且每个IP的访问数据多达数万条,存在缺失值、与分析主题无关的属性或未直接反映虚假流量的属性等。针对广告流量检测数据的特点,本项目主要对广告流量检测数据的存在缺失字段进行删除处理,对处理后的数据构建关键特征,并将关键特征合并后保存数据,实现广告流量检测违规识别项目中的数据预处理。项目目标根据项目4对数据的探索结果,通过DataFrame删除操作、自定义函数构建特征及多表联合等操作对广告流量检测数据进行预处理。目标分析使用DataFrame行列表删除操作,对广告流量检测数据进行缺失字段删除操作。使用用户自定义函数和DataFrame表联合操作,构建广告流量检测数据关键特征,生成4份特征数据集,然后合并特征数据集。使用DataFrame行列表输出操作,将广告流量检测模型数据保存到Hive表中。掌握DataFrame行列表增、删操作创建与使用用户自定义函数掌握DataFrame表联合操作掌握DataFrame行列表输出操作掌握DataFrame行列表增、删操作Spark可以通过对DataFrame进行添加或删除列操作得到新的DataFrame。新增数据列withColumn()方法通过添加或替换与现有列有相同的名字的列,返回一个新的DataFrame。当在DataFrame中新增一列时,该列可来源于本身DataFrame对象,不可来自其他非己DataFrame对象。withColumn(colName:String,col:Column)方法根据指定字段名(colName)往DataFrame中新增一列,若该字段已存在,则会覆盖当前列。新增数据列例如,读取users.json文件创建DataFrame对象df,该Json数据源其实是一张典型的结构化学生表,包括了姓名、年龄、性别、学院等常见学生信息,其字段说明如数据格式如下表。字段字段说明name姓名age年龄sex性别institute学院新增数据列现向DataFrame对象user添加自增id列,其中monotonically_increasing_id()方法可生成单调递增的整数的数据列,添加结果如右图。删除数据列drop()方法是删除指定数据列,保留其他数据列。drop()方法有两种方式使用。第一种方式是drop(colName:String),其传入的参数是描述列字段名,例如,删除DataFrame对象df中的id列,如代码52所示,其返回结果如右图。删除数据列第二种方式是drop(col:Column):DataFrame,传入的参数是Column类型的列,例如,删除DataFrame对象user中的id列,其返回结果如右图。上图和右图返回的结果一致,drop()方法两种方式不同在于,前者输入参数是描述列字段的String类型列,而后者传入的是Column类型的列,两者都是返回一个新的DataFrame对象。掌握DataFrame行列表增、删操作创建与使用用户自定义函数掌握DataFrame表联合操作掌握DataFrame行列表输出操作创建与使用用户自定义函数当系统的内置函数不足以执行所需任务时,用户可以定义自己的函数,即用户定义函数(user-definedfunctions,UDFs)。要在SparkSQL中使用UDFs,用户必须首先定义函数,然后向Spark注册函数,最后调用注册的函数。用户定义函数可以作用于单行,也可以同时作用于多行。SparkSQL还支持将现有的Hive中的自定义函数(UDFs)、聚合函数(UDAFs)和表值函数(UDTFs)集成至SparkSQL中使用。UDFs使用介绍UDFs的使用步骤如下。编写函数。通过Scala语言或Python语言,编写一个函数,用于实现用户想要的自定义逻辑。注册函数。通过将函数名及其签名(即传入参数的类型和返回值的类型)传递给Spark的udf()函数或register()函数来注册该函数。使用函数。一旦函数被注册为UDF,用户可以在Spark的DataFrame代码或SQL查询中使用该函数。在DataFrameAPI中,用户可以使用注册的函数作为列转换函数。UDFs使用介绍在SQL查询中使用UDFs时,注册函数有两种方式。第一种注册函数使用spark.udf.register方法进行UDFs注册,注册后的函数既可以在SQL中使用,也可以在DataFrame的selectExpr表达式中使用。第二种使用org.apache.spark.sql.functions.udf方法进行UDFs注册,注册后的函数只能在DataFrame中使用,不能用在Spark-SQL中,也不能在DataFrame的selectExpr表达式中使用。使用UDFs以使用org.apache.spark.sql.functions.udf方法注册UDFs为例,使用UDFs的流程如下图。使用UDFs成绩等级评定实现操作如下。创建DataFrame对象。创建DataFrame对象studentDF,查看DataFrame对象studentDF结果如下图。使用UDFs编写函数。自定义函数,将成绩转换为考察等级。注册并使用UDFs。将成绩转为字母等级,转换结果如下图。掌握DataFrame行列表增、删操作创建与使用用户自定义函数掌握DataFrame表联合操作掌握DataFrame行列表输出操作掌握DataFrame表联合操作在SQL语言中用得很多的就是表联合操作,DataFrame中同样也提供了表联合的功能。在DataFrame中提供了5种重载的join()方法,如下表。序号方法说明1defjoin(right:Dataset[_],usingColumn:String):DataFrame使用一个给定列与另外一个DataFrame进行内连接2defjoin(right:Dataset[_],usingColumns:Seq[String]):DataFrame使用两个或多个给定的列与另外一个DataFrame进行内连接3defjoin(right:Dataset[_],usingColumns:Seq[String],joinType:String):DataFrame使用给定的列与另一个DataFrame按照指定的连接类型进行连接4defjoin(right:Dataset[_],joinExprs:Column):DataFrame使用给定的连接表达式与另一个DataFrame进行内部连接5defjoin(right:Dataset[_],joinExprs:Column,joinType:String):DataFrame使用给定的连接表达式与另一个DataFrame按照指定的连接类型进行连接掌握DataFrame表联合操作使用表联合操作时,可指定的连接类型有inner、outer、left_outer、right_outer、semijoin,连接类型说明如下表。类型描述inner内连接,返回两个表中满足连接条件的匹配行。只有在两个表中都存在匹配的行时,才会返回结果outer外连接,返回两个表中满足连接条件的匹配行以及不满足连接条件的行。若在一个表中找不到匹配的行,则以NULL填充结果left_outer左外连接,返回左表中的所有行以及与右表满足连接条件的匹配行。若在右表中找不到匹配的行,则以NULL填充结果right_outer右外连接,返回右表中的所有行以及与左表满足连接条件的匹配行。若在左表中找不到匹配的行,则以NULL填充结果semijoin半连接,返回左表中满足连接条件的行,但只返回右表中的列。可用于过滤左表中的行,而无需返回完整的连接结果。通常用于优化查询性能join()五大重载方法说明观察上上表的5种join()方法,发现其主要区别在于输入参数的个数与类型不同。其中,第1种、第2种、第4种的join()方法皆为内连接(innerjoin),原因是其join()方法并没有join类型的joinType的参数输入,因此是默认的内连接。而第3种、第5种方法皆有joinType:String该参数,因此可从内连接、左外连接、右外连接等选择任何一种连接类型进行表联合操作。join()五大重载方法说明观察上上表的第1种、第2种join()方法,这两者主要区别在于第2个输入参数分别为usingColumn:String、usingColumns:Seq[String],前者是表示一个数据列的字符串(String),后者是可以表示多个数据列的序列(Seq),即当在两个DataFrame对象进行连接操作时,不仅可以基于一个数据列,也可以用多个数据列进行匹配连接。观察上上表的第4种、第5种join()方法,可看到出第2个输入参数不再是象征着数据列的usingColumn:String、usingColumns:Seq[String],而是joinExprs:Column,其表示两个参与join运算的连接数据列的表述(expression)。根据特定字段进行表联合操作上上表的第1种join()方法需要两个DataFrame中有相同的一个列名。以info.json文件为例,该json文件记录着学生个人信息,数据字段说明如下表。字段字段说明name姓名height身高(单位:厘米)weight体重(单位:千克)phone手机号码根据特定字段进行表联合操作读取info.json文件创建DataFrame对象,查看创建结果如右图。根据特定字段进行表联合操作对DataFrame对象users与info进行单字段内连接,运行结果如下图。其中,name字段只显示一次。指定类型进行表联合操作在根据多个字段进行表联合的情况下,可以选上上表的第3种join()方法,指定表联合的类型为右连接,合并结果如右图。使用Column类型进行表联合操作如果不采用上上表的第1种、第2种、第3种join()方法,即不采用通过直接传入列名或多个列名组成的序列的指定连接条件的方式,也可以使用上上表的第4种join()方法,通过直接指定两个DataFrame连接Column的形式,,其联合结果如下图。掌握DataFrame行列表增、删操作创建与使用用户自定义函数掌握DataFrame表联合操作掌握DataFrame行列表输出操作掌握DataFrame行列表输出操作DataFrame提供了很多输出操作的方法,其中使用save()方法可以将DataFrame数据保存成文件,也可以使用saveAsTable()方法将DataFrame数据保存成持久化的表。saveAsTable()方法会将DataFrame数据保存为表,并在Hive的元数据库中创建一个指针指向该表的位置,持久化的表会一直保留,即使Spark程序重启也没有影响,只要连接至同一个元数据服务即可读取表数据。读取持久化表时,只需要用表名作为参数,调用spark.table()方法即可加载表数据并创建DataFrame。默认情况下,saveAsTable()方法会创建一个内部表,表数据的位置是由元数据服务控制的。如果删除表,那么表数据也会同步删除。保存为文件将DataFrame数据保存为文件,实现步骤如下。首先创建一个Map对象,用于存储一些save()方法需要用到的一些数据,指定文件的头信息及文件的保存路径。从user数据中选择出name、sex和age这3列字段的数据。调用save()方法将步骤(2)中的DataFrame数据保存至copyOfUser.json文件夹中。保存为文件format()方法用于指定输出文件格式的方法,接受一个字符串参数,表示要使用的输出文件格式;mode()方法用于指定数据保存的模式,可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists,参数说明如下表;而options()方法用于设置一些额外的选项,如压缩级别、编码方式等。参数参数说明Overwrite表示覆盖目录中已存在的数据Append表示在目标目录下追加数据Ignore表示如果目录下已有文件,那么什么都不执行ErrorIfExists表示如果目标目录下已存在文件,那么抛出相应的异常保存为文件在HDFS的/user/root/SparkSQL/目录下查看保存结果,如下图。保存为持久化的表将DataFrame数据保存为持久化的表,实现步骤如下。启动Hive的元数据服务。使用saveAsTable()方法将DataFrame对象copyOfUser保存到Hive中copyUser表,结果如下图。基于SparkSQL实现广告流量检测数据预处理项目实施使用drop语句删除数据自定义函数构建关键特征保存DataFrame数据至Hive表使用drop语句删除数据在项目4的任务二中,发现存在数据缺失的现象。但由于所采集到的数据为字符型数据,无法对缺失值进行插补.为了减小缺失数据对模型产生的影响,将缺失率过高的mac、creativeid、mobile_os、mobile_type、app_key_md5、app_name_md5、os_type等属性进行删除,而对于idfa、imei、android、openudid这4个数据含义相似的数据字段,由于后续构建特征时不确定是否需要使用到,所以先不进行处理。使用drop语句删除数据处理后的结果将保存在Hive表case_data_sample_new中,保存成功后,即可在Hive的命令行窗口中,使用select语句查询ad_traffic.case_data_sample_new的前3行,如下图。使用drop语句删除数据自定义函数构建关键特征保存DataFrame数据至Hive表自定义函数构建关键特征通过数据挖掘得到的数据分析结论,具有充分的价值,能进一步推动构建精细化管理模式,提升管理水平的精准性、科学性、有效性。因此,要充分利用数据分析结论。根据探索分析结果,不同作弊行为产生的虚假流量的不同数据特征,分别构建N、N1、N2、N3关键特征,关键特征构建说明如下表,其中,5个小时是根据广告点击周期的频率进行划分得出。关键特征构建方法说明N统计在5个小时内,原始数据集中,IP与Cookie两个属性相同的记录出现次数IP和Cookie不变的情况下,出现的记录次数特征:NN1统计在5个小时内,原始数据集中,同一个IP产生的Cookie记录条数IP不变,对应Cookie出现的记录次数特征:N1N2统计在5个小时内,原始数据集中,IP前两段相同的记录的出现次数IP前两段相同的次数特征:N2N3统计在5个小时内,原始数据集中,IP前三段相同的记录的出现次数IP前三段相同的次数特征:N3划分时间区间以规定的间隔对自定义的区间(5小时,即18000秒)进行等分切割成不同的小区间,运行结果如下图,存在34个时间分割点。构建关键特征并保存至Hive表中得到时间划分的区间列表后,以5小时的区间对数据进行特征构建,构建特征N、N1、N2、N3,然后在得到4个特征数据集后,将这些数据集以ranks字段进行合并得到含ranks和4个特征的完整特征数据集,将此数据集以追加(Append)的方式写入到Hive表中。需要注意的是,代码中是对每个区间内特征进行构建,又由于7天的广告流量检测数据的数据量非常大,处理会耗费大量的时间,且对硬件要求极大,所以需要将3台虚拟机的内存进行调大至2GB。若计算机硬件较差,或不想等待时间过长,可不对7天的广告流量检测数据进行处理,仅取出一部分时间区间进行处理即可,如将代码中的循环变量“times.length-1”设置为“4”,取出前25个小时。构建关键特征并保存至Hive表中执行代码后,在Hive的ad_traffic数据库中查询case_data_sample_model_N表的前10行,并查看表中的字段名称及类型结果,如下图。使用drop语句删除数据自定义函数构建关键特征保存DataFrame数据至Hive表保存DataFrame数据至Hive表从上图中可以看出,case_data_sample_model_N表只存在5个数据字段,不包含label字段,label字段存在于完整数据集中,因此需要将case_data_sample_model_N表和case_data_sample_new表进行合并连接,构建存在rank、dt、N、N1、N2、N3和label字段的模型数据。可使用“data_model.show(5)”命令查看模型数据的前5行,如下图。保存DataFrame数据至Hive表将模型数据保存在Hive表ad_traffic.case_data_sample_model中。可使用“data_model.describe().show(false)”命令,统计各个特征的数据分布,如下图。小结本项目首先介绍了DataFrame行列表增、删操作,通过DataFrame对象新增或删除得到新的DataFrame;接着介绍了创建与使用用户自定义函数,通过用户自定义函数可以根据特定的需求进行编写和调用;再接着介绍了DataFrame多种联合操作,并分别举例阐述联合的区别;最后介绍DataFrame行列表输出操作相关知识。对广告流量检测数据进行数据预处理实施操作,通过对缺失值删除处理后构建关键特征,将构建出来的特征合并输出保存到Hive表中。通过本项目的操作,为项目6模型的构建提供数据特征服务。基于SparkSQL实现广告流量检测数据预处理技能拓展+测试题技能拓展Datasets是一个特定域的强类型的不可变数据集,每个Datasets都有一个非类型化视图DataFrame(DataFameDataSet[Row]的一种表示形式)。DataFrame可以通过调用as(Encoder)方法转换Datasets,而Datasets则可以通过调用toDF()方法转换成DataFrame,两者之间可以互相灵活转换。操作Datasets可以像操作RDD一样使用各种转换(Transformation)操作并行操作,转换算子采用“惰性”执行方式。技能拓展当调用Action算子时才会触发真正的计算执行创建Datasets需要显式提供Encoder将对象序列化为二进制形式进行存储,而不是使用Java序列化或Kryo序列化方式Datasets使用专门的编码器序列化对象在网络间传输处编码器动态生成代码,可以在编译时检查类型,不需要将对象反序列化就可以进行过滤、排序等操作。避免了缓存(Cache)过程中频繁的序列化和反序列化,有效减少了内存的使用和Java对象频繁垃圾回收(GarbageCollection,GC)的开销。技能拓展Datasets创建的方式有如下两种方式。通过caseclass创建Datasets,创建结果如下图。技能拓展DataFrame调用as[Encoder]方法数创建Datasets,创建结果如下图。知识测试(1)下列操作中,DataFrame删除字段列操作的是()A.drop()B.withColumn()C.select()D.join()(2)DataFrame中drop()方法后返回值的类型是()。A.ArrayB.RowC.DataFrameD.Column知识测试(3)下列操作中,DataFrame新增字段列操作的是()A.drop()B.withColumn()C.select()D.join()(4)【多选题】Spark支持将现有的Hive中的用户自定义函数集成至SparkSQL中使用,其中现有的Hive中的用户自定义函数类别包括()A.UDFsB.UDAFsC.UDTFsD.FDAFs知识测试(5)【多选题】下列表述属于内连接的代码是()A.df.join(df,Seq("city","state"),"inner").showB.df.join(df,Seq("city","state")).showC.df.join(df,Seq("city","state"),"left").showD.df.join(df,Seq("city","state"),"right").show(6)【多选题】下列表述说法正确的是()。A.format()方法指定输出文件格式的方法,如format("json")输出JSON文件格式。B.mode()方法用于指定数据保存的模式,可以接收的参数有Overwrite、Append、Ignore和ErrorIfExists。C.mode("Overwrite")表示在目标目录下追加数据。D.options()方法用于设置一些额外的选项,如压缩级别、编码方式等。知识测试(7)下列选项中,能够正确的将的DataFrame保存到people.json文件中的语句是()。A.df.write.json("people.json")B.df.json("people.json")C.df.write.format("csv").save("people.json")D.df.write.csv("people.json")(8)下列选项中,可将DataFrame输出至Hive表的方法是()。A.saveAsStreamingFiles()B.saveAsTable()C.saveAsTextFiles()D.saveAsObjectFiles()知识测试(9
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 无线安装合同范本
- 内蒙古乌兰察布市(2024年-2025年小学五年级语文)统编版课后作业((上下)学期)试卷及答案
- 工业冷却设备拆除与安装方案
- 引水隧洞施工施工组织设计方案
- 农村地区太阳能路灯应用技术方案
- 高性能计算机房打眼施工方案
- 高钙石市场营销合同框架
- 邢台学院《文学片段表演》2021-2022学年第一学期期末试卷
- 邢台学院《市场调查实务》2023-2024学年第一学期期末试卷
- 初创公司股权激励设计方案
- 苏科版八年级数学上册讲练专题复习实数章末重难点题型(原卷版+解析)
- CJT 437-2013 垃圾填埋场用土工滤网
- 主观验光概述-综合验光仪结构(验光技术课件)
- 专题一第3课三、《SmartArt图形工具的使用》教学设计 2023-2024学年青岛版(2018)初中信息技术七年级下册
- 海南经贸职业技术学院人才培养工作评估分项自评报告
- 物联网技术应用专业-物联网技术课程标准
- 小学道德与法治行动研究报告
- 2024年工业和信息化部应急通信保障中心招聘公开引进高层次人才和急需紧缺人才笔试参考题库(共500题)答案详解版
- 工程部项目培训
- 迷你临床演练评量MiniCEX于英
- 2024版《隐患排查标准手册》(附检查依据)
评论
0/150
提交评论