大数据分析与应用微课版-课件 项目5、6 电商产品数据实时分析、电商产品数据迁移_第1页
大数据分析与应用微课版-课件 项目5、6 电商产品数据实时分析、电商产品数据迁移_第2页
大数据分析与应用微课版-课件 项目5、6 电商产品数据实时分析、电商产品数据迁移_第3页
大数据分析与应用微课版-课件 项目5、6 电商产品数据实时分析、电商产品数据迁移_第4页
大数据分析与应用微课版-课件 项目5、6 电商产品数据实时分析、电商产品数据迁移_第5页
已阅读5页,还剩42页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

项目五

电商产品数据实时分析创建数据流目录Content1使用SparkStreaming对电商产品数据实时分析2离线数据分析处理速度慢项目导言零实时数据分析学习目标零知识目标了解流式计算的特征及优点;熟悉SparkStreaming架构;掌握DStream的创建操作;掌握DStream转化操作。技能目标具备使用不同数据源创建DStream的能力;具备使用对数据进行转换操作的能力;具备实现数据实时分析的能力。素养目标具有考虑问题时的换位思考能力;具有解决问题时的逆向思维能力;具有较强的总结能力。任务5-1:使用Hive创建电商产品数据库壹流式计算简介任务技能流计算在大数据的场景中非常常见,流计算由两个部分组成分别是流数据和流计算DStream简介DStream表示一个连续不间断的数据流,DStream是随时间推移而收到的数据序列SparkStreaming简介SparkStreaming是Spark中的分布式流处理框架,能够通过指定的时间间隔对数据进行处理,其最小时间间隔可达到500ms,SparkStreaming具有高吞吐量和容错能力强DStream创建DStream的创建是在整个SparkStreaming程序中的,因此我们在创建DStream之前需要创建SparkStreaming对象,通过SparkStreaming对象创建DStream任务5-1:使用Hive创建电商产品数据库壹1流式计算简介流数据是指在时间分布和数量上无限的一系列动态数据的集合,这种数据的价值会随着时间的推移而降低,所以就需要对其进行实时的数据分析并且做出毫秒级别的快速响应,否则就会失去数据原本存在意义数据流动速度快且持续,潜在大小也许是无穷无尽的;数据源较多,且数据格式复杂;数据量大,一旦经过处理,要么被丢弃,要么被归档存储于数据仓库;注重数据的整体价值,不过分关注个别数据;数据顺序颠倒,或者不完整,系统无法控制将要处理的新到达的数据元素的顺序。(1)流数据具有的特征任务5-1:使用Hive创建电商产品数据库壹IBMInfoSphereStreams是IBM公司开发的业内先进流式计算软件,支持开发和执行对数据流中的信息进行处理的应用程序。InfoSphereStreams

