Hadoop大数据平台构建与应用(工作手册式)(微课版) 案例 9.2 广告日志数据实时传输_第1页
Hadoop大数据平台构建与应用(工作手册式)(微课版) 案例 9.2 广告日志数据实时传输_第2页
Hadoop大数据平台构建与应用(工作手册式)(微课版) 案例 9.2 广告日志数据实时传输_第3页
Hadoop大数据平台构建与应用(工作手册式)(微课版) 案例 9.2 广告日志数据实时传输_第4页
Hadoop大数据平台构建与应用(工作手册式)(微课版) 案例 9.2 广告日志数据实时传输_第5页
已阅读5页,还剩1页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

广告日志数据实时传输【任务描述】Flume作为日志收集系统,可以从不同的数据源将数据源源不断收集,但Flume不会持久地保存数据,需要使用Sink将数据存储到外部存储系统,如HDFS、HBase、Kafka等。Flume与HDFS、HBase的结合一般用于离线批处理。而Flume与Kafka的整合一般用于数据实时流处理,通过Flume的Agent代理收集日志数据,再由Flume的Sink将数据传送到Kafka集群,完成数据的生产流程,最后交给Storm、Flink、SparkStreaming等进行实时消费计算。本案例将基于项目8中的case_data_new.csv广告日志数据,使用Flume和Kafka整合实现广告日志数据的实时传输。首先用脚本模拟实时生成的日志数据并存入MySQL,再使用Flume实时监视MySQL中增加的数据,采集到Kafka集群的主题中,并启动消费者消费主题数据,最终实现数据的实时传输。【任务分析】广告日志数据的实时传输的实现步骤如下。脚本定时抽取数据到指定目录,模拟日志文件产生,并将其存入MySQL表中。创建Kafka主题,开启消费者以消费数据。编写conf采集配置文件,将存入MySQL中的数据传入Kafka主题。【任务实施】创建脚本文件在master节点下运行“mysql-uroot-pPassword123$”进入MySQL数据库中,创建数据库kafka,并在kafka下创建表用于存储数据。如REF_Ref100155267\h代码91所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s11创建数据表createdatabasekafka;usekafka;createtablecase_data(`rank`int,dtint,cookievarchar(200),ipvarchar(200),idfavarchar(200),imeivarchar(200),androidvarchar(200),openudidvarchar(200),macvarchar(200),timestampsint,campint,creativeidint,mobile_osint,mobile_typevarchar(200),app_key_md5varchar(200),app_name_md5varchar(200),placementidvarchar(200),useragentvarchar(200),mediaidvarchar(200),os_typevarchar(200),born_timeint);//开启MySQL的local_infile服务setgloballocal_infile=1;打开一个新的master终端,运行“vi/data/datamysql.sh”创建一个脚本文件,如REF_Ref100155277\h代码92所示。脚本内容为一个whiletrue循环,每分钟在case_data_new.csv随机提取100条数据,存入“/data/datamysql/mysqltmp.txt”文件中,再将文件中的数据存入MySQL数据库中。代码STYLEREF1\s9SEQ代码\*ARABIC\s12脚本datamysql.sh#!/bin/bashwhiletruedotime=$(date"+%Y%m%d_%H%M%S")shuf-n100/opt/case_data_new.csv>/data/datamysql/mysqltmp.txtmysql-uroot-pPassword123$--local-infile-e"useKafka;loaddatalocalinfile'/data/datamysql/mysqltmp.txt'intotablecase_datafieldsterminatedby','OPTIONALLYENCLOSEDBY'\"';"sleep60done脚本创建完成后,赋予脚本权限,然后将其启动,如REF_Ref100155288\h代码93所示。启动成功后可能会发出如REF_Ref100155569\h图91所示的警报信息,表示在命令行中直接输入密码账户信息是不安全的,该警报信息是在MySQL5.6版本后有的,并不影响运行结果,可以选择忽视。代码STYLEREF1\s9SEQ代码\*ARABIC\s13关于脚本的命令//脚本权限chmod777/data/data2mysql.sh//脚本启动命令sh/data/datamysql.sh&//脚本中断命令psaux|grep"datamysql.sh"|grep-vgrep|cut-c9-15|xargskill-9图STYLEREF1\s9SEQ图\*ARABIC\s11执行脚本文件在MySQL数据库中查看数据是否成功存入表中,如REF_Ref100155298\h代码94所示。结果如REF_Ref100155537\h图92所示。可以看出已经有数据存入表中,并正在实时更新中。代码STYLEREF1\s9SEQ代码\*ARABIC\s14查看数据是否存入//进入数据库kafkausekafka;//查看表中有几行selectcount(*)fromcase_data;图STYLEREF1\s9SEQ图\*ARABIC\s12数据已存入创建Kafka主题分别在slave1、slave2中开启ZooKeeper、Kafka集群。在slave1节点创建一个Kafka主题RealTime,设置2个副本,2个分区。创建成功后,开启消费者消费,如REF_Ref100155306\h代码95所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s15创建Kafka主题并开启消费//创建RealTime主题kafka-topics.sh-create--topicRealTime--bootstrap-serverslave1:9092,slave2:9092--partitions2--replication-factor2//开启消费者kafka-console-consumer.sh--topicRealTime--bootstrap-serverslave1:9092,slave2:9092目前,该消费者并没有在指定主题中消费到数据。Flume采集日志在Flume的conf目录下创建一个“datamysql.conf”文件,实现从MySQL中采集数据,并传入RealTime主题中,如REF_Ref100155318\h代码96所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s16Flume脚本datamysql.confagent.sources=sql-sourceagent.sinks=k1agent.channels=chagent.sources.sql-source.type=org.keedio.flume.source.SQLSourceagent.sources.sql-source.hibernate.connection.url=jdbc:mysql://81:3306/kafka?&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMTagent.sources.sql-source.hibernate.connection.user=rootagent.sources.sql-source.hibernate.connection.password=Password123$agent.sources.sql-source.hibernate.dialect=org.hibernate.dialect.MySQLDialectagent.sources.sql-source.hibernate.driver_class=com.mysql.cj.jdbc.Driveragent.sources.sql-source.hibernate.connection.autocommit=trueagent.sources.sql-source.table=case_dataagent.sources.sql-source.columns.to.select=*agent.sources.sql-source.run.query.delay=10000agent.sources.sql-source.status.file.path=/var/lib/=sql-source.statusagent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinkagent.sinks.k1.topic=RealTimeagent.sinks.k1.brokerList=slave1:9092,slave2:9092agent.sinks.k1.batchsize=200agent.sinks.kafkaSink.requiredAcks=1agent.sinks.k1.serializer.class=kafka.serializer.StringEncoderagent.sinks.kafkaSink.zookeeperConnect=slave1:2181,slave2:2181agent.channels.ch.type=memoryagent.channels.ch.capacity=10000agent.channels.ch.transactionCapacity=10000agent.channels.hbaseC.keep-alive=20agent.sources.sql-source.channels=chagent.sinks.k1.channel=ch启动FlumeAgent命令开始采集MySQL中的数据,如REF_Ref100155329\h代码97所示。切换到Kafka消费者的终端,可以看到主题上已经有数据被消费者消费,如REF_Ref100155585\h图93所示。观察消费者终端,可以看到消费者每过一分钟,就会有新数据消费,因为脚本文件一直在模拟用户产生数据,而Flume在实时采集并传入到Kafka主题上。代码STYLEREF1\s9SEQ代码\*ARABIC\s17执行Flume脚本flume-ngagent-nagent-f/usr/local/src/flume/conf/datamysql.conf-c/usr/local/src/flume/conf/-Dflume.root.logger=INFO,console图STYLEREF1\s9

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论