消息队列:Kafka:Kafka架构与原理_第1页
消息队列:Kafka:Kafka架构与原理_第2页
消息队列:Kafka:Kafka架构与原理_第3页
消息队列:Kafka:Kafka架构与原理_第4页
消息队列:Kafka:Kafka架构与原理_第5页
已阅读5页,还剩13页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

消息队列:Kafka:Kafka架构与原理1消息队列简介1.1消息队列的概念消息队列是一种应用程序间的通信方法,它允许消息的发送者和接收者不需要同时存在。消息队列可以存储消息,直到接收者能够处理它们。这种通信模式在分布式系统中非常常见,可以提高系统的解耦性、可扩展性和可靠性。1.1.1为什么使用消息队列解耦:消息队列可以将系统中的各个组件解耦,使得一个组件的更改不会直接影响到其他组件。异步处理:消息队列允许系统以异步方式处理消息,这样可以提高系统的响应速度和处理能力。流量削峰:在高流量的情况下,消息队列可以作为缓冲,避免系统过载。冗余存储:消息队列可以存储消息,即使接收者暂时不可用,消息也不会丢失。最终一致性:通过消息队列,可以确保在分布式系统中实现最终一致性。1.2消息队列的类型消息队列主要分为两种类型:点对点(Point-to-Point,P2P)和发布/订阅(Publish/Subscribe,Pub/Sub)模式。1.2.1点对点模式在点对点模式中,消息被发送到队列,每个消息只能被一个消费者消费。一旦消息被消费,它就会从队列中移除。示例代码#使用RabbitMQ的点对点模式示例

importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个队列

channel.queue_declare(queue='hello')

#发送消息