支持连续且快速地分析流数据(2)商业级流式计算框架任务5-1:使用Hive创建电商产品数据库壹Storm是一款有Twitter开源的分布式实时大数据处理框架,用于实时分析数据,持续计算(3)开源流计算框架SparkStreaming是Spark体系中的一个流式处理框架,可以实现高吞吐量的、具备容错机制的实时流数据的处理任务5-1:使用Hive创建电商产品数据库壹FacebookPuma:FaceBook公司的实时数据处理分析框架,使用puma和HBase相结合来处理实时数据(4)公司为支持自身业务开发的流计算框架任务5-1:使用Hive创建电商产品数据库壹2SparkStreaming简介SparkStreaming是Spark中的分布式流处理框架,能够通过指定的时间间隔对数据进行处理,其最小时间间隔可达到500ms,SparkStreaming具有高吞吐量和容错能力强,支持多种数据源如Kafka、Flume、Twitter、ZeroMQ等(1)SparkStreaming处理的数据流图任务5-1:使用Hive创建电商产品数据库壹(2)SparkStreaming数据处理流程SparkStreaming是在对接外部数据流后按照时间间隔将数据划分为batch(小批次数据流)供后续Sparkengine处理,所以实际上,SparkStreaming是按一个个batch(小批次)来处理数据流的任务5-1:使用Hive创建电商产品数据库壹(3)SparkStreaming应用场景在SparkStreaming中,有无状态操作、有状态操作和窗口操作三种应用场景1、状态操作针对于当前时间间隔内新生成的小批次数据,所有计算都只是基于这个批次的数据进行处理2、有状态操作有状态操作是指除需要当前生成的小批次数据外,还需要使用所有的历史数据,即相当于统计总销售量或销售额等指标3、窗口操作SparkStreaming支持窗口计算以及在一个滑动窗口上进行数据的转换操作任务5-1:使用Hive创建电商产品数据库壹3DStream简介DStream是随时间推移而收到的数据序列。在内部,每个时间区间收到的数据都作为RDD存在,而DStream就是由这些RDD所组成的序列。SDstream数据可通过外部输入源获取(1)DStream内部实现任务5-1:使用Hive创建电商产品数据库壹(2)DStream数据操作流程对DStream中数据的相关操作实际上就是对DStream内部的RDD进行的,通过设置时间,这个操作每隔一段时间就会对RDD进行操作并生成作为新的DStream中该时间段的RDD,在经过一系列操作后,可以将计算结果存储到外部文件系统中,包括本地文件、HDFS、数据库等任务5-1:使用Hive创建电商产品数据库壹DStream的创建是在整个SparkStreaming程序中的,因此在创建DStream之前需要创建SparkStreaming对象,通过SparkStreaming对象创建DStream4DStream创建frompysparkimportSparkConfsc=SparkContext(master,appName)(1)SparkContext创建参数描述masterSpark、Mesos或YARN集群URL,或者是在本地模式下运行的特殊“local[*]”字符串appName应用程序在集群UI上显示的名称任务5-1:使用Hive创建电商产品数据库壹(2)创建SparkStreaming对象frompyspark.streamingimportStreamingContextssc=StreamingContext(sc,Seconds)参数描述scSparkConf实例Seconds处理数据的时间间隔,单位为秒。StreamingContext对象创建完成后即可使用该对象中提供的不同数据源获取方法创建DStream任务5-1:使用Hive创建电商产品数据库壹使用Python编写Spark程序Step1导入SparkContext与StreamingContext两个包Step2创建StreamingContext对象,设置每间隔5秒读取一次数据Step3启动SparkStreamin程序,开始对目录进行监控并打印数据Step4创建DStream使用SparkStreaming监控电商产品数据的处理与分析项目中的评价数据任务5-2:使用Hve对商产品数据统计贰DStream转换操作任务技能DStream的转换操作主要用于对所包含的数据进行处理和统计,包括过滤、合并、计算元素数量、集合、出现频次DStream输出操作在SparkStreaming中,DStream的输出操作用于触发DStream的转换操作和窗口操作,是SparkStreaming程序必不可少的DStream窗口操作SparkStreaming还提供了窗口计算,能够在数据的滑动窗口上应用转换操作SparkStreaming启动与停止SparkStreaming程序在编写完成后,并不会被执行,启动程序后,计算结束完毕程序也不会停止,需要人为设定任务5-2:使用Hve对商产品数据统计贰1算术运算DStream的转换操作主要用于对所包含的数据进行处理和统计,包括过滤、合并、计算元素数量、集合、出现频次方法描述map(func)对DStream中包含的每一个元素应用这个指定的函数,并以DStream格式返回结果flatMap(func)与map方法类似,只不过各个输入项可以被输出为零个或多个输出项filter(func)对DStream的每一个数据应用条件函数进行判断,当符合条件则加入新的DStream中,不符合的则删除reduceByKey(func)与reduce()方法功能相同,但reduceByKey()方法针对(k,v)形式元素进行统计reduce(func)通过指定函数对DStream中的每一个元素进行聚合操作,然后返回只有一个元素的RDD构成的新的DStreamtransform(func)通过指定函数对DStream中的每一个元素执行指定操作,可以是任意的RDD操作,从而返回一个新的RDDcountByValue()计算DStream中每个RDD内的元素出现的频次并返回新的DStream[(K,Long)],其中K是RDD中元素的类型,Long是元素出现的频次count()对DStream中包含的元素数量进行计数,返回一个内部只包含一个元素的RDD的DStreaamunion(otherStream)连接两个DStream中的数据生成一个新的DStream任务5-2:使用Hve对商产品数据统计贰2DStream窗口操作SparkStreaming还提供了窗口计算,能够在数据的滑动窗口上应用转换操作方法描述window()该方法接收两个参数,第一个参数为窗口长度,单位为秒;第二个参数为滑动时间间隔,单位为秒,并且不管是窗口长度还是滑动时间间隔都必须为创建StreamingContext对象设置时间的倍数countByWindow()统计滑动窗口的DStream中元素的数量,并以DStream格式返回,接受参数及代表意义与window()方法相同reduceByWindow()对滑动窗口中DStream的元素进行聚合操作,以DStream格式返回操作结果,该方法需要传入三个参数,第一个参数即为进行聚合操作的函数,第二、三个参数与以上两种方法的参数相同countByValueAndWindow()统计当前滑动窗口中DStream元素出现的频率,并以DStream[(K,Long)]格式返回,其中K是元素的类型,Long是元素出现的频次,接收参数与reduceByWindow()相同reduceByKeyAndWindow()对滑动窗口中DStream的(k,v)类型元素进行聚合操作,该方法包含四个参数,第一个参数为指定的聚合函数;第二个参数同样是一个函数,但其用来处理流出的RDD,可不使用;第三个参数为窗口长度,单位为秒;第四个参数为滑动时间间隔,单位为秒任务5-2:使用Hve对商产品数据统计贰3DStream输出操作在SparkStreaming中,DStream的输出操作用于触发DStream的转换操作和窗口操作,是SparkStreaming程序必不可少的,可以将DStream中的数据保存到外部系统中,包括MySQL数据库、本地文件、HDFS等方法描述pprint()DStream中每批数据的前十个元素。saveAsTextFiles(prefix,[suffix])将DStream中的数据以文本的形式保存在本地文件或HDFS中,其接受两个参数,第一个参数为文件的路径及名称前缀,第二个参数为文件的格式,并且每隔规定时间都会生成一个文件名称包含时间戳的本地文件foreachRDD(func)DStream数据推送到外部系统,通常用于实现将DStream数据保存到数据库中任务5-2:使用Hve对商产品数据统计贰4SparkStreaming启动SparkStreaming程序在编写完成后,并不会被执行,DStream的相关操作只创建执行流程,设定了执行计划,需要SparkStreaming的运行操作才会启动SparkStreaming程序执行预期操作。启动程序后,计算结束完毕程序也不会停止,只能通过相关方法手动停止程序(1)SparkStreaming的启动与停止方法方法描述start()启动SparkStreaming程序进行数据的计算awaitTermination()等待程序结束,用于使程序持续运行stop()停止SparkStreaming程序任务5-2:使用Hve对商产品数据统计贰引入相关库,设置每五秒进行一次数据分析Step1对实时数据进行清洗和处理工作,并保存结果Step2统计实时数据信息并设置程序连续执行Step3将程序提交到Spark集群运行Step4实时计算出总销售量、总销售额使用SparkStreaming监控HDFS目录与您共同成长项目6

