Hadoop平台搭建与应用(第2版)(微课版) 课件 项目8 Hadoop平台应用综合案例_第1页
Hadoop平台搭建与应用(第2版)(微课版) 课件 项目8 Hadoop平台应用综合案例_第2页
Hadoop平台搭建与应用(第2版)(微课版) 课件 项目8 Hadoop平台应用综合案例_第3页
Hadoop平台搭建与应用(第2版)(微课版) 课件 项目8 Hadoop平台应用综合案例_第4页
Hadoop平台搭建与应用(第2版)(微课版) 课件 项目8 Hadoop平台应用综合案例_第5页
已阅读5页,还剩41页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

任务8.1本地数据集上传到数据仓库Hive任务实施下面把test.txt中的数据导入到数据仓库Hive中。为了完成这个操作,需要先把test.txt上传到HDFS中,再在Hive中创建一个外部表,完成数据的导入。1.启动HDFSHDFS是Hadoop的核心组件,因此,要想使用HDFS,必须先安装Hadoop。这里已经安装了Hadoop,打开一个终端,执行命令“start-all.sh”,启动Hadoop服务,操作命令如下。[root@master1~]#start-all.sh执行命令“jps”,查看当前运行的进程,操作命令及结果如下。[root@master1~]#jps673ResourceManager503SecondaryNameNode935Jps298NameNode[root@slave1~]#jps256NodeManager388Jps140DataNode[root@slave2~]#jps243NodeManager375Jps127DataNode2.将本地文件上传到HDFS中将本地文件test.txt上传到HDFS中,并存储在HDFS的/bigdatacase/dataset目录中。在HDFS的根目录中创建一个新的目录bigdatacase,并在其中创建一个子目录dataset,操作命令及结果如下。[root@master1~]#hadoopdfs-mkdir-p/bigdatacase/datasetDEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.执行命令“hadoopdfs-put/opt/test.txt/bigdatacase/dataset”,将test.txt文件上传到HDFS的/bigdatacase/dataset目录中,操作命令及结果如下。[root@master1~]#hadoopdfs-put/opt/test.txt/bigdatacase/datasetDEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.

执行命令“hadoopdfs-cat/bigdatacase/dataset/test.txt|head-10”,查看HDFS中的test.txt的前10条记录,操作命令及结果如下。[root@master1~]#hadoopdfs-cat/bigdatacase/dataset/test.txt|head-10

DEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.43,2015/3/3017:38,/static/image/common/faq.gif26,2015/3/3017:38,/data/cache/style_1_widthauto.css?y7a43,2015/3/3017:38,/static/image/common/hot_1.gif43,2015/3/3017:38,/static/image/common/hot_2.gif43,2015/3/3017:38,/static/image/filetype/common.gif26,2015/3/3017:38,/source/plugin/wsh_wx/img/wsh_zk.css26,2015/3/3017:38,/data/cache/style_1_forum_index.css?y7a26,2015/3/3017:38,/source/plugin/wsh_wx/img/wx_jqr.gif43,2015/3/3017:38,/static/image/common/recommend_1.gif26,2015/3/3017:38,/static/image/common/logo.png3.在Hive中创建数据库(1)创建数据库和数据表执行命令“servicemysqlstart”,启动MySQL数据库,操作命令及结果如下。[root@master1~]#servicemysqlstartStartingMySQLSUCCESS!Hive是基于Hadoop的数据仓库,使用HiveQL语言编写的查询语句,最终都会被Hive自动解析为MapReduce任务,并由Hadoop具体执行。因此,需要先启动Hadoop服务,再启动Hive服务,可通过执行命令“./hive”来启动Hive服务,操作命令及结果如下。[root@master1bin]#./hive

Logginginitializedusingconfigurationinjar:file:/simple/hive1.2.1/lib/hive-common-1.2.1.jar!/pertieshive>启动Hive服务后,执行命令“createdatabasedblab”,在Hive中创建一个数据库dblab,操作命令及结果如下。hive>createdatabasedblab;OKTimetaken:0.592seconds创建外部表,操作命令及结果如下。hive>createexternaltabledblab.bigdata_user(ipstring,timestring,urlstring)rowformatdelimitedfieldsterminatedby','storedastextfilelocation'/bigdatacase/dataset';OKTimetaken:0.042seconds(2)查询数据在Hive命令行模式下,执行命令“showcreatetablebigdata_user”,查看表的各种属性,操作命令及结果如下。(文本位于页面下方)hive>showcreatetablebigdata_user;OKCREATEEXTERNALTABLE`bigdata_user`(`ip`string,`time`string,`url`string)ROWFORMATDELIMITEDFIELDSTERMINATEDBY','STOREDASINPUTFORMAT'org.apache.hadoop.mapred.TextInputFormat'OUTPUTFORMAT'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'LOCATION'hdfs://master1:9000/bigdatacase/dataset'TBLPROPERTIES('COLUMN_STATS_ACCURATE'='false','numFiles'='0','numRows'='-1','rawDataSize'='-1','totalSize'='0','transient_lastDdlTime'='1677831958')Timetaken:0.033seconds,Fetched:19row(s)执行命令“descbigdata_user”,查看表的简单结构,操作命令及结果如下。hive>desc