channel.basic_publish(exchange='',

routing_key='hello',

body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

connection.close()1.2.2发布/订阅模式在发布/订阅模式中,消息被发送到一个主题,所有订阅该主题的消费者都会收到消息的副本。这种模式适用于需要将消息广播给多个接收者的情况。示例代码#使用RabbitMQ的发布/订阅模式示例

importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个交换机

channel.exchange_declare(exchange='logs',

exchange_type='fanout')

#声明一个队列,并与交换机绑定

result=channel.queue_declare(queue='',exclusive=True)

queue_name=result.method.queue

channel.queue_bind(exchange='logs',

queue=queue_name)

#发布消息

channel.basic_publish(exchange='logs',

routing_key='',

body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

connection.close()1.3消息队列的应用场景消息队列在多种场景下都有广泛的应用,包括但不限于:日志处理:收集来自不同服务的日志,进行统一处理和分析。任务队列:将耗时的任务放入队列,由后台工作进程异步处理。数据同步:在多个服务或系统之间同步数据,确保数据的一致性。消息通知:发送通知或消息给多个接收者,如邮件通知、短信通知等。流处理:处理实时数据流,如股票价格、传感器数据等。通过使用消息队列,可以有效地管理和优化这些场景下的数据流和任务处理,提高系统的整体性能和稳定性。2Kafka概述2.1Kafka的历史与发展Kafka是由LinkedIn公司于2010年开发的一款分布式消息系统,最初是为了处理网站的活动流数据和日志数据。随着其在LinkedIn内部的成功应用,Kafka于2011年被开源,并在2012年成为Apache的顶级项目。Kafka的设计灵感来源于Amazon的Kinesis和Google的Pub/Sub模型,但其独特的设计使其在消息处理领域脱颖而出,成为大数据和流处理的首选工具。2.2Kafka的特点Kafka具有以下显著特点:高吞吐量:Kafka能够处理每秒数百万的消息,提供低延迟的数据传输。持久性与可靠性:Kafka将消息存储在磁盘上,同时支持数据复制,确保数据不会丢失。可扩展性:Kafka的分布式架构允许它在多台服务器上运行,轻松扩展以处理更大的数据量。持久存储:Kafka可以作为持久的消息存储,支持长时间的数据保留。流处理:Kafka支持实时数据流处理,可以与Spark、Flink等流处理框架集成。多语言支持:Kafka提供了多种语言的客户端库,包括Java、Python、C++等,方便不同开发环境的使用。2.3Kafka的适用场景Kafka适用于以下场景:日志收集:Kafka可以作为日志收集系统,从多个数据源收集日志数据,然后将其传输到中央存储系统。流处理:实时处理和分析数据流,如实时监控、实时数据分析等。消息系统:构建基于发布/订阅模式的消息系统,用于解耦服务和应用。网站活动跟踪:跟踪用户在网站上的活动,如页面浏览、点击等,用于分析用户行为。数据集成:作为数据集成平台,Kafka可以连接不同的数据源和数据处理系统,实现数据的实时传输和集成。3Kafka架构与原理3.1Kafka架构Kafka的架构主要由以下组件构成:Producer:消息生产者,负责向Kafka的Topic中发送消息。Broker:Kafka集群中的服务器节点,负责存储和处理消息。Consumer:消息消费者,负责从Topic中读取消息。Topic:消息分类的逻辑概念,一个Topic可以被多个Producer和Consumer共享。Partition:物理上将Topic分割成多个分区,每个分区可以存储在不同的Broker上,以实现数据的并行处理和高可用性。Replication:为了提高数据的可靠性和系统的可用性,Kafka支持数据的复制,每个Partition可以有多个副本。3.2Kafka原理Kafka的核心原理包括:消息持久化:Kafka将消息存储在磁盘上,以文件的形式,这使得Kafka能够处理大量数据,并提供数据的持久性。分区与复制:Kafka通过分区将数据分散存储,每个分区可以有多个副本,以提高数据的可靠性和系统的可用性。消费组:Kafka支持消费组的概念,一个Topic的消息可以被多个消费组消费,每个消费组内的消费者可以并行处理消息,但一个消息只会被一个消费者消费。偏移量:Kafka使用偏移量来记录消息的读取位置,消费者可以控制消息的读取进度,实现消息的重放和跳过。3.2.1示例:Kafka消息生产与消费以下是一个使用Java的Kafka生产者和消费者示例://Kafka生产者示例

importducer.KafkaProducer;

importducer.ProducerRecord;

importjava.util.Properties;

publicclassKafkaProducerExample{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("acks","all");

props.put("retries",0);

props.put("batch.size",16384);

props.put("linger.ms",1);

props.put("buffer.memory",33554432);

props.put("key.serializer","mon.serialization.StringSerializer");

props.put("value.serializer","mon.serialization.StringSerializer");

KafkaProducer<String,String>producer=newKafkaProducer<>(props);

for(inti=0;i<100;i++){

ProducerRecord<String,String>record=newProducerRecord<>("my-topic","key-"+i,"value-"+i);

producer.send(record);

}

producer.close();

}

}//Kafka消费者示例

importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.apache.kafka.clients.consumer.ConsumerRecords;

importorg.apache.kafka.clients.consumer.KafkaConsumer;

importjava.time.Duration;

importjava.util.Collections;

importjava.util.Properties;

publicclassKafkaConsumerExample{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("group.id","my-group");

props.put("mit","true");

props.put("erval.ms","1000");

props.put("key.deserializer","mon.serialization.StringDeserializer");

props.put("value.deserializer","mon.serialization.StringDeserializer");

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("my-topic"));

while(true){

ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord<String,String>record:records){

System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());

}

}

}

}在这个示例中,我们创建了一个Kafka生产者和消费者。生产者向名为my-topic的Topic发送了100条消息,每条消息包含一个键和一个值。消费者订阅了my-topic,并持续从Topic中读取消息,打印出消息的偏移量、键和值。通过这个示例,我们可以看到Kafka如何处理消息的生产和消费,以及如何使用偏移量来跟踪消息的读取位置。这展示了Kafka在处理大规模数据流时的高效性和灵活性。4消息队列:Kafka:Kafka架构与原理4.1Kafka架构4.1.1Kafka的基本组件Kafka是一个分布式流处理平台,其架构设计围绕着几个核心组件:主题(Topic):Kafka中的消息被分类存储在主题中。一个主题可以被多个消费者订阅,且每个主题可以被分区以支持并行处理。分区(Partition):主题被分割成一个或多个分区,每个分区是一个有序的、不可变的消息队列。分区可以分布在不同的服务器上,以实现数据的并行处理和高可用性。副本(Replica):为了保证数据的持久性和高可用性,Kafka的每个分区都有一个或多个副本。主副本(Leader)负责所有读写操作,而从副本(Follower)则用于故障转移。生产者(Producer):生产者是向Kafka主题发送消息的应用程序。生产者可以指定消息发送到哪个主题,以及消息的具体内容。消费者(Consumer):消费者是从Kafka主题读取消息的应用程序。消费者可以订阅一个或多个主题,并从这些主题中读取和处理消息。消费者组(ConsumerGroup):消费者可以组织成消费者组,组内的消费者可以并行处理消息,但同一组内的消费者不会处理同一消息两次,这有助于实现消息的均衡处理和故障恢复。KafkaBroker:Kafka的服务器节点被称为Broker。Broker负责接收生产者发送的消息,存储消息,并将消息发送给消费者。Broker也可以进行消息的复制和分区管理。4.1.2生产者与消费者的交互生产者和消费者与Kafka的交互是通过Broker进行的。生产者将消息发送到特定的主题,而消费者则订阅这些主题以读取消息。以下是生产者和消费者交互的基本流程:生产者发送消息生产者使用Kafka的客户端库将消息发送到Broker。消息被封装在特定的主题中,生产者可以指定消息的分区,或者让Kafka自动选择分区。以下是一个使用Java的Kafka生产者发送消息的示例:importducer.KafkaProducer;

