《通信数据分析与实战》课件-第六章 Kafka 分布式发布订阅消息系统_第1页
《通信数据分析与实战》课件-第六章 Kafka 分布式发布订阅消息系统_第2页
《通信数据分析与实战》课件-第六章 Kafka 分布式发布订阅消息系统_第3页
《通信数据分析与实战》课件-第六章 Kafka 分布式发布订阅消息系统_第4页
《通信数据分析与实战》课件-第六章 Kafka 分布式发布订阅消息系统_第5页
已阅读5页,还剩56页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

通信数据分析与实战Kafka分布式发布订阅消息系统第六章第1节2知道消息传递模式熟悉Kafka的核心组件学习目标TARGET消息传递模式简介一个消息系统负责将数据从一个应用程序传递到另外一个应用程序中,应用程序只关注数据,无需关注数据在多个应用之间是如何传递的,分布式消息传递基于可靠的消息队列,在客户端应用和消息系统之间异步传递消息。消息系统有两种主要消息传递模式,分别是点对点消息传递模式和发布订阅消息传递模式。消息传递模式简介1.倒排索引介绍1.倒排索引介绍1.点对点消息传递模式点对点消息传递模式结构中,消息是通过一个虚拟通道进行传输的,生产者发送一条数据,消息将持久化到一个队列中,此时将有一个或多个消费者会消费队列中数据,但是一条消息只能被消费一次,且消费后的消息会从消息队列中删除,因此,即使有多个消费者同时消费数据,数据都可以被有序处理。消息传递模式简介1.倒排索引介绍2.发布订阅消息传递模式在发布订阅模式中,发布者用于发布消息,订阅者用于订阅消息,发布订阅模式可以有多种不同的订阅者,发布者发布的消息会被持久化到一个主题中,这与点对点模式不同的是,订阅者可订阅一个或多个主题,订阅者可读取该主题中所有数据,同一条数据可被多个订阅者消费,数据被消费后也不会立即删除。Kafka的概述Kafka是由Apache软件基金会开发的一个开源流处理平台,它由Scala和Java语言编写,是一个基于Zookeeper系统的分布式发布订阅消息系统,该项目的设计初衷是为实时数据提供一个统一、高通量、低等待的消息传递平台。Kafka的概述应用程序A应用程序B高度解耦高吞吐低延迟扩展性持久性容错性多语言Kafka的核心组件组件名称相关说明Topic特定类别消息流称为主题,数据存在主题中,主题被拆分成分区Partition主题的数据分割为一个或多个分区,每个分区的数据使用多个segment文件存储,分区中的数据是有序的Offset每个分区消息具有的唯一序列标识Replica副本只是一个分区的备份,它们用于防止数据丢失Producer生产者即数据发布者,该角色将消息发布到Kafka集群主题中Kafka的核心组件组件名称相关说明Consumer消费者可从Broker中读取数据,可消费多个主题数据Broker每个Kafka服务节点都为Broker,Broker接收消息后,将消息追加到segment文件中Leader负责分区的所有读写操作Follower跟随领导指令,若Leader发生故障则选一个Follower为新LeaderConsumerGroup实现一个主题消息的广播和单播的手段Kafka的核心组件生产者主题分区一分区二分区三offsetoffsetoffset服务器节点备份消费组消费者一消费者二消费者三11小结知道消息传递模式熟悉Kafka的核心组件通信数据分析与实战Kafka分布式发布订阅消息系统第六章第2节13熟悉Kafka的工作原理学习目标TARGETKafka工作原理生产者向Kafka集群中生产消息。Producer是消息的生产者,通常情况下,数据消息源可是服务器日志、业务数据及Web服务数据等,生产者采用推送的方式将数据消息发布到Kafka的主题中,主题本质就是一个目录,而主题是由PartitionLogs(分区日志)组成,每条消息都被追加到分区中。1.生产者生产消息过程Kafka工作原理1.生产者生产消息过程1Producer先读取Zookeeper的“/brokers/.../state”节点中找到该Partition的Leader。2Producer将消息发送给Leader。3Leader负责将消息写入本地分区Log文件中。Kafka工作原理1.生产者生产消息过程4Follower从Leader中读取消息,完成备份操作。5Follower写入本地Log文件后,会向Leader发送Ack,每次发送消息都会有一个确认反馈机制,以确保消息正常送达。6Leader收到所有Follower发送的Ack后,向Producer发送Ack,生产消息完成。Kafka工作原理1.生产者生产消息过程Kafka工作原理2.消费者消费消息过程Kafka采用拉取模型,由消费者记录消费状态,根据主题、Zookeeper集群地址和要消费消息的偏移量,每个消费者互相独立地按顺序读取每个分区的消息,消费者消费消息的流程图如下所示。19小结熟悉Kafka的工作原理通信数据分析与实战Kafka分布式发布订阅消息系统第六章第3节21掌握Kafka的安装和启动掌握Kafka基于命令行的使用学习目标TARGETKafka集群部署与测试1.安装Kafka1下载Kafka安装包,并解压至hadoop01节点中的/export/software目录下。2修改配置文件。在perties配置文件中指定broker编号、Kafka运行日志存放的路径、指定Zookeeper地址和本地IP。3添加环境变量。在/etc/profile文件中添加Kafka环境变量。4分发文件。将Kafka安装目录kafka_2.11-2.0.0及环境配置文件profile分发至hadoop02、hadoop03上,并修改broker.id和。Kafka集群部署与测试1.安装Kafka1下载Kafka安装包,并解压至hadoop01节点中的/export/software目录下。#1.切换到软件包存放目录cd/export/software#2.将kafka_2.11-2.0.0.tgz上传到指定位置rz#3.解压到指定目录/export/servers/tar-zxvfkafka_2.11-2.0.0.tgz/export/servers/Kafka集群部署与测试1.安装Kafka#broker的全局唯一编号,不能重复broker.id=0#kafka运行日志存放的路径log.dirs=/export/data/kafka/#broker需要使用zookeeper保存meta数据zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181#删除topicdelete.topic.enable=true#设置本机IP=hadoop01Kafka集群部署与测试1.安装Kafkavi/etc/profile