bigdata_user;OKipstringtimestringurlstringTimetaken:0.049seconds,Fetched:3row(s)执行命令“select*frombigdata_userlimit10”,查看表的前10条数据,操作命令及结果如下。hive>select*frombigdata_userlimit10;OK432015/3/3017:38/static/image/common/faq.gif262015/3/3017:38/data/cache/style_1_widthauto.css?y7a432015/3/3017:38/static/image/common/hot_1.gif432015/3/3017:38/static/image/common/hot_2.gif432015/3/3017:38/static/image/filetype/common.gif262015/3/3017:38/source/plugin/wsh_wx/img/wsh_zk.css262015/3/3017:38/data/cache/style_1_forum_index.css?y7a262015/3/3017:38/source/plugin/wsh_wx/img/wx_jqr.gif432015/3/3017:38/static/image/common/recommend_1.gif262015/3/3017:38/static/image/common/logo.pngTimetaken:0.046seconds,Fetched:10row(s)8.2

使用Hive进行简单的数据分析任务实施1.简单查询分析执行命令“selectipfrombigdata_userlimit10”,查询前10条记录的ip,操作命令及结果如下。hive>selectipfrombigdata_userlimit10;OK43264343432626264326Timetaken:0.064seconds,Fetched:10row(s)2.查询前20条记录的ip和time执行命令“selectip,timefrombigdata_userlimit20”,查询前20条记录的ip和time,操作命令及结果如下。3.使用聚合函数count()统计表中的数据执行命令“selectcount(*)frombigdata_user”,统计表中的数据,操作命令及结果如下。8.3

Hive、MySQL、HBase数据的互导任务实施1.Hive预操作创建临时表user_action,操作命令及结果如下。hive>createexternaltableuser_action(ipstring,timestring,urlstring)rowformatdelimitedfieldsterminatedby','storedastextfile;OKTimetaken:0.054seconds创建完成后,Hive会自动在HDFS中创建对应的数据文件/user/hive/warehouse/dbalb.db/user_action。执行命令“hadoopdfs-ls/user/hive/warehouse/dblab.db/”,在HDFS中查看创建的user_action表,操作命令及结果如下。[root@master1/]#hadoopdfs-ls/user/hive/warehouse/dblab.db/

DEPRECATED:Useofthisscripttoexecutehdfscommandisdeprecated.Insteadusethehdfscommandforit.Found1itemsdrwxr-xr-x-rootsupergroup02023-03-0316:32/user/hive/warehouse/dblab.db/user_action2.数据导入操作在HiveShell模式下执行命令“insertoverwritetabledblab.user_actionselect*fromdblab.bigdata_user”,将bigdata_user表中的数据导入到user_action表中,操作命令如下。hive>insertoverwritetabledblab.user_actionselect*fromdblab.bigdata_user;执行命令“select*fromuser_actionlimit10”,查询表的前10条记录,操作命令及结果如下。3.使用Sqoop将数据从Hive导入到MySQL中登录MySQL,在dblab数据库中创建与Hive对应的user_action表,并设置其编码格式为UTF-8,操作命令及结果如下。mysql>usedblab;Databasechangedmysql>createtableuser_action(->ipvarchar(50),->timevarchar(50),->urlvarchar(255))->ENGINE=InnoDBDEFAULTCHARSET=utf8;QueryOK,0rowsaffected(0.00sec)退出MySQL,进入Sqoop的bin目录,导入数据,操作命令如下。[root@master1bin]#./sqoopexport--connectjdbc:mysql://master1:3306/dblab--usernameroot--password123456--tableuser_action--export-dir/user/hive/warehouse/dblab.db/user_action--input-fields-terminated-by',';使用root用户登录MySQL,查看已经从Hive导入到MySQL中的数据,操作命令及结果如下。4.使用Sqoop将数据从MySQL导入到HBase中启动Hadoop集群和HBase服务,并查看集群节点进程,操作命令及结果如下。[root@master1bin]#start-all.sh[root@master1bin]#./zkServer.shstart[root@master1bin]#./start-hbase.shmaster1节点的进程如下。[root@master1bin]#jps1714SecondaryNameNode4437Jps3207HMaster1514NameNode1883ResourceManager3358HRegionServer3039QuorumPeerMainslave1节点的进程如下。[root@slave1bin]#jps577NodeManager786QuorumPeerMain1811Jps854HRegionServer473DataNodeslave2节点的进程如下。[root@slave2bin]#jps578NodeManager3154Jps1028QuorumPeerMain474DataNode1102HRegionServer进入HBaseShell,操作命令及结果如下。在HBase中创建user_action表,操作命令及结果如下。hbase(main):001:0>create'user_action',{NAME=>'f1',VERSION=>5}0row(s)in1.4250seconds新建一个终端,导入数据,操作命令如下。[root@master1bin]#./sqoopimport--connectjdbc:mysql://localhost:3306/dblab?zeroDateTimeBehavior=ROUND--usernameroot--password123456--tableuser_action--hbase-tableuser_action--column-familyf1--hbase-row-keyip-m1再次切换到HBaseShell运行的终端窗口,执行命令“scan'user_action'”,查询插入的数据,如图所示。5.利用HBase-thrift库将数据导入到HBase中首先,使用“pip”命令安装最新版的HBase-thrift库,操作命令如下。[root@master1bin]#pipinstallhbase-thrift其次,在hbase/bin目录下启动thrift相关命令,开启9095端口,操作命令如下。[root@master1bin]#hbase-daemon.shstartthrift--infoport9095-p9090查看9095端口,操作命令及结果如下所示。[root@master1~]#netstat-anp|grep9095tcp00:9095:*LISTEN1802/java执行“jps”命令,查看进程运行情况,操作命令及结果如下。[root@master1~]#jps501SecondaryNameNode1125HMaster280NameNode1802ThriftServer1292HRegionServer942QuorumPeerMain1886Jps671ResourceManager在HBase中创建ip_info表,表中包含两个列族分别为:ip、url,查看创建的表,操作命令及结果如下。hbase(main):004:0>create‘ip_info’,’ip’,’url’