importducer.ProducerRecord;

importjava.util.Properties;

publicclassSimpleProducer{

publicstaticvoidmain(String[]args){

//设置配置属性

Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("acks","all");

props.put("retries",0);

props.put("batch.size",16384);

props.put("linger.ms",1);

props.put("buffer.memory",33554432);

props.put("key.serializer","mon.serialization.StringSerializer");

props.put("value.serializer","mon.serialization.StringSerializer");

//创建生产者实例

KafkaProducer<String,String>producer=newKafkaProducer<>(props);

//发送消息

for(inti=0;i<100;i++){

ProducerRecord<String,String>record=newProducerRecord<>("my-topic","key-"+i,"value-"+i);

producer.send(record);

}

//关闭生产者

producer.close();

}

}消费者读取消息消费者订阅主题后,可以从Broker读取消息。消费者可以设置从哪个偏移量开始读取,这使得消费者可以重新处理消息或从特定点开始消费。以下是一个使用Java的Kafka消费者读取消息的示例:importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.apache.kafka.clients.consumer.ConsumerRecords;

importorg.apache.kafka.clients.consumer.KafkaConsumer;

importjava.time.Duration;

importjava.util.Collections;

importjava.util.Properties;

publicclassSimpleConsumer{

publicstaticvoidmain(String[]args){

//设置配置属性

Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("group.id","my-group");

props.put("mit","true");

props.put("erval.ms","1000");

props.put("key.deserializer","mon.serialization.StringDeserializer");

props.put("value.deserializer","mon.serialization.StringDeserializer");

//创建消费者实例

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("my-topic"));

//读取消息

while(true){

ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord<String,String>record:records){

System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());

}

}

}

}4.1.3Kafka集群架构Kafka集群由一个或多个Broker组成,这些Broker可以分布在不同的服务器上。集群中还包含ZooKeeper,它用于管理集群的元数据,如主题的分区信息、Broker的状态以及消费者组的偏移量。Kafka的集群架构设计确保了高可用性和可扩展性。高可用性:通过分区的副本机制,即使部分Broker失效,Kafka仍然能够提供服务。ZooKeeper跟踪Broker的状态,确保在故障发生时能够快速恢复。可扩展性:Kafka集群可以通过增加更多的Broker来扩展,以支持更多的数据和更高的吞吐量。主题的分区可以分布在不同的Broker上,实现数据的并行处理。Kafka的集群架构还支持水平扩展,即可以通过增加更多的Broker来增加系统的处理能力和存储容量,而无需修改现有的应用程序代码。4.2总结Kafka的架构设计围绕着主题、分区、副本、生产者、消费者和Broker等核心组件。通过这些组件的协同工作,Kafka能够提供高吞吐量、低延迟和高可用性的消息处理服务。生产者和消费者通过Broker进行交互,而集群架构则确保了系统的可扩展性和故障恢复能力。5Kafka工作原理5.1数据的持久化与复制Kafka作为一款分布式消息系统,其核心特性之一就是数据的持久化与复制。持久化确保了消息在磁盘上的存储,即使在节点故障的情况下,消息也不会丢失。复制则通过在集群中多个节点上存储相同的数据,提高了系统的可用性和容错性。5.1.1持久化Kafka使用日志结构存储数据,每条消息都被追加到日志的末尾。这种设计使得写入操作非常高效,因为磁盘的顺序写入速度远高于随机写入。同时,Kafka会定期将内存中的数据刷新到磁盘,以确保数据的持久性。5.1.2复制Kafka的复制机制是通过副本(Replica)实现的。每个主题的分区都有一个主副本(LeaderReplica)和零个或多个从副本(FollowerReplica)。主副本负责处理所有读写请求,而从副本则同步主副本的数据,以备主副本故障时能够快速接管。代码示例Kafka的复制机制在代码层面主要通过配置参数来实现,例如设置replication.factor来指定分区的副本数量。以下是一个使用Java客户端创建主题的例子,其中指定了副本数量:importorg.apache.kafka.clients.admin.AdminClient;

importorg.apache.kafka.clients.admin.AdminClientConfig;

