版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领
文档简介
消息队列:Pulsar:Pulsar的订阅模式与消息重试1消息队列基础1.1消息队列的定义消息队列是一种应用程序间的通信方法,它允许消息的发送者不会因为接收者暂时无法处理消息而阻塞。消息队列通过在消息的生产者和消费者之间提供一个缓冲区,实现了异步通信和解耦。消息队列可以处理大量并发消息,提高系统的响应速度和吞吐量,同时还能保证消息的可靠传输。1.2消息队列的作用消息队列在现代软件架构中扮演着关键角色,主要作用包括:异步处理:允许生产者和消费者异步操作,提高系统响应速度。负载均衡:通过消息队列,可以将任务均匀地分配给多个消费者,实现负载均衡。故障恢复:消息队列可以持久化消息,即使消费者失败,消息也不会丢失,可以重新处理。解耦:生产者和消费者不需要直接通信,降低了系统的耦合度,提高了系统的可维护性和可扩展性。1.3Pulsar简介ApachePulsar是一个高性能、可扩展的分布式消息队列系统。它提供了消息持久化、分层存储、多租户、全球地理复制等功能,使其成为构建现代消息队列和流处理应用的理想选择。Pulsar的设计目标是提供一个统一的平台,支持消息队列和流处理两种模式,同时保持高性能和低延迟。1.3.1Pulsar的架构Pulsar采用了一种分层的架构,主要包括:Broker:负责消息的路由和管理,处理客户端的请求。BookKeeper:提供消息的持久化存储,保证消息的可靠性和持久性。ZooKeeper:用于协调集群中的Broker,管理集群的元数据。1.3.2Pulsar的特性持久化存储:Pulsar使用BookKeeper来存储消息,保证消息的持久化和可靠性。多租户:Pulsar支持多租户,每个租户可以有自己的命名空间和主题。全球地理复制:Pulsar支持跨地域的复制,可以将消息复制到全球的多个数据中心,提高系统的可用性和容灾能力。分层存储:Pulsar支持冷热数据的分层存储,可以将热点数据存储在高速的SSD上,将冷数据存储在低成本的HDD上,以优化存储成本和性能。1.3.3Pulsar的使用示例以下是一个使用Python客户端向Pulsar主题发送消息的示例:frompulsarimportClient
#创建Pulsar客户端
client=Client('pulsar://localhost:6650')
#创建生产者
producer=client.create_producer('persistent://public/default/my-topic')
#发送消息
foriinrange(10):
producer.send(('HelloPulsar%d'%i).encode('utf-8'))
#关闭客户端
client.close()在这个示例中,我们首先创建了一个Pulsar客户端,然后创建了一个生产者,用于向主题my-topic发送消息。我们发送了10条消息,每条消息的内容都是HelloPulsar加上一个数字。最后,我们关闭了客户端。1.3.4Pulsar的订阅模式Pulsar支持两种订阅模式:独占订阅(Exclusive)和共享订阅(Shared)。独占订阅:一个主题只能有一个消费者订阅,如果多个消费者订阅了同一个主题,只有其中一个消费者可以接收消息。共享订阅:一个主题可以有多个消费者订阅,消息会被均匀地分配给所有消费者。1.3.5Pulsar的消息重试Pulsar提供了消息重试机制,当消费者无法处理消息时,消息会被重新发送给消费者。消息重试的次数和间隔可以通过配置来控制。例如,以下是一个使用Java客户端配置消息重试的示例:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassRetryConsumer{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//设置消息重试间隔
.maxRedeliveryCount(5)//设置消息重试次数
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}
}
}
}在这个示例中,我们创建了一个共享订阅的消费者,设置了消息重试间隔为10秒,消息重试次数为5次。当消费者接收到消息后,如果处理消息时发生异常,消费者会调用negativeAcknowledge方法,将消息标记为未处理,Pulsar会根据配置的消息重试间隔和次数,重新发送这条消息给消费者。1.3.6总结Pulsar是一个功能强大的消息队列系统,它提供了消息持久化、多租户、全球地理复制和分层存储等功能,支持独占订阅和共享订阅两种模式,同时提供了消息重试机制,保证了消息的可靠处理。通过使用Pulsar,可以构建出高性能、可扩展、可靠的消息队列和流处理应用。2消息队列:Pulsar:深入理解Pulsar的订阅模式在ApachePulsar消息队列中,订阅模式是消息消费的核心机制之一,它决定了多个消费者如何处理来自同一主题的消息。Pulsar提供了四种订阅模式:独占订阅模式、共享订阅模式、故障转移订阅模式和键共享订阅模式。每种模式都有其特定的使用场景和优势,下面将详细介绍这四种订阅模式的原理和应用场景。2.1独占订阅模式(Exclusive)2.1.1原理在独占订阅模式下,一个主题只能有一个活动的订阅者。如果多个消费者尝试订阅同一主题,只有第一个订阅者能够成功接收消息,其他订阅者将被阻止直到第一个订阅者断开连接。这种模式确保了消息的顺序处理和唯一性。2.1.2示例代码//创建Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建独占订阅
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);2.1.3解释上述代码展示了如何在Pulsar中创建一个独占订阅。SubscriptionType.Exclusive参数确保了只有创建此订阅的消费者能够接收消息。如果另一个消费者尝试使用相同的订阅名称订阅同一主题,它将被阻止直到当前订阅者断开连接。2.2共享订阅模式(Shared)2.2.1原理共享订阅模式允许多个消费者同时订阅同一主题。消息将被分发给订阅者中的任意一个,但不会重复发送给其他订阅者。这种模式提高了系统的并行处理能力,但不保证消息的顺序。2.2.2示例代码//创建Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建共享订阅
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消费消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.2.3解释在共享订阅模式下,多个消费者可以使用相同的订阅名称订阅同一主题。消息将被分发给任意一个订阅者,但不会重复发送给其他订阅者。这使得系统能够并行处理消息,提高了处理效率。2.3故障转移订阅模式(Failover)2.3.1原理故障转移订阅模式类似于独占订阅,但允许多个消费者订阅同一主题,每个消费者都有一个唯一的分区。当一个消费者(分区)失败时,其未处理的消息将被重新分配给其他消费者。这种模式保证了消息的顺序处理和高可用性。2.3.2示例代码//创建Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建故障转移订阅
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Failover)
.consumerName("consumer2")
.subscribe();
//消费消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
consumer1.acknowledge(msg1);
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());
consumer2.acknowledge(msg2);2.3.3解释在故障转移订阅模式下,每个消费者都有一个唯一的分区。当一个消费者失败时,其未处理的消息将被重新分配给其他消费者。这确保了即使在消费者失败的情况下,消息也能被正确处理,同时保持了消息的顺序。2.4键共享订阅模式(Key_Shared)2.4.1原理键共享订阅模式允许消息根据其键(key)被分发到特定的消费者。这种模式确保了具有相同键的消息总是被同一个消费者处理,即使有多个消费者订阅同一主题。这在需要根据消息键进行一致性处理的场景中非常有用。2.4.2示例代码//创建Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建键共享订阅
Consumer<String>consumer1=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer1")
.subscribe();
Consumer<String>consumer2=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.consumerName("consumer2")
.subscribe();
//生产者发送带有键的消息
Producer<String>producer=client.newProducer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.send("message1","key1".getBytes());
producer.send("message2","key2".getBytes());
//消费消息
Message<String>msg1=consumer1.receive();
System.out.println("Consumer1receivedmessage:"+msg1.getValue());
Message<String>msg2=consumer2.receive();
System.out.println("Consumer2receivedmessage:"+msg2.getValue());2.4.3解释在键共享订阅模式下,消息根据其键被分发到特定的消费者。在上述示例中,producer.send方法被用来发送带有键的消息。"key1"和"key2"确保了消息将被分发到不同的消费者,具有相同键的消息将始终被同一个消费者处理。2.5消息重试在Pulsar中,消息重试机制允许在消息处理失败时重新发送消息。这可以通过设置消息的重试次数和重试策略来实现,确保了消息的可靠处理。2.5.1示例代码//创建Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建消费者并设置消息重试策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.negativeAckRedeliveryDelay(1,TimeUnit.MINUTES)//设置消息重试延迟
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
try{
System.out.println("Receivedmessage:"+msg.getValue());
//模拟消息处理失败
if(msg.getValue().equals("message1")){
consumer.negativeAcknowledge(msg);
}else{
consumer.acknowledge(msg);
}
}catch(Exceptione){
consumer.negativeAcknowledge(msg);
}2.5.2解释在上述代码中,negativeAckRedeliveryDelay方法被用来设置消息重试的延迟时间。当消息处理失败时,消费者调用negativeAcknowledge方法,这将导致消息在指定的延迟时间后被重新发送。这种机制确保了即使在处理失败的情况下,消息也能被重新尝试处理,提高了消息处理的可靠性。通过理解Pulsar的订阅模式和消息重试机制,开发者可以更有效地设计和实现消息处理系统,确保消息的正确处理和系统的高可用性。3消息重试机制3.1消息重试的重要性在分布式系统中,消息队列如ApachePulsar扮演着关键角色,用于在服务之间传递消息。然而,网络延迟、服务故障或消费者处理逻辑的复杂性可能导致消息处理失败。消息重试机制是确保消息至少被成功处理一次的关键策略。它通过在消息处理失败时自动或手动地重新发送消息,从而提高系统的可靠性和容错性。3.2Pulsar消息重试策略ApachePulsar提供了多种消息重试策略,以适应不同的业务场景和需求。这些策略包括:3.2.1自动重试Pulsar可以配置自动重试机制,当消息处理失败时,消息会被自动重新发送到消费者。自动重试次数和重试间隔可以通过Pulsar的配置参数进行调整。3.2.2手动重试在某些情况下,可能需要更精细的控制。Pulsar允许消费者在消息处理失败时手动触发重试。这通常通过在消息处理函数中抛出异常或使用特定的API来实现。3.2.3死信队列对于那些即使经过多次重试也无法成功处理的消息,Pulsar提供了死信队列(DeadLetterQueue,DLQ)机制。这些消息会被移动到DLQ中,以便后续的人工检查或特殊处理。3.3实现消息重试的步骤要实现Pulsar中的消息重试,可以遵循以下步骤:3.3.1步骤1:配置自动重试在Pulsar的消费者配置中,可以设置自动重试的次数和间隔。例如,以下是一个使用JavaAPI配置自动重试的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.negativeAckRedeliveryDelay(10,TimeUnit.SECONDS)//设置重试间隔
.redeliveryBackoff(10,TimeUnit.SECONDS)//设置重试间隔
.maxRedeliverCount(5)//设置最大重试次数
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息处理逻辑
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//处理失败,触发重试
}
}
}
}3.3.2步骤2:手动触发重试如果需要在消息处理失败时手动触发重试,可以在消息处理函数中抛出异常或使用negativeAcknowledge方法。以下是一个示例:importorg.apache.pulsar.client.api.Message;
importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerManualRetry{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息处理逻辑
System.out.println("Receivedmessage:"+msg.getValue());
if(msg.getValue().equals("error")){
thrownewException("Messageprocessingerror");
}
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.negativeAcknowledge(msg);//处理失败,触发重试
}
}
}
}3.3.3步骤3:配置死信队列对于那些即使经过多次重试也无法成功处理的消息,可以配置死信队列。以下是一个配置DLQ的例子:importorg.apache.pulsar.client.api.Consumer;
importorg.apache.pulsar.client.api.PulsarClient;
importorg.apache.pulsar.client.api.PulsarClientException;
publicclassPulsarConsumerWithDLQ{
publicstaticvoidmain(String[]args)throwsPulsarClientException{
PulsarClientclient=PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.build();
Consumer<String>consumer=client.newConsumer()
.topic("persistent://public/default/my-topic")
.subscriptionName("my-subscription")
.deadLetterTopic("persistent://public/default/my-dlq-topic")//设置DLQ主题
.subscribe();
while(true){
Message<String>msg=consumer.receive();
try{
//消息处理逻辑
System.out.println("Receivedmessage:"+msg.getValue());
consumer.acknowledge(msg);
}catch(Exceptione){
consumer.redeliverLater(msg,10,TimeUnit.SECONDS);//处理失败,延迟重试
}
}
}
}在上述示例中,如果消息处理失败,它将被重新发送到DLQ主题,而不是无限次重试。通过以上步骤,可以有效地在ApachePulsar中实现消息重试机制,从而提高系统的可靠性和容错性。4高级订阅与重试4.1订阅模式的高级用法在ApachePulsar中,订阅模式是消息消费的核心机制,它决定了消息如何被多个消费者处理。Pulsar支持两种主要的订阅模式:Exclusive和Shared,以及一种特殊的模式Failover。除此之外,Pulsar还提供了Key_Shared订阅模式,用于更细粒度的负载均衡和消息处理。4.1.1Exclusive订阅模式在Exclusive模式下,一个订阅只能被一个消费者消费。如果多个消费者订阅了同一个主题,只有第一个连接的消费者能够接收消息。当该消费者断开连接时,其他消费者才能开始接收消息。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个Exclusive订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息
consumer.acknowledge(msg);4.1.2Shared订阅模式Shared模式允许多个消费者同时消费一个订阅。消息会被均匀地分发给所有消费者,每个消息只会被一个消费者处理。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个Shared订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息
consumer.acknowledge(msg);4.1.3Failover订阅模式Failover模式类似于Exclusive模式,但当一个消费者断开连接时,其未处理的消息会被重新分配给下一个消费者。4.1.4Key_Shared订阅模式Key_Shared模式允许消息根据消息键在多个消费者之间共享。这确保了具有相同键的消息总是由同一个消费者处理,从而实现一致性。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个Key_Shared订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Key_Shared)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息
consumer.acknowledge(msg);4.2优化消息重试在消息队列中,消息重试机制是处理失败消息的关键。Pulsar提供了多种方式来优化消息重试,包括消息重发、消息保留策略和死信队列。4.2.1消息重发当消费者无法处理消息时,可以通过negativeAcknowledge方法将消息返回到队列中,以便稍后重试。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息处理失败,重试消息
if(/*处理失败的条件*/){
consumer.negativeAcknowledge(msg);
}else{
//确认消息
consumer.acknowledge(msg);
}4.2.2消息保留策略Pulsar允许设置消息保留策略,以控制消息在队列中的存储时间。这可以通过设置retentionTimeInMinutes和retentionSizeInMB来实现。示例代码//创建一个Pulsar管理员对象
Adminadmin=Admin.builder().serviceHttpUrl("http://localhost:8080").build();
//设置消息保留策略
admin.topics().setRetention("persistent://sample/standalone/ns/my-topic",
newRetentionPolicies(1,TimeUnit.DAYS),1024);4.2.3死信队列当消息在一定次数的重试后仍然无法被处理时,可以将这些消息发送到死信队列,以便进行进一步的分析或处理。4.3监控与调整重试策略Pulsar提供了丰富的监控指标,可以用来跟踪消息队列的健康状况和性能。通过监控,可以调整重试策略,以优化消息处理流程。4.3.1使用PulsarManager监控PulsarManager是一个图形界面工具,可以用来监控和管理Pulsar集群。它提供了消息队列的实时监控数据,包括消息的发送和接收速率、消息积压和消费者状态。4.3.2调整重试策略根据监控数据,可以调整消息重试的次数和间隔,以适应不同的业务需求。例如,可以增加重试次数,以提高消息处理的可靠性;或者增加重试间隔,以减轻消费者在高负载下的压力。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个消费者,设置重试策略
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.consumerName("my-consumer")
.maxUnackedMessages(1000)
.ackTimeout(10,TimeUnit.SECONDS)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//如果消息处理失败,重试消息
if(/*处理失败的条件*/){
consumer.negativeAcknowledge(msg);
}else{
//确认消息
consumer.acknowledge(msg);
}在这个例子中,maxUnackedMessages和ackTimeout是用来控制消息重试策略的参数。maxUnackedMessages定义了消费者可以同时处理的消息数量,而ackTimeout定义了消费者处理消息的超时时间。如果消费者在ackTimeout时间内没有确认消息,Pulsar会自动将消息重发给其他消费者。通过这些高级订阅模式和重试策略的使用,可以构建出更健壮、更灵活的消息处理系统。在实际应用中,应根据业务需求和系统性能,合理选择和调整订阅模式和重试策略,以达到最佳的消息处理效果。5实践案例5.1使用Pulsar处理高并发场景在处理高并发场景时,Pulsar消息队列因其高性能和可扩展性成为许多企业的首选。Pulsar支持多种订阅模式,包括独占(Exclusive)、共享(Shared)、键共享(Key_Shared)和失败重试(Failover),这些模式可以灵活地满足不同场景下的需求。5.1.1独占订阅模式独占订阅模式下,一个主题只能被一个消费者订阅。如果多个消费者尝试订阅同一主题,只有第一个订阅者能够成功接收消息,其余的将被拒绝。这种模式适用于需要确保消息只被一个消费者处理的场景。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个独占订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息已处理
consumer.acknowledge(msg);
//关闭消费者和客户端
consumer.close();
client.close();5.1.2共享订阅模式共享订阅模式下,多个消费者可以订阅同一主题,消息会被均匀地分发给所有订阅者。这种模式适用于需要水平扩展消费者以处理更多消息的场景。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个共享订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息已处理
consumer.acknowledge(msg);
//关闭消费者和客户端
consumer.close();
client.close();5.1.3键共享订阅模式键共享订阅模式下,消息根据消息键(如果存在)被分发到特定的消费者。这种模式适用于需要根据消息内容进行路由的场景,例如,根据用户ID将消息路由到处理该用户请求的消费者。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个键共享订阅的消费者
Consumer<String>consumer=client.newConsumer(Schema.STRING)
.topic("persistent://sample/standalone/ns/my-topic")
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.KeyShared)
.subscribe();
//消费消息
Message<String>msg=consumer.receive();
System.out.println("Receivedmessage:"+msg.getValue());
//确认消息已处理
consumer.acknowledge(msg);
//关闭消费者和客户端
consumer.close();
client.close();5.1.4失败重试订阅模式失败重试订阅模式下,如果一个消费者无法处理消息,消息会被重新分发给其他订阅者。这种模式适用于需要高可用性和容错性的场景。示例代码//创建一个Pulsar客户端
PulsarClientclient=PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//创建一个失败重试订阅的消费者
Consumer<String>consumer=client.new
温馨提示
- 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
- 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
- 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
- 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
- 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
- 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
- 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。
最新文档
- 贵州城市职业学院《房地产策划与运营》2023-2024学年第一学期期末试卷
- 淫羊藿培育项目可行性研究报告-淫羊藿市场需求持续增大
- 贵阳人文科技学院《聚合物改性原理及方法》2023-2024学年第一学期期末试卷
- 广州中医药大学《英语教师核心素养解读》2023-2024学年第一学期期末试卷
- 2025山东省安全员-B证考试题库附答案
- 2025年云南省安全员《A证》考试题库及答案
- 广州应用科技学院《建筑给排水与消防》2023-2024学年第一学期期末试卷
- 广州现代信息工程职业技术学院《增材制造技术》2023-2024学年第一学期期末试卷
- 2025黑龙江省建筑安全员C证(专职安全员)考试题库
- 2025年河南省建筑安全员-C证(专职安全员)考试题库
- 陕西省西安市英语中考试卷与参考答案(2025年)
- 中山市2023-2024八年级上学期期末考试数学试卷
- 2024年广州市南沙区初中语文毕业班模拟考试卷(附答案解析)
- 物业服务考核办法及评分细则(表格模板)
- 2024年春九年级化学下册 第九单元 溶液教案 (新版)新人教版
- 临高后水湾开放式海洋养殖项目可行性研究报告
- DL-T 1071-2023 电力大件运输规范
- GB/T 44143-2024科技人才评价规范
- 流感防治技术方案
- 对医院领导的批评意见怎么写更合适范文(6篇)
- 2023年IEC17025检测和校准实验室管理手册
评论
0/150
提交评论