电商产品数据迁移根据电商产品数据统计结果创建数据表并查看目录Content1使用Sqoop将Hive中电商产品数据统计结果导出2项目导言零

在大数据技术兴起前,诸多企业的数据存储业务均交由关系型数据库处理,大数据技术兴起后很多企业的数据处理业务都在向大数据技术转型,有大量的数据需要进行迁移。在早期由于工具的缺乏,数据迁移非常困难,Sqoop的出现使得数据迁移问题得以解决。学习目标零知识目标了解Sqoop相关知识;熟悉Sqoop中常用连接器;熟悉Sqoop中配置数据库密码方式;掌握Sqoop数据导入与导出;具有实现数据迁移的能力。技能目标具备实现Sqoop中数据库密码配置的能力;具备掌握Sqoop命令使用的能力;具备实现数据导入与导出功能的能力;具备应用Sqoop将HDFS中数据迁移到MySQL数据库的能力。技能目标具备获取信息并利用信息能力;具有综合与系统分析能力;具备换位思考的能力。任务6-1根据电商产品数据统计结果创建数据表并查看壹1Sqoop简介1.Sqoop介绍Sqoop最早出现于2009年,是一款开源工具,作为Hadoop的一个第三方模块存在,主要用于Hadoop和关系型数据库(MySQL、Oracle等)之间数据的传递,能够做到将数据从关系型数据库导入到Hadoop的HDFS中,以及从Hadoop的文件系统中导出到关系型数据库,解决了传统数据库和Hadoop之间数据的迁移问题。任务6-1根据电商产品数据统计结果创建数据表并查看壹2.Sqoop优势Sqoop中最大的亮点就是能够通过Hadoop的MapReduce使数据在关系型数据库和HDFS之间进行移动,并充分利用MapReduce并行特点以批处理的方式加快数据传输。主要优势如下所示:通过调整任务数量控制并发度、配置数据库访问时间等操作可以高效且可控的进行资源的利用自动完成数据类型映射与转换,无需用户手动操作支持MySQL、Oracle、SQLServer、PostgreSQL等多种数据库010203任务6-1根据电商产品数据统计结果创建数据表并查看壹3.Sqoop架构(1)Sqoop1架构任务6-1根据电商产品数据统计结果创建数据表并查看壹3.Sqoop架构(2)Sqoop2架构任务6-1根据电商产品数据统计结果创建数据表并查看壹2Sqoop连接器

