版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
广告日志数据实时传输【任务描述】Flume作为日志收集系统,可以从不同的数据源将数据源源不断收集,但Flume不会持久地保存数据,需要使用Sink将数据存储到外部存储系统,如HDFS、HBase、Kafka等。Flume与HDFS、HBase的结合一般用于离线批处理。而Flume与Kafka的整合一般用于数据实时流处理,通过Flume的Agent代理收集日志数据,再由Flume的Sink将数据传送到Kafka集群,完成数据的生产流程,最后交给Storm、Flink、SparkStreaming等进行实时消费计算。本案例将基于项目8中的case_data_new.csv广告日志数据,使用Flume和Kafka整合实现广告日志数据的实时传输。首先用脚本模拟实时生成的日志数据并存入MySQL,再使用Flume实时监视MySQL中增加的数据,采集到Kafka集群的主题中,并启动消费者消费主题数据,最终实现数据的实时传输。【任务分析】广告日志数据的实时传输的实现步骤如下。脚本定时抽取数据到指定目录,模拟日志文件产生,并将其存入MySQL表中。创建Kafka主题,开启消费者以消费数据。编写conf采集配置文件,将存入MySQL中的数据传入Kafka主题。【任务实施】创建脚本文件在master节点下运行“mysql-uroot-pPassword123$”进入MySQL数据库中,创建数据库kafka,并在kafka下创建表用于存储数据。如REF_Ref100155267\h代码91所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s11创建数据表createdatabasekafka;usekafka;createtablecase_data(`rank`int,dtint,cookievarchar(200),ipvarchar(200),idfavarchar(200),imeivarchar(200),androidvarchar(200),openudidvarchar(200),macvarchar(200),timestampsint,campint,creativeidint,mobile_osint,mobile_typevarchar(200),app_key_md5varchar(200),app_name_md5varchar(200),placementidvarchar(200),useragentvarchar(200),mediaidvarchar(200),os_typevarchar(200),born_timeint);//开启MySQL的local_infile服务setgloballocal_infile=1;打开一个新的master终端,运行“vi/data/datamysql.sh”创建一个脚本文件,如REF_Ref100155277\h代码92所示。脚本内容为一个whiletrue循环,每分钟在case_data_new.csv随机提取100条数据,存入“/data/datamysql/mysqltmp.txt”文件中,再将文件中的数据存入MySQL数据库中。代码STYLEREF1\s9SEQ代码\*ARABIC\s12脚本datamysql.sh#!/bin/bashwhiletruedotime=$(date"+%Y%m%d_%H%M%S")shuf-n100/opt/case_data_new.csv>/data/datamysql/mysqltmp.txtmysql-uroot-pPassword123$--local-infile-e"useKafka;loaddatalocalinfile'/data/datamysql/mysqltmp.txt'intotablecase_datafieldsterminatedby','OPTIONALLYENCLOSEDBY'\"';"sleep60done脚本创建完成后,赋予脚本权限,然后将其启动,如REF_Ref100155288\h代码93所示。启动成功后可能会发出如REF_Ref100155569\h图91所示的警报信息,表示在命令行中直接输入密码账户信息是不安全的,该警报信息是在MySQL5.6版本后有的,并不影响运行结果,可以选择忽视。代码STYLEREF1\s9SEQ代码\*ARABIC\s13关于脚本的命令//脚本权限chmod777/data/data2mysql.sh//脚本启动命令sh/data/datamysql.sh&//脚本中断命令psaux|grep"datamysql.sh"|grep-vgrep|cut-c9-15|xargskill-9图STYLEREF1\s9SEQ图\*ARABIC\s11执行脚本文件在MySQL数据库中查看数据是否成功存入表中,如REF_Ref100155298\h代码94所示。结果如REF_Ref100155537\h图92所示。可以看出已经有数据存入表中,并正在实时更新中。代码STYLEREF1\s9SEQ代码\*ARABIC\s14查看数据是否存入//进入数据库kafkausekafka;//查看表中有几行selectcount(*)fromcase_data;图STYLEREF1\s9SEQ图\*ARABIC\s12数据已存入创建Kafka主题分别在slave1、slave2中开启ZooKeeper、Kafka集群。在slave1节点创建一个Kafka主题RealTime,设置2个副本,2个分区。创建成功后,开启消费者消费,如REF_Ref100155306\h代码95所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s15创建Kafka主题并开启消费//创建RealTime主题kafka-topics.sh-create--topicRealTime--bootstrap-serverslave1:9092,slave2:9092--partitions2--replication-factor2//开启消费者kafka-console-consumer.sh--topicRealTime--bootstrap-serverslave1:9092,slave2:9092目前,该消费者并没有在指定主题中消费到数据。Flume采集日志在Flume的conf目录下创建一个“datamysql.conf”文件,实现从MySQL中采集数据,并传入RealTime主题中,如REF_Ref100155318\h代码96所示。代码STYLEREF1\s9SEQ代码\*ARABIC\s16Flume脚本datamysql.confagent.sources=sql-sourceagent.sinks=k1agent.channels=chagent.sources.sql-source.type=org.keedio.flume.source.SQLSourceagent.sources.sql-source.hibernate.connection.url=jdbc:mysql://81:3306/kafka?&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezone=GMTagent.sources.sql-source.hibernate.connection.user=rootagent.sources.sql-source.hibernate.connection.password=Password123$agent.sources.sql-source.hibernate.dialect=org.hibernate.dialect.MySQLDialectagent.sources.sql-source.hibernate.driver_class=com.mysql.cj.jdbc.Driveragent.sources.sql-source.hibernate.connection.autocommit=trueagent.sources.sql-source.table=case_dataagent.sources.sql-source.columns.to.select=*agent.sources.sql-source.run.query.delay=10000agent.sources.sql-source.status.file.path=/var/lib/=sql-source.statusagent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSinkagent.sinks.k1.topic=RealTimeagent.sinks.k1.brokerList=slave1:9092,slave2:9092agent.sinks.k1.batchsize=200agent.sinks.kafkaSink.requiredAcks=1agent.sinks.k1.serializer.class=kafka.serializer.StringEncoderagent.sinks.kafkaSink.zookeeperConnect=slave1:2181,slave2:2181agent.channels.ch.type=memoryagent.channels.ch.capacity=10000agent.channels.ch.transactionCapacity=10000agent.channels.hbaseC.keep-alive=20agent.sources.sql-source.channels=chagent.sinks.k1.channel=ch启动FlumeAgent命令开始采集MySQL中的数据,如REF_Ref100155329\h代码97所示。切换到Kafka消费者的终端,可以看到主题上已经有数据被消费者消费,如REF_Ref100155585\h图93所示。观察消费者终端,可以看到消费者每过一分钟,就会有新数据消费,因为脚本文件一直在模拟用户产生数据,而Flume在实时采集并传入到Kafka主题上。代码STYLEREF1\s9SEQ代码\*ARABIC\s17执行Flume脚本flume-ngagent-nagent-f/usr/local/src/flume/conf/datamysql.conf-c/usr/local/src/flume/conf/-Dflume.root.logger=INFO,console图STYLEREF1\s9
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 2026年四川大学华西乐城医院招聘18人备考题库带答案详解
- 2026年中国成达工程有限公司招聘备考题库及参考答案详解
- 2026年六盘水市钟山区教育局所属事业单位公开考调工作人员11人备考题库及答案详解一套
- 2026年广州民航职业技术学院民航经营管理学院招聘劳动合同制教学助理备考题库及完整答案详解1套
- 2026年广东省阳江市江城第一中学公开引进高层次(急需紧缺)人才9人备考题库及参考答案详解一套
- 2026年云南业图人工智能数据标注基地“AI人工智能训练师”招聘备考题库(第三期)及完整答案详解一套
- 2026年甘肃钢铁职业技术学院单招职业适应性考试题库及答案1套
- 2026年中煤第三建设(集团)有限责任公司二十九工程处招聘备考题库及参考答案详解
- 2026年甘肃卫生职业学院单招职业倾向性考试题库新版
- 2026年山东能源集团营销贸易有限公司招聘备考题库带答案详解
- 消化系统疾病课件
- 工程维保三方合同
- 地铁车辆检修安全培训
- 造血干细胞移植临床应用和新进展课件
- GB/T 10802-2023通用软质聚氨酯泡沫塑料
- 黑布林英语阅读初一年级16《柳林风声》译文和答案
- 杰青优青学术项目申报答辩PPT模板
- 宿舍入住申请书
- 深圳中核海得威生物科技有限公司桐城分公司碳13-尿素原料药项目环境影响报告书
- 2023年全国高考体育单招文化考试数学试卷真题及答案
- GB/T 28733-2012固体生物质燃料全水分测定方法
评论
0/150
提交评论