importorg.apache.kafka.clients.admin.NewTopic;

importjava.util.Collections;

importjava.util.Properties;

publicclassKafkaTopicCreator{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

try(AdminClientadminClient=AdminClient.create(props)){

NewTopictopic=newNewTopic("test-topic",1,(short)3);//创建一个名为test-topic的主题,包含1个分区,副本数量为3

adminClient.createTopics(Collections.singletonList(topic));

}

}

}5.1.3数据样例假设我们有一个主题test-topic,其中包含以下消息:消息ID主题分区偏移量时间戳键值1test-topic001628712000000null“Hello,Kafka!”2test-topic011628712001000“key1”“value1”这些消息将被持久化存储在磁盘上,并在多个节点上进行复制,以确保数据的高可用性和持久性。5.2分区与副本机制Kafka通过分区(Partition)和副本(Replica)机制来实现高吞吐量和数据的可靠性。5.2.1分区分区是Kafka主题的基本单位,每个主题可以包含多个分区。分区内的消息是有序的,而不同分区之间的消息是无序的。通过分区,Kafka能够在多个节点上并行处理消息,从而提高系统的吞吐量。5.2.2副本每个分区都有一个主副本和零个或多个从副本。主副本负责处理所有读写请求,而从副本则同步主副本的数据,以备主副本故障时能够快速接管。副本机制提高了系统的容错性和数据的可靠性。代码示例以下是一个使用Java客户端消费主题test-topic中分区0的消息的例子:importorg.apache.kafka.clients.consumer.ConsumerConfig;

importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.apache.kafka.clients.consumer.ConsumerRecords;

importorg.apache.kafka.clients.consumer.KafkaConsumer;

importmon.serialization.StringDeserializer;

importjava.time.Duration;

importjava.util.Collections;

importjava.util.Properties;

publicclassKafkaConsumerExample{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("test-topic"));

while(true){

ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord<String,String>record:records){

if(record.partition()==0){

System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());

}

}

}

}

}5.2.3数据样例在上述代码示例中,假设test-topic主题的分区0中有以下消息:消息ID主题分区偏移量时间戳键值1test-topic001628712000000null“Hello,Kafka!”2test-topic011628712001000“key1”“value1”消费者将按照偏移量的顺序消费这些消息。5.3Kafka的消费模型Kafka的消费模型基于订阅和发布模式,允许消费者组(ConsumerGroup)消费主题中的消息。消费者组中的每个消费者都会消费主题中的一部分分区,确保了消息的并行处理和负载均衡。5.3.1消费者组消费者组是一个逻辑概念,它将多个消费者组织在一起,共同消费一个或多个主题。当一个主题有多个分区时,消费者组中的消费者将并行消费这些分区,每个分区只被组内的一个消费者消费。5.3.2消费者偏移量消费者在消费消息时,会跟踪每个分区的偏移量,以记录其消费进度。消费者可以手动提交偏移量,也可以配置自动提交。偏移量的管理确保了消息的有序消费和重试机制。代码示例以下是一个使用Java客户端手动提交偏移量的消费者示例:importorg.apache.kafka.clients.consumer.ConsumerConfig;

importorg.apache.kafka.clients.consumer.ConsumerRecord;

importorg.apache.kafka.clients.consumer.ConsumerRecords;

importorg.apache.kafka.clients.consumer.KafkaConsumer;

importmon.TopicPartition;

importmon.serialization.StringDeserializer;

importjava.time.Duration;

importjava.util.ArrayList;

java.util.Collections;

importjava.util.List;

importjava.util.Properties;