=>["ip_info","user_action"]hbase(main):005:0>desc'ip_info'Tableip_infoisENABLEDip_infoCOLUMNFAMILIESDESCRIPTION{NAME=>'ip',BLOOMFILTER=>'ROW',VERSIONS=>'1',IN_MEMORY=>'false',KEEP_DELETED_CELLS=>'FALSE',DATA_BLOCK_ENCODING=>'NONE',TTL=>'FOREVER',COMPRESSION=>'NONE',MIN_VERSIONS=>'0',BLOCKCACHE=>'true',BLOCKSIZE=>'65536',REPLICATION_SCOPE=>'0'}{NAME=>'url',BLOOMFILTER=>'ROW',VERSIONS=>'1',IN_MEMORY=>'false',KEEP_DELETED_CELLS=>'FALSE',DATA_BLOCK_ENCODING=>'NONE',TTL=>'FOREVER',COMPRESSION=>'NONE',MIN_VERSIONS=>'0',BLOCKCACHE=>'true',BLOCKSIZE=>'65536',REPLICATION_SCOPE=>'0'}2row(s)in0.0820seconds

使用Python编程将本地数据导入到HBase中,程序代码如下。切换到HBaseShell运行窗口,查询ip_info中插入的3条数据,如图所示。任务8.4

流数据处理的简单应用任务实施在进行操作前,需确保Hadoop、Zookeeper和Kafka集群已启动。1.Telnet工具的安装与使用(1)通过yum命令安装Telnet和Netcat工具,操作命令如下。[root@master1~]#yuminstall-ytelnet[root@master1~]#yuminstall-ync(2)Netcat工具的使用测试,开启一个本地7777的TCP协议端口,等待客户端发起连接,操作命令及结果如下。[root@master1~]#nc-lk7777新打开一个终端窗口,执行“telnet”命令,进入telnet客户端命令模式,操作命令及结果如下。[root@master1~]#telnet7777Trying...Connectedto.Escapecharacteris'^]'.在telnet客户端命令模式下输入如下信息,向目标服务器发送“test”和“txt”信息,若要退出命令发送模式,直接按'^]'即可,然后执行quit退出telnet客户端,输入信息如下。testtxt在“nc”监听窗口监听到打印数据代表测试成功,操作结果如下。[root@master1~]#nc-lk7777

testTxt如果Netcat顺利监听到数据,即可终止上述两个命令完成测试。2.Kafka主题的创建与查看(1)创建wordcount主题,分区数设为4,副本数为1,操作命令及结果如下。[root@master1bin]#./kafka-topics.sh--create--zookeepermaster1:2181,slave1:2181,slave2:2181--partitions4--replication-factor1--topicwordcountCreatedtopic"wordcount"(2)使用"--list"命令查看已经创建好的主题列表,操作命令及结果如下。[root@master1bin]#./kafka-topics.sh--list--zookeepermaster1:2181,slave1:2181,slave2:2181__consumer_offsetstestwordcount(3)通过“--describe--topicwordcount”,可以查看wordcount主题的具体信息,操作命令及结果如下。[root@master1bin]#./kafka-topics.sh--describe--zookeepermaster1:2181,slave1:2181,slave2:2181--topicwordcountTopic:wordcount