Sqoop支持MySQL、Oracle、PostgreSQL、SQLServer、DB2等多种连接器,以及支持MySQL和PostgreSQL数据库的快速路径连接器(专门连接器,用于批次传输数据的高吞吐量),能够实现数据在Hadoop和连接器支持的外部仓库之间进行高效的传输。DatabaseVersion-directsupportConnectstringHSQLDB1.8.0+Nojdbc:hsqldb:*//MySQL5.0+Yesjdbc:mysql:*//Oracle10.2.0+Nojdbc:oracle:*//PostgreSQL8.3+Yesjdbc:postgresql://任务6-1根据电商产品数据统计结果创建数据表并查看壹3Sqoop配置数据库密码方式03文件模式主要应用于Sqoop脚本定时执行的场景,可以通过--password-file参数读取文件中存储的密码实现数据库的访问。04别名模式是目前最安全的一种方式,解决了文件模式中使用明文保存密码的问题。01明文模式是最为简单的方式,在使用Sqoop命令时会通过--password参数将密码直接写入到命令中,但由于命令行中包含密码,会带来密码泄露的风险。02交互模式是最常用的一种密码输入模式,在使用Sqoop命令时通过-P参数的设置,访问时需要按回车键后,会显示密码输入提示,之后密码在输入时是不显示的。Sqoop支持4种输入密码的模式任务6-1根据电商产品数据统计结果创建数据表并查看壹4列出所有数据库