exportKAFKA_HOME=/export/servers/kafka_2.11-2.0.0exportPATH=$PATH:$KAFKA_HOME/binKafka集群部署与测试1.安装Kafkacd/export/serversscp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp-rkafka_2.11-2.0.0hadoop02:/export/servers/scp/etc/profilehadoop02:/etc/profilescp/etc/profilehadoop03:/etc/profile#分别在hadoop010203上面激活profileSourceprofileKafka集群部署与测试1.安装Kafka

#hadoop02

broker.id=1=hadoop02

#hadoop03

broker.id=2=hadoop03cd/export/servers/kafka_2.11-2.0.0/confvipertiesKafka集群部署与测试启动Zookeeper服务Kafka集群部署与测试启动Kafka服务cd/export/servers/kafka_2.11-2.0.0bin/kafka-server-start.shconfig/perties基于命令行方式使用Kafka1.创建主题$kafka-topics.sh--create\--topicittopic\--partitions3\--replication-factor2\--zookeeperhadoop01:2181,hadoop02:2181,hadoop03:2181基于命令行方式使用Kafka$kafka-console-producer.sh\--broker-listhadoop01:9092,hadoop02:9092,hadoop03:9092\--topicittopic--hellokafka2.向主题中发送消息数据基于命令行方式使用Kafka3.消费主题中的消息$kafka-console-consumer.sh\--from-beginning--topicittopic\--bootstrap-serverhadoop01:9092,hadoop02:9092,hadoop03:909233小结掌握Kafka的安装和启动掌握Kafka基于命令行的使用通信数据分析与实战Kafka分布式发布订阅消息系统第六章第4节35掌握Kafka的生产者实例掌握Kafka的消费者实例学习目标TARGETKafka生产者消费者实例1.基于JavaAPI方式使用Kafka