PartitionCount:4ReplicationFactor:1Configs:Topic:wordcountPartition:0Leader:1Replicas:1Isr:1Topic:wordcountPartition:1Leader:2Replicas:2Isr:2Topic:wordcountPartition:2Leader:0Replicas:0Isr:0Topic:wordcountPartition:3Leader:1Replicas:1Isr:13.Flume配置与启动(1)切换到Flume的conf目录下,操作命令如下。[root@master1~]#cd/usr/local/flume/conf/(2)新建并编辑一个配置文件nc_wordcount.conf,操作命令及配置文件如下。[root@master1conf]#vinc_wordcount.confa1.sources=s1a1.channels=c1a1.sources.s1.type=netcata1.sources.s1.bind=localhosta1.sources.s1.port=7777a1.channels.c1.type=org.apache.flume.channel.kafka.KafkaChannela1.channels.c1.kafka.bootstrap.servers=master1:9092,slave1:9092,slave2:9092a1.channels.c1.kafka.topic=wordcounta1.channels.c1.parseAsFlumeEvent=false

a1.sources.s1.channels=c1(3)启动Flume(操作窗口请不要关掉),并指定配置文件为nc_wordcount.conf,操作命令及部分结果如下。[root@master1flume]#flume-ngagent-na1-cconf--f./conf/nc_wordcount.conf-Dflume.root.logger=info,console.......2023-03-2714:13:58,868(lifecycleSupervisor-1-0)[INFO-org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)]Componenttype:CHANNEL,name:c1started2023-03-2714:13:58,869(conf-file-poller-0)[INFO-org.apache.flume.node.Application.startAllComponents(Application.java:182)]StartingSources12023-03-2714:13:58,872(lifecycleSupervisor-1-1)[INFO-org.apache.flume.source.NetcatSource.start(NetcatSource.java:155)]Sourcestarting2023-03-2714:13:58,885(lifecycleSupervisor-1-1)[INFO-org.apache.flume.source.NetcatSource.start(NetcatSource.java:169)]CreatedserverSocket:sun.nio.ch.ServerSocketChannelImpl[/:7777]4.数据流写入Kafka(1)新开一个终端窗口,通过“telnet7777”连接对应端口,并向端口写入一些单词数据,操作命令及结果如下。[root@master1~]#telnet7777Trying...Connectedto.Escapecharacteris'^]'.helloworldOKhelloscalaOKhelloworldOKhelloworldOK(2)再次打开一个终端窗口,创建Kafka消费者用于消费“wordcount”主题,操作命令及结果如下。[root@master1bin]#./kafka-console-consumer.sh--bootstrap-servermaster1:9092,slave2:9092,slave2:9092--topicwordcount--from-beginninghelloworldhelloscalahelloworldhelloworld5.Flink编程(1)在IDEA中创建一个maven项目,命名为FlinkWordCount。需要在IDEA中安装Scala插件,并勾选起用。(2)在项目的src/main下新建一个Directory,命名为scala,选中并右击,选择”MarkDirectoryas“,在二级菜单下选择SourcesRoot。(3)在项目名上,右击选择OpenModuleSettings,在弹出的页面中选择Modules,单击+号,选择Scala,若本地没有scala,则单击Create...,再单击Download...,选择scala版本2.11.0,单击OK。(4)编辑pom.xml文件,编辑文件内容如下。

<?xmlversion="1.0"encoding="UTF-8"?><projectxmlns="/POM/4.0.0"

xmlns:xsi="/2001/XMLSchema-instance"

xsi:schemaLocation="/POM/4.0.0/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>org.example</groupId><artifactId>FlinkWordCount</artifactId><version>1.0-SNAPSHOT</version><properties><piler.source>8</piler.source><piler.target>8</piler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-scala_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-scala_2.11</artifactId><version>1.10.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.10.0</version></dependency></dependencies></project>(5)在scala文件夹下,新建scala文件,选择Object型,命名为WordCount,编写代码如下。importorg.apache.flink.api.common.serialization.SimpleStringSchemaimportorg.apache.flink.streaming.api.TimeCharacteristicimportorg.apache.flink.streaming.api.scala._importorg.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer,

FlinkKafkaProducer}importorg.apache.kafka.clients.consumer.ConsumerConfigimportjava.util.PropertiesobjectWordCount{defmain(args:Array[String]):Unit={//创建流处理环境valenv=StreamExecutionEnvironment.getExecutionEnvironment

env.setParallelism(1)

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)//设置kafka连接属性配置valproperties=newProperties()properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master1:9092,slave1:9092,slave2:9092")//kafka集群properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest")//从主题的第一条消息开始读取//键值反序列化配置properties.setProperty

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论