在Sqoop中,可以使用“list-databases”命令查看关系型数据库中全部的数据库名称并返回;在使用时,只需对通用参数进行设置即可连接数据库并返回数据库名称,语法格式如下所示。sqooplist-databases<通用参数>例如,指定账户和密码连接MySQL数据库查看全部数据库名称[root@master~]#sqooplist-databases--connectjdbc:mysql://localhost:3306/--usernameroot--password123456任务6-1根据电商产品数据统计结果创建数据表并查看壹5列出所有表在数据库查看完成后,Sqoop还提供了一个“list-tables”命令,能够查看关系型数据库中是否存在Hive元数据库中的所有表,与“list-databases”命令的使用基本相同,语法格式如下所示。sqooplist-tables<通用参数>例如,使用list-tables命令查询MySQL中所有表[root@master~]#sqooplist-tables--connectjdbc:mysql://localhost:3306/hive_metadata--usernameroot--password123456Sqoop明文方式连接MySQL数据库Step1连接MySQL数据库,进入phone数据库,创建表Step2使用list-tables命令查询phone数据库中所有表,验证表是否创建成功Step3使用Sqoop连接MySQL数据库,并对数据库进行操作任务6-1根据电商产品数据统计结果创建数据表并查看壹任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出1.数据导入

在使用Sqoop进行数据的导入时,Sqoop会对数据表以及表中的列和数据类型进行检查,之后Sqoop代码生成器通过这些信息来创建对应表的类,用于保存从表中抽取的记录。任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出数据导入的大致流程Sqoop通过JDBC读取导入的数据表的结构启动MapReducer作业,读取数据并切分数据,然后创建Map并将关系型数据库中的数据设置key-value形式交由Map映射数据类型为Java的数据类型,如varchar、number等被映射成String、int等,并根据数据表信息生成序列化类提交到Hadoop在载入过程中启动作业,并通过JDBC读取数据表中的内容后,使用Sqoop生成的列执行反序列化操作使用Sqoop类进行反序列化,并将记录写入到HDFS0103020405任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出

在Sqoop中,数据导入通过Import命令实现,能够将关系型数据库中的数据导入到HDFS存储平台中,便于使用大数据技术对数据进行分析,Import命令不仅可以将数据追加到HDFS中已存在的数据集,还可以将数据导入到普通文件中。sqoopimport<通用参数><import命令参数>通用参数:指主要是针对关系型数据库链接的一些参数,如JDBC连接或数据库账户密码等设置参数描述--connect连接关系型数据库的URL--connection-manager指定要使用的连接管理类--driverHadoop根目录--hadoop-home覆写$HADOOP_HOME--help打印帮助指令--password-file设置包含验证密码的文件的路径-P从控制台读取密码--password连接数据库的密码--username连接数据库的用户名--verbose在控制台打印出详细信息任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出

Import命令参数:主要设置将数据导入到HDFS中的位置和倒数方式以及其他导入的配置选项含义说明--append将数据追加到HDFS上一个已经存在的数据集上--as-avrodatafile将数据导入到Avro数据文件--as-sequencefile将数据导入到SequenceFile--as-textfile将数据导入到普通文本文件(默认)--columns指定要导入的字段,多个字段通过“,”逗号连接--delete-target-dir如果指定目录存在,则先删除掉--direct如果数据库存在,请使用直接连接器--fetch-size一次要从数据库读取的条目数--inline-lob-limit<n>设置内联的LOB对象的大小-m,--num-mappers使用n个map任务并行导入数据--e,--query导入的查询语句--split-by指定按照哪个列去分割数据--table导入的原表表名--target-dir导入HDFS的目标路径--warehouse-dirHDFS存放表的根路径--where导出时所使用的查询条件--z,--compress启用压缩--compression-code指定Hadoop的压缩编码类--null-string为字符串列的空值写入的字符串--null-non-string非字符串列将为空值写入的字符串任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出2.数据导出Sqoop会根据数据库连接字符串来选择一个导出方法,对于大部分系统来说,Sqoop会采用JDBC方式,将HDFS中数据导出到关系型数据库中。任务6-2使用Sqoop将Hive中电商产品数据统计结果导出贰1Sqoop数据导入与导出2.数据导出具体过程第二步:根据获取到的元数据的信息,sqoop生成一个Java

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论