第九章 Kafka安装配置与应用_第1页
第九章 Kafka安装配置与应用_第2页
第九章 Kafka安装配置与应用_第3页
第九章 Kafka安装配置与应用_第4页
第九章 Kafka安装配置与应用_第5页
已阅读5页,还剩32页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

Kafka安装与配置主讲:李强任务描述Kafka的安装与配置比较简单,本任务的介绍重点在于对集群中的Broker进行编号,并且如何在集群中模拟发布者和消费者进行消息的同步发送。任务分析Kafka安装后只需要增加运行的环境变量,并修改属性配置文件,在配置文件中设置每个节点的Broker编号,这里将集群中的master节点编号为0,slave1节点编号为1,slave2节点编号为2;配置文件中同时需要设置ZooKeeper的节点;在启动Hadoop、ZooKeeper和Kafka集群后,进行消息的发布和消费功能演示。9.1.1master节点安装配置Kafka组件1.解压安装文件下载好的安装包都需上传到指定目录下,本手册指定为/root/目录,使用ls命令可以查看上传的安装包(具体上传方式请参考项目1),如图9-1所示。代码9-1解压安装包cd/root/tar-zxfkafka_2.11-2.3.1.tgz-C/usr/local/src/cd/usr/local/src/ls图9-1查看安装包使用tar命令解压Kafka到/usr/local/src文件夹,并切换到安装目录下查看,可以使用ls查看解压后的效果,如代码9-1所示,效果如图9-2所示。图9-2查看解压好的安装包2.修改文件夹名称由于解压后的文件夹名称带有比较复杂的版本号,为了简化后续配置,此处修改文件夹名称。使用mv命令将解压的kafka_2.11-2.3.1目录重命名为kafka,如代码9-2所示,效果如图9-3所示。代码9-2重命名文件夹cd/usr/local/src/mvkafka_2.11-2.3.1kafkals图9-3重命名文件夹3.修改环境变量文件为了可以在任何目录下直接执行Kafka的相关命令,可以在环境变量文件中添加Kafka的环境变量。参考项目2,使用“vi/root/.bash_profile”打开环境变量文件,将如表9-1所示的配置信息添加到/root/.bash_profile文件的末尾,保存退出。表9-1环境变量文件的添加内容#setkafkaenvironmentexportKAFKA_HOME=/usr/local/src/kafka#Flume安装路径exportPATH=$PATH:$KAFKA_HOME/bin#添加系统PATH环境变量4.生效环境变量文件在master节点上运行如代码9-3所示的命令,使master节点上配置的Kafka的环境变量生效。代码9-3master节点生效环境变量source/root/.bash_profile5.配置Kafka环境Kafka启动会通过perties文件启动,里面是Kafka的Broker节点编号和ZooKeeper集群节点的配置文件。打开要修改的文件,如代码9-4所示。代码9-4修改pertiescd/usr/local/src/kafka/configviperties打开这个配置文件后,将broker.id设置为0,将zookeeper.connect设置为集群中三个节点的名称,参考内容如表9-2所示。表9-2文件的修改内容broker.id=0zookeeper.connect=master,slave1,slave29.1.2Slave节点安装Kafka组件1.修改flume-env.sh配置文件将master上配置好的Kafka文件夹内容和环境变量文件分别分发到slave1和slave2节点,分发命令,如代码9-5所示。代码9-5分发Kafka配置到Slave节点scp-r/usr/local/src/kafkaslave1:/usr/local/src/scp-r/usr/local/src/kafkaslave2:/usr/local/src/scp/root/.bash_profileslave1:/root/scp/root/.bash_profileslave2:/root/2.生效用户环境变量文件在slave1和slave2节点上运行如代码9-6所示的命令,使每个slave节点上配置的Kafka的环境变量生效。代码9-6Slave节点生效环境变量source/root/.bash_profile3.修改从节点中的属性文件slave1在架构中的Broker编号为1,参考9.1.1小节第5点,使用“vi/usr/local/src/kafka/config/perties”命令在perties文件下找到broker.id修改为“broker.id=1”。slave2在架构中的Broker编号为2,参考9.1.1小节第5点,使用“vi/usr/local/src/kafka/config/perties”命令在perties文件下找到broker.id修改为“broker.id=2”。9.1.3管理Kafka服务1.启动Hadoop和ZooKeeper集群在master节点一键启动Hadoop集群,如代码9-7所示。在各节点启动ZooKeeper集群,如代码9-8所示。代码9-7启动Hadoop集群start-all.sh代码9-8启动Zookeeper集群zkServer.shstart2.在各节点启动Kafka服务Kafka服务启动后的效果如图9-4所示,每个节点的Broker编号如上面所设置的分别为0,1,2。代码9-9启动Kafka服务kafka-server-start.sh/usr/local/src/kafka/config/perties图9-4集群节点启动后效果图2.在各节点启动Kafka服务请读者务必注意,启动的三个Kafka服务不要关闭,也不要关闭窗口;如果要关闭服务,可以按【Ctrl+C】退出。在master节点打开一个新的终端窗口,使用jps查看进程,会发现列表中多了一个Kafka的进程,效果如图9-5所示。至此Kafka的安装和配置就完成了,下面将演示消息发布和消费的过程。图9-5Kafka进程图9.1.4Kafka分布式发布订阅消息演示1.创建一个新的Topic在master节点打开一个新的终端窗口,使用kafka-topics.sh命令创建一个名称为happy的Topic,只有一个分区和两个备份,如代码9-10所示,该命令的几个参数说明如下。(1)--create:表示创建功能;(2)--zookeeper:表示ZooKeeper集群节点,此处为:“master:2181,slave1:2181,slave2:2181”,ZooKeeper的端口设置为2181,如果不同请修改一致;(3)--replication-factor:表示消息副本,此处为2;(4)--topic:表示Topic的名称,此处为“happy”;(5)--partitions:表示Topic分区数,此处为1;(6)--list:表示显示该服务器上所有可用的topic主题。创建成功后查询到的Topic主题,效果如图9-6所示。图8-5Flume启动后采集和传输的过程图代码9-10创建topic并查看kafka-topics.sh--create--zookeepermaster:2181,slave1:2181,slave2:2181--replication-factor2--topichappy--partitions1#创建完以后可以显示topickafka-topics.sh--list--zookeepermaster:2181,slave1:2181,slave2:21812.创建生产者并发送消息在任何一个节点打开一个新的终端窗口,使用中的kafka-console-producer.sh命令创建一个happy的Topic的生产者,并准备发布消息,如代码9-11所示,该命令的几个参数说明如下。(1)--broker-list:表示Kafka服务器器列表,此处为:“master:9092,slave1:9092,slave2:9092”;由于perties配置文件中Broker的端口默认为9092,所以此处为9092。(2)--topic:表示发布消息的Topic的名称,此处为“happy”。代码9-11创建生产者kafka-console-producer.sh--broker-listmaster:9092,slave1:9092,slave2:9092--topichappy2.创建生产者并发送消息创建成功后会进入发布消息的交互界面,效果如图9-7所示。可以在这里输入一些测试的消息,效果如图9-8所示。如果要停止发布消息,可以按【Ctrl+C】退出。图9-8发布的测试消息图9-7创建topic后的效果3.创建消费者者并消费消息在任何一个节点打开一个新的终端窗口,使用中的kafka-console-producer.sh命令创建一个happy的Topic的生产者,并准备发布消息,如代码9-12所示,该命令的几个参数说明如下。(1)--bootstrap-server:表示指定Kafka集群,此处为“master:9092,slave1:9092,slave2:9092”;由于perties配置文件中Broker的端口默认为9092,所以此处为9092。(2)--topic:表示消费消息的Topic的名称,此处为“happy”;(3)--from-beginning:表示消费历史为被消费的消息。代码9-12创建消费者kafka-console-consumer.sh--bootstrap-servermaster:9092,slave1:9092,slave2:9092--topichappy--from-beginning图9-9消费Topic后的效果创建成功后会进入消费消息的界面,同时会显示之前没有消费的消息,效果如图9-9所示。此时可以回到生产者窗口那边,继续输入其他的发布消息,会发现在消费者这边的窗口会实时同步显示相关的消息。请读者自行测试。如果要停止消费消息,可以按【Ctrl+C】退出。谢谢广告日志数据实时传输主讲:李强任务描述Flume作为日志收集系统,可以从不同的数据源将数据源源不断收集,但Flume不会持久地保存数据,需要使用Sink将数据存储到外部存储系统,如HDFS、HBase、Kafka等。Flume与HDFS、HBase的结合一般用于离线批处理。而Flume与Kafka的整合一般用于数据实时流处理,通过Flume的Agent代理收集日志数据,再由Flume的Sink将数据传送到Kafka集群,完成数据的生产流程,最后交给Storm、Flink、SparkStreaming等进行实时消费计算。本案例将基于项目8中的case_data_new.csv广告日志数据,使用Flume和Kafka整合实现广告日志数据的实时传输。首先用脚本模拟实时生成的日志数据并存入MySQL,再使用Flume实时监视MySQL中增加的数据,采集到Kafka集群的主题中,并启动消费者消费主题数据,最终实现数据的实时传输。任务分析广告日志数据的实时传输的实现步骤如下。(1)脚本定时抽取数据到指定目录,模拟系统日志产生,并将其存入MySQL数据表中。(2)创建Kafka主题,开启消费者以消费数据。(3)编写conf采集配置文件,将存入MySQL数据库中的数据传入Kafka主题。9.2.1创建脚本文件创建脚本文件在master节点下运行“mysql-uroot-pPassword123$”进入MySQL数据库中,创建数据库kafka,并在kafka下创建表用于存储数据,如代码9-13所示。代码9-13创建数据表createdatabasekafka;usekafka;createtablecase_data(`rank`int,dtint,cookievarchar(200),ipvarchar(200),idfavarchar(200),imeivarchar(200),androidvarchar(200),openudidvarchar(200),macvarchar(200),......省略创建脚本文件打开一个新的master节点终端窗口,运行“vi/data/datamysql.sh”创建一个脚本文件,如代码9-14所示。脚本内容为一个whiletrue循环,每分钟在case_data_new.csv随机提取100条数据,存入“/data/datamysql/mysqltmp.txt”文件中,再将文件中的数据存入MySQL数据库中。代码9-14脚本datamysql.sh#!/bin/bashwhiletruedotime=$(date"+%Y%m%d_%H%M%S")shuf-n100/opt/case_data_new.csv>/data/datamysql/mysqltmp.txtmysql-uroot-pPassword123$--local-infile-e"useKafka;loaddatalocalinfile'/data/datamysql/mysqltmp.txt'intotablecase_datafieldsterminatedby','OPTIONALLYENCLOSEDBY'\"';"sleep60done创建脚本文件脚本创建完成后,赋予脚本权限,然后将其启动,如代码9-15所示。启动成功后可能会发出如图9-10所示的警报信息,表示在命令行中直接输入密码账户信息是不安全的,该警报信息是在MySQL5.6版本后有的,并不影响运行结果,可以选择忽视。代码9-15关于脚本的命令//脚本权限chmod777/data/data2mysql.sh//脚本启动命令sh/data/datamysql.sh&//脚本中断命令psaux|grep"datamysql.sh"|grep-vgrep|cut-c9-15|xargskill-9图9-10执行脚本文件创建脚本文件在MySQL数据库中查看数据是否成功存入表中,如代码9-16所示。结果如图9-11所示。可以看出已经有数据存入表中,并正在实时更新中。代码9-16查看数据是否存入//进入数据库kafkausekafka;//查看表中有几行selectcount(*)fromcase_data;图9-11数据已存入9.2.2创建Kafka主题创建Kafka主题分别在slave1、slave2中开启ZooKeeper、Kafka集群。在slave1节点创建一个Kafka主题RealTime,设置2个副本,2个分区。创建成功后,开启消费者消费,如代码9-17所示。目前,该消费者并没有在指定主题中消费到数据。代码9-17创建Kafka主题并开启消费//创建RealTime主题kafka-topics.sh-create--topicRealTime--bootstrap-serverslave1:9092,slave2:9092--partitions2--replication-factor2//开启消费者kafka-console-consumer.sh--topicRealTime--bootstrap-serverslave1:9092,slave2:90929.2.3Flume采集日志Flume采集日志在Flume的conf目录下创建一个“datamysql.conf”文件,实现从MySQL中采集数据,并传入RealTime主题中,如代码9-18所示。代码9-18Flume脚本datamysql.confagent.sources=sql-sourceagent.sinks=k1agent.channels=chagent.sources.sql-source.type=org.keedio.flume.source.SQLSourceagent.sources.sql-source.hibernate.connection.url=jdbc:mysql://81:3306/kafka?&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true&serverTimezo

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论