publicclassKafkaConsumerWithOffsetCommit{

publicstaticvoidmain(String[]args){

Propertiesprops=newProperties();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");

props.put(ConsumerConfig.GROUP_ID_CONFIG,"test-group");

props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("test-topic"));

List<TopicPartition>partitions=newArrayList<>();

partitions.add(newTopicPartition("test-topic",0));

consumer.assign(partitions);

while(true){

ConsumerRecords<String,String>records=consumer.poll(Duration.ofMillis(100));

for(ConsumerRecord<String,String>record:records){

System.out.printf("offset=%d,key=%s,value=%s%n",record.offset(),record.key(),record.value());

}

mitSync();//手动提交偏移量

}

}

}5.3.3数据样例在上述代码示例中,假设test-topic主题的分区0中有以下消息:消息ID主题分区偏移量时间戳键值1test-topic001628712000000null“Hello,Kafka!”2test-topic011628712001000“key1”“value1”消费者将按照偏移量的顺序消费这些消息,并在每次消费后手动提交偏移量,以记录其消费进度。6Kafka的高可用与高吞吐6.1Kafka的高可用性设计Kafka的高可用性主要通过其分布式架构和数据复制机制来实现。Kafka集群由多个Broker组成,每个Broker都是一个服务器,可以独立运行。数据在Kafka中以Topic的形式存储,每个Topic可以被划分为多个Partition,这些Partition可以分布在不同的Broker上,从而实现数据的水平扩展。6.1.1数据复制Kafka通过数据复制来保证数据的高可用性。每个Partition都有一个主副本(Leader)和多个从副本(Follower)。当Leader副本不可用时,Kafka会自动从Follower副本中选举出一个新的Leader,从而保证了数据的持续可用性。此外,数据的多副本存储也提高了数据的持久性和容错性。6.1.2分区策略Kafka的分区策略也是其高可用性设计的关键。通过将Topic的数据均匀分布到多个Partition中,Kafka可以实现负载均衡,避免单点故障。同时,Partition的独立性也意味着,即使某个Broker出现故障,其他Broker上的Partition仍然可以正常工作,从而保证了整个系统的稳定性。6.2Kafka的高吞吐量实现Kafka的高吞吐量主要得益于其设计的低延迟和高并发特性。Kafka使用了高效的磁盘数据结构和零拷贝技术,以及多线程处理机制,从而实现了极高的消息处理速度。6.2.1零拷贝技术Kafka使用了零拷贝技术来减少数据处理过程中的CPU开销。在传统的数据处理流程中,数据从网络接收后,需要从用户空间拷贝到内核空间,处理后再从内核空间拷贝回用户空间,这个过程会消耗大量的CPU资源。而Kafka的零拷贝技术则避免了这个过程,数据可以直接从网络接收后,通过DMA(DirectMemoryAccess)方式写入磁盘,从而大大提高了数据处理的效率。6.2.2多线程处理Kafka的Broker使用了多线程处理机制,每个Broker可以同时处理多个请求,从而实现了高并发。Kafka的Broker内部有一个线程池,每个线程负责处理一个网络请求,这样就可以充分利用多核CPU的计算能力,提高系统的吞吐量。6.3Kafka的性能调优Kafka的性能调优主要涉及到Broker配置、Topic配置、Producer和Consumer配置等多个方面。以下是一些关键的调优策略:6.3.1Broker配置磁盘I/O优化:使用SSD硬盘可以显著提高Kafka的性能,因为SSD的读写速度远高于HDD。JVM配置:合理设置JVM的堆内存大小,避免频繁的垃圾回收,可以提高Kafka的性能。网络配置:增加网络缓冲区大小,可以减少网络I/O的等待时间,提高Kafka的吞吐量。6.3.2Topic配置分区数:增加Topic的分区数,可以提高Kafka的并发处理能力,但同时也会增加Broker的负载。副本因子:增加Topic的副本因子,可以提高数据的持久性和容错性,但同时也会增加磁盘的使用量。6.3.3Producer和Consumer配置批量发送:Producer可以设置批量发送的大小,这样可以减少网络I/O的次数,提高发送效率。压缩:Producer可以设置消息的压缩方式,这样可以减少网络传输的数据量,提高传输效率。Consumer并行处理:Consumer可以设置多个线程来并行处理消息,这样可以提高消息的处理速度。6.3.4示例:KafkaProducer批量发送配置Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("acks","all");

props.put("retries",0);

props.put("batch.size",16384);//设置批量发送的大小

props.put("linger.ms",1);//设置批量发送的等待时间

props.put("buffer.memory",33554432);

props.put("key.serializer","mon.serialization.StringSerializer");

props.put("value.serializer","mon.serialization.StringSerializer");

Producer<String,String>producer=newKafkaProducer<>(props);

for(inti=0;i<100;i++){

producer.send(newProducerRecord<String,String>("my-topic",Integer.toString(i),Integer.toString(i)));

}

producer.close();在这个例子中,我们设置了Producer的批量发送大小为16384字节,这意味着Producer会等待消息的大小达到16384字节后,才会将消息发送到Broker。这样可以减少网络I/O的次数,提高发送效率。6.3.5示例:KafkaConsumer并行处理配置Propertiesprops=newProperties();

props.put("bootstrap.servers","localhost:9092");

props.put("group.id","test");

props.put("mit","true");

props.put("erval.ms","1000");

props.put("key.deserializer","mon.serialization.StringDeserializer");

props.put("value.deserializer","mon.serialization.StringDeserializer");

KafkaConsumer<String,String>consumer=newKafkaConsumer<>(props);

consumer.subscribe(Arrays.asList("my-topic"))

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论