用户不仅能够通过命令行的形式操作Kafka服务,Kafka还提供了许多编程语言的客户端工具,用户在开发独立项目时,通过调用KafkaAPI来操作Kafka集群,其核心API主要有5种,分别是ProducerAPI、ConsumerAPI、StreamsAPI、ConnectAPI、AdminClientAPI。Kafka生产者消费者实例KafkaProducer常用API方法名称相关说明abortTransaction​()终止正在进行的事物close​()关闭这个生产者flush​()调用此方法使所有缓冲的记录立即发送partitionsFor​(java.lang.Stringtopic)获取给定主题的分区元数据send​(ProducerRecord<K,V>record)异步发送记录到主题Kafka生产者消费者实例KafkaConsumer常用API方法名称相关说明subscribe​(java.util.Collection<java.lang.String>topics)订阅给定主题列表以获取动态分区close​()关闭这个消费者wakeup​()唤醒消费者metrics​()获取消费者保留的指标listTopics​()获取有关用户有权查看的所有主题的分区的元数据Kafka生产者消费者实例操作1创建一个名为“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依赖。2创建KafkaProducerTest文件用于生产消息数据并将数据发送到Kafka集群。3通过KafkaAPI创建KafkaConsumerTest对象,用于消费Kafka集群中名为“ittopic”主题的消息数据。Kafka生产者消费者实例操作消费者消费消息效果图Kafka生产者消费者实例操作1创建一个名为“spark_chapter06”的Maven工程,在pom.xml文件中添加Kafka依赖。<dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.0.0</version></dependency></dependencies>Kafka生产者消费者实例操作2创建KafkaProducerTest文件用于生产消息数据并将数据发送到Kafka集群。publicclassKafkaProducerTest{publicstaticvoidmain(String[]args){Propertiesprops=newProperties();//1、指定Kafka集群的IP地址和端口号props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//2、指定等待所有副本节点的应答props.put("acks","all");

//3、指定消息发送最大尝试次数props.put("retries",0);

//4、指定一批消息处理大小props.put("batch.size",16384);

//5、指定请求延时props.put("linger.ms",1);Kafka生产者消费者实例操作2创建KafkaProducerTest文件用于生产消息数据并将数据发送到Kafka集群。

//6、指定缓存区内存大小

props.put("buffer.memory",33554432);

//7、设置key序列化

props.put("key.serializer","mon.serialization.StringSerializer");

//8、设置value序列化

props.put("value.serializer","mon.serialization.StringSerializer");

//9、生产数据

KafkaProducer<String,String>producer=newKafkaProducer<String,String>(props);for(inti=0;i<50;i++){producer.send(newProducerRecord<String,String>("ittopic",Integer.toString(i),"helloworld-"+i));}producer.close();Kafka生产者消费者实例操作3通过KafkaAPI创建KafkaConsumer对象,用于消费Kafka集群中名为“ittopic”主题的消息数据。publicclassKafkaConsumerTest{publicstaticvoidmain(String[]args){

//1.准备配置文件Propertiesprops=newProperties();

//2.指定Kafka集群主机名和端口号props.put("bootstrap.servers","hadoop01:9092,hadoop02:9092,hadoop03:9092");

//3.指定消费者组ID,在同一时刻同一消费组中只有//一个线程可以去消费一个分区消息,不同的消费组可以去消费同一个分区的消息。props.put("group.id","ittopic");

//4.自动提交偏移量props.put("mit","true");Kafka生产者消费者实例操作3通过KafkaAPI创建KafkaConsumer对象,用于消费Kafka集群中名为“ittopic”主题的消息数据。//5.自动提交时间间隔,每秒提交一次

props.put("erval.ms","1000");props.put("key.deserializer","mon.serialization.StringDeserializer");props.put("value.deserializer","mon.serialization.StringDeserializer");KafkaConsumer<String,String>kafkaConsumer=newKafkaConsumer<String,String>(props);

//6.订阅消息,这里的topic可以是多个

kafkaConsumer.subscribe(Arrays.asList("ittopic"));Kafka生产者消费者实例操作3通过KafkaAPI创建KafkaConsumer对象,用于消费Kafka集群中名为“ittopic”主题的消息数据。//7.获取消息

while(true){//每隔100ms就拉去一次

ConsumerRecords<String,String>records=kafkaConsumer.poll(100);for(ConsumerRecord<String,String>record:records){System.out.printf("topic=%s,offset=%d,key=%s,value=%s%n",record.topic(),record.offset(),record.key(),record.value());}}47小结掌握Kafka的生产者实例掌握Kafka的消费者实例通信数据分析与实战Kafka分布式发布订阅消息系统第六章第5节49熟悉KafkaStreams的作用掌握KafkaStreams的案例学习目标TARGETKafka

Streams概述

KafkaStreams是ApacheKafka开源的一个流处理框架,基于Kafka的生产者和消费者,为开发者提供流式处理能力,具有低延迟性、高扩展性、弹性、容错的特点,易于集成到现有应用程序中。它是一套处理分析Kafka中存储数据的客户端类库,处理完的数据可重新写回Kafka,也可发送给外部存储系统。Kafka

Streams概述在流式计算框架模型中,通常需要构建数据流的拓扑结构,例如生产数据源、分析数据的处理器及处理完后发送的目标节点,Kafka流处理框架同样将“输入主题自定义处理器输出主题”抽象成一个DAG拓扑图。生产者作为数据源不断生产和发送消息至Kafka的testStreams1主题中,通过自定义处理器对每条消息执行相应计算逻辑,最后将结果发送到Kafka的testStreams2主题中供消费者消费消息数据。Kafka

Streams案例1在spark_chapter06项目中,打开pom.xml文件,添加KafkaStreams依赖。2创建LogProcessor类,并继承StreamsAPI中的Processor接口,实现单词计数业务逻辑。3单词计数的业务功能开发完成后,KafkaStreams需要编写一个运行主程序的类App,用来测试LogProcessor业务程序。Kafka

Streams案例4在hadoop01节点创建testStreams1和testStreams2主题。5分别在hadoop01和hadoop02节点启动生产者服务和消费者服务。6运行App主程序类。在生产者服务节点(hadoop01)中输入测试语句,返回消费者服务节点(hadoop02)中查看执行结果。Kafka

Streams案例<dependency>​

<groupId>org.apache.kafka</groupId>​

<artifactId>kafka-streams</artifactId>​

<version>2.0.0</version></dependency>Kafka

Streams案例publicclassLogProcessorimplementsProcessor<byte[],byte[]>{

//上下文对象

privateProcessorContextprocessorContext;

@Override

publicvoidinit(ProcessorContextprocessorContext){//初始化方法

this.processorContext=processorContext;}

@Override

publicvoidprocess(byte[]key,byte[]value){

//处理一条消息

StringinputOri=newString(value);

HashMap<String,Integer>map=newHashMap<String,Integer>();

inttimes=1;

if(inputOri.contains("")){

//截取字段

String[]words=inputOri.split("");

for(Stringword:words){

if(map.containsKey(word)){

map.put(word,map.get(word)+1);

}else{

map.put(word,times);}}}

inputOri=map.toString();

processorContext.forward(key,inputOri.getBytes());}

@Override

publicvoidclose(){}Kafka

Streams案例publicclassApp{

publicstaticvoidmain(String[]args){

StringfromTopic="testStreams1";

//声明来源主题

StringtoTopic="testStreams2";

//声明目标主题

Propertiesprops=newProperties();

//设置参数

props.put(StreamsConfig.APPLICATION_ID_CONFIG,"logProcessor");

props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop01:9092,hadoop02:9092,hadoop03:9092");

//实例化StreamsConfig

StreamsConfigconfig=newStreamsConfig(props);

//构建拓扑结构

Topologytopology=newTopology();

//添加源处理节点,为源处理节点指定名称和它订阅的主题

topology.addSource("SOURCE",fromTopic)

//添加自定义处理节点,指定名称,处理器类和上一个节点的名称

.addProcessor("PROCESSOR",newProcessorSupplier(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论