消息队列:RabbitMQ:RabbitMQ工作模式详解_第1页
消息队列:RabbitMQ:RabbitMQ工作模式详解_第2页
消息队列:RabbitMQ:RabbitMQ工作模式详解_第3页
消息队列:RabbitMQ:RabbitMQ工作模式详解_第4页
消息队列:RabbitMQ:RabbitMQ工作模式详解_第5页
已阅读5页,还剩21页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

消息队列:RabbitMQ:RabbitMQ工作模式详解1消息队列基础概念1.1消息队列简介消息队列是一种应用程序间通信(IPC)的模式,它允许消息在发送者和接收者之间异步传递。这种模式的核心是队列,消息被发送到队列中,接收者从队列中取出并处理消息。消息队列的主要优点包括解耦、异步处理和流量削峰。1.1.1解耦发送者和接收者之间不需要直接通信,这使得系统更加灵活,易于扩展和维护。1.1.2异步处理接收者可以在方便的时候处理消息,这有助于提高系统的响应速度和吞吐量。1.1.3流量削峰在高流量时,消息队列可以暂时存储消息,避免系统过载。1.2RabbitMQ简介RabbitMQ是一个开源的消息代理和队列服务器,实现AMQP(AdvancedMessageQueuingProtocol)协议。它提供了一种在分布式系统中存储和转发消息的可靠方式,支持多种消息队列模式,包括简单模式、工作队列、发布/订阅、路由、主题和RPC。1.2.1特性可靠性:保证消息的持久性和可靠性。灵活性:支持多种消息队列模式。可扩展性:易于在集群中扩展。安全性:支持用户认证和权限管理。1.3消息队列的作用与优势1.3.1作用异步通信:允许系统组件异步通信,提高系统响应速度。负载均衡:在多个接收者之间分发消息,实现负载均衡。错误处理:提供重试机制,确保消息被正确处理。1.3.2优势提高系统稳定性:通过异步处理和流量削峰,减少系统过载的风险。简化系统设计:通过解耦,简化了系统组件之间的直接通信,使得系统设计更加清晰。增强可扩展性:系统可以更容易地添加或删除组件,而不会影响其他部分。2RabbitMQ工作模式详解2.1简单模式简单模式是最基础的模式,一个生产者发送消息到队列,一个消费者从队列中接收消息。2.1.1示例代码importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='hello')

#发送消息

channel.basic_publish(exchange='',

routing_key='hello',

body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

connection.close()importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='hello')

#开始接收消息

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()2.1.2解释在简单模式下,生产者和消费者都直接与队列交互。生产者使用basic_publish方法将消息发送到队列,消费者使用basic_consume方法监听队列并处理消息。2.2工作队列工作队列模式是简单模式的扩展,允许多个消费者从队列中接收消息,实现任务的分发和并行处理。2.2.1示例代码importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='task_queue',durable=True)

#发送消息

message='Anewtask'

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2,#makemessagepersistent

))

print("[x]Sent%r"%message)

connection.close()importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#模拟任务处理

time.sleep(body.count(b'.'))

print("[x]Done")

ch.basic_ack(delivery_tag=method.delivery_tag)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='task_queue',durable=True)

#开始接收消息

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue',

on_message_callback=callback)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()2.2.2解释在工作队列模式下,消息被持久化存储,确保即使在消费者断开连接时消息也不会丢失。basic_qos方法用于限制消费者一次只能处理一个消息,确保消息被正确处理后才从队列中移除。2.3发布/订阅发布/订阅模式允许多个消费者订阅同一个交换机,当生产者发送消息到交换机时,所有订阅的消费者都会收到消息。2.3.1示例代码importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#发送消息

message='Info:HelloWorld!'

channel.basic_publish(exchange='logs',

routing_key='',

body=message)

print("[x]Sent%r"%message)

connection.close()importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#声明队列并绑定到交换机

result=channel.queue_declare(queue='',exclusive=True)

queue_name=result.method.queue

channel.queue_bind(exchange='logs',queue=queue_name)

#开始接收消息

channel.basic_consume(queue=queue_name,

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()2.3.2解释在发布/订阅模式下,生产者发送消息到fanout类型的交换机,所有绑定到该交换机的队列都会收到消息。消费者声明一个队列并绑定到交换机,然后开始监听队列接收消息。2.4路由模式路由模式允许生产者将消息发送到具有特定路由键的队列,消费者可以订阅具有相同路由键的队列。2.4.1示例代码importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#发送消息

severity='info'

message='Info:HelloWorld!'

channel.basic_publish(exchange='direct_logs',

routing_key=severity,

body=message)

print("[x]Sent%r:%r"%(severity,message))

connection.close()importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#声明队列并绑定到交换机

queue=channel.queue_declare(queue='info_queue')

channel.queue_bind(exchange='direct_logs',queue='info_queue',routing_key='info')

#开始接收消息

channel.basic_consume(queue='info_queue',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()2.4.2解释在路由模式下,生产者发送消息时需要指定一个路由键,消费者在声明队列时也需指定相同的路由键来绑定队列到交换机。这样,只有具有匹配路由键的消息才会被发送到相应的队列。2.5主题模式主题模式是发布/订阅模式的扩展,允许消费者订阅具有通配符的路由键,从而实现更灵活的消息过滤。2.5.1示例代码importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

#发送消息

routing_key='kern.critical'

message='Acriticalkernelerror'

channel.basic_publish(exchange='topic_logs',

routing_key=routing_key,

body=message)

print("[x]Sent%r:%r"%(routing_key,message))

connection.close()importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明交换机

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

#声明队列并绑定到交换机

queue=channel.queue_declare(queue='error_queue')

channel.queue_bind(exchange='topic_logs',queue='error_queue',routing_key='*.critical')

#开始接收消息

channel.basic_consume(queue='error_queue',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()2.5.2解释在主题模式下,路由键可以包含通配符*和#,*匹配一个单词,#匹配零个或多个单词。消费者可以订阅具有通配符的路由键,从而接收符合特定模式的消息。2.6RPC模式RPC(远程过程调用)模式允许一个生产者发送请求消息到队列,等待一个消费者处理请求并返回结果。2.6.1示例代码importpika

importuuid

classFibonacciRpcClient(object):

def__init__(self):

self.connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

self.channel=self.connection.channel()

result=self.channel.queue_declare(queue='',exclusive=True)

self.callback_queue=result.method.queue

self.channel.basic_consume(queue=self.callback_queue,

on_message_callback=self.on_response,

auto_ack=True)

defon_response(self,ch,method,props,body):

ifself.corr_id==props.correlation_id:

self.response=body

defcall(self,n):

self.response=None

self.corr_id=str(uuid.uuid4())

self.channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to=self.callback_queue,

correlation_id=self.corr_id,

),

body=str(n))

whileself.responseisNone:

cess_data_events()

returnint(self.response)

fibonacci_rpc=FibonacciRpcClient()

print("[x]Requestingfib(30)")

response=fibonacci_rpc.call(30)

print("[.]Got%r"%response)importpika

deffib(n):

ifn==0:

return0

elifn==1:

return1

else:

returnfib(n-1)+fib(n-2)

defon_request(ch,method,props,body):

n=int(body)

print("[.]fib(%s)"%n)

response=fib(n)

ch.basic_publish(exchange='',

routing_key=props.reply_to,

properties=pika.BasicProperties(correlation_id=\

props.correlation_id),

body=str(response))

ch.basic_ack(delivery_tag=method.delivery_tag)

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='rpc_queue')

#开始接收请求

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='rpc_queue',

on_message_callback=on_request)

print("[x]AwaitingRPCrequests")

channel.start_consuming()2.6.2解释在RPC模式下,生产者发送请求消息到队列,并在消息中包含一个回调队列和一个相关ID。消费者处理请求后,将结果发送回生产者的回调队列。生产者通过相关ID来匹配请求和响应,从而获取结果。通过以上介绍,我们了解了RabbitMQ的几种工作模式,包括简单模式、工作队列、发布/订阅、路由、主题和RPC模式。每种模式都有其特定的使用场景,选择合适的工作模式可以更好地满足不同场景下的需求。3消息队列:RabbitMQ:RabbitMQ基本使用3.1安装与配置RabbitMQ3.1.1安装RabbitMQ在开始之前,确保你的系统上已经安装了Erlang,因为RabbitMQ是基于Erlang开发的。以下是在Ubuntu系统上安装RabbitMQ的步骤:#更新系统包

sudoaptupdate

#安装Erlang

sudoaptinstallesl-erlang

#安装RabbitMQ

sudoaptinstallrabbitmq-server安装完成后,启动RabbitMQ服务:sudosystemctlstartrabbitmq-server3.1.2配置RabbitMQRabbitMQ的配置文件位于/etc/rabbitmq/rabbitmq.config。你可以通过编辑这个文件来配置RabbitMQ。例如,要启用所有插件,可以添加以下内容:[

{rabbit,[

{eval,"application:load(rabbitmq_management)."},

{eval,"rabbitmq_management:app_start()."}

]}

]但是,更简单的方法是通过命令行来启用插件:sudorabbitmq-pluginsenablerabbitmq_management这将启用RabbitMQ的管理界面,你可以通过访问http://localhost:15672来查看和管理RabbitMQ。3.2RabbitMQ基本操作3.2.1创建队列在RabbitMQ中,消息被发送到队列。队列是持久的,即使RabbitMQ服务重启,队列中的消息也不会丢失。以下是一个使用Python的pika库创建队列的例子:importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

channel.queue_declare(queue='hello')3.2.2发送消息消息被发送到队列,然后由消费者从队列中取出。以下是一个发送消息的例子:importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

channel.queue_declare(queue='hello')

channel.basic_publish(exchange='',

routing_key='hello',

body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

connection.close()3.2.3接收消息消费者从队列中取出消息。以下是一个接收消息的例子:importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

channel.queue_declare(queue='hello')

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()3.3RabbitMQ管理界面使用RabbitMQ的管理界面是一个非常强大的工具,可以用来监控和管理RabbitMQ。你可以通过访问http://localhost:15672来打开管理界面,然后使用默认的用户名和密码(都是guest)登录。在管理界面中,你可以查看队列、交换机、连接、频道等信息,也可以创建、删除队列和交换机,还可以发送和接收消息。例如,要创建一个队列,你可以点击Queues,然后点击Declare。在弹出的对话框中,输入队列的名称,然后点击OK。要发送一个消息,你可以点击Exchanges,然后点击Direct。在弹出的对话框中,输入交换机的名称,然后点击OK。然后,你可以点击Publish,在弹出的对话框中,输入消息的路由键和消息体,然后点击Publish。要接收一个消息,你可以点击Queues,然后点击你创建的队列。在队列的页面中,你可以看到队列中的消息数量。然后,你可以点击Get,在弹出的对话框中,输入你想要获取的消息数量,然后点击Get。以上就是RabbitMQ的基本使用,包括安装与配置、基本操作和管理界面的使用。希望这个教程能帮助你更好地理解和使用RabbitMQ。4消息队列:RabbitMQ工作模式详解4.1发布/订阅模式发布/订阅模式(Publish/Subscribe)是消息队列中最常见的模式之一。在这种模式下,生产者发送的消息不会直接发送给消费者,而是发送到一个交换器(Exchange)。交换器将消息广播到所有绑定的队列上,然后这些队列上的消费者就可以接收到消息。4.1.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个fanout类型的交换器

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#生成一个随机的队列名称

result=channel.queue_declare(queue='',exclusive=True)

queue_name=result.method.queue

#将队列绑定到交换器

channel.queue_bind(exchange='logs',queue=queue_name)

#定义一个回调函数来处理接收到的消息

defcallback(ch,method,properties,body):

print("[x]%r"%body)

#开始消费消息

channel.basic_consume(queue=queue_name,on_message_callback=callback,auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()在这个例子中,我们创建了一个fanout类型的交换器logs,然后创建了一个随机的队列,并将这个队列绑定到logs交换器上。当生产者向logs交换器发送消息时,这个消息会被广播到所有绑定的队列上,然后队列上的消费者就可以接收到消息。4.2工作队列模式工作队列模式(WorkQueues)是最简单的一种模式,它允许你将任务分发到多个工作节点上,这些工作节点会轮流处理队列中的任务。4.2.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个队列

channel.queue_declare(queue='task_queue',durable=True)

#发送一个消息到队列

message="HelloWorld!"

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2,#makemessagepersistent

))

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个持久化的队列task_queue,然后向这个队列发送了一个消息HelloWorld!。当消费者从task_queue队列中获取消息时,RabbitMQ会将消息分发到不同的消费者,以实现负载均衡。4.3路由模式路由模式(Routing)允许你根据消息的路由键(RoutingKey)将消息发送到特定的队列上。4.3.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个direct类型的交换器

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#声明两个队列

channel.queue_declare(queue='error')

channel.queue_declare(queue='info')

#将队列绑定到交换器,指定路由键

channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')

channel.queue_bind(exchange='direct_logs',queue='info',routing_key='info')

#发送一个消息到交换器,指定路由键

message="Info:HelloWorld!"

channel.basic_publish(exchange='direct_logs',

routing_key='info',

body=message)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个direct类型的交换器direct_logs,然后创建了两个队列error和info,并将这两个队列分别绑定到direct_logs交换器上,指定路由键为error和info。当生产者向direct_logs交换器发送消息时,需要指定路由键,交换器会根据路由键将消息发送到特定的队列上。4.4主题模式主题模式(Topics)允许你根据消息的主题将消息发送到特定的队列上。主题模式是路由模式的扩展,它允许你使用通配符来匹配路由键。4.4.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个topic类型的交换器

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

#声明两个队列

channel.queue_declare(queue='kern.error')

channel.queue_declare(queue='')

#将队列绑定到交换器,指定路由键

channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')

channel.queue_bind(exchange='topic_logs',queue='',routing_key='')

#发送一个消息到交换器,指定路由键

message="Info:HelloWorld!"

channel.basic_publish(exchange='topic_logs',

routing_key='',

body=message)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个topic类型的交换器topic_logs,然后创建了两个队列kern.error和,并将这两个队列分别绑定到topic_logs交换器上,指定路由键为kern.*和。当生产者向topic_logs交换器发送消息时,需要指定路由键,交换器会根据路由键将消息发送到特定的队列上。4.5头模式头模式(Headers)允许你根据消息的头部信息将消息发送到特定的队列上。这种模式在实际应用中比较少见,因为它需要在消息中包含额外的头部信息,这会增加消息的大小。4.5.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个headers类型的交换器

channel.exchange_declare(exchange='headers_logs',exchange_type='headers')

#声明两个队列

channel.queue_declare(queue='error')

channel.queue_declare(queue='info')

#将队列绑定到交换器,指定头部信息

channel.queue_bind(exchange='headers_logs',queue='error',arguments={'x-match':'any','type':'error'})

channel.queue_bind(exchange='headers_logs',queue='info',arguments={'x-match':'any','type':'info'})

#发送一个消息到交换器,指定头部信息

message="Info:HelloWorld!"

properties=pika.BasicProperties(headers={'type':'info'})

channel.basic_publish(exchange='headers_logs',

routing_key='',

body=message,

properties=properties)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个headers类型的交换器headers_logs,然后创建了两个队列error和info,并将这两个队列分别绑定到headers_logs交换器上,指定头部信息为type:error和type:info。当生产者向headers_logs交换器发送消息时,需要在消息中包含头部信息,交换器会根据头部信息将消息发送到特定的队列上。4.6RPC模式RPC模式(RemoteProcedureCall)允许你将RabbitMQ用作远程过程调用的中间件。在这种模式下,生产者发送一个请求消息到队列,然后等待队列上的消费者处理请求并返回结果。4.6.1代码示例importpika

importuuid

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个队列

result=channel.queue_declare(queue='',exclusive=True)

callback_queue=result.method.queue

#发送一个请求消息到队列

response=None

corr_id=str(uuid.uuid4())

defon_response(ch,method,props,body):

globalresponse

ifcorr_id==props.correlation_id:

response=body

channel.basic_consume(queue=callback_queue,on_message_callback=on_response,auto_ack=True)

message="HelloWorld!"

channel.basic_publish(exchange='',

routing_key='rpc_queue',

properties=pika.BasicProperties(

reply_to=callback_queue,

correlation_id=corr_id,

),

body=message)

#等待消费者返回结果

whileresponseisNone:

cess_data_events()

print("[.]Got%r"%response)

connection.close()在这个例子中,我们创建了一个队列rpc_queue,然后向这个队列发送了一个请求消息HelloWorld!,并指定了回调队列和相关ID。当消费者从rpc_queue队列中获取请求消息时,会处理请求并返回结果到回调队列上,然后生产者就可以从回调队列中获取结果。4.7Fanout模式详解Fanout模式是一种广播模式,它将消息发送到所有绑定到交换器的队列上。4.7.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个fanout类型的交换器

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#声明两个队列

channel.queue_declare(queue='queue1')

channel.queue_declare(queue='queue2')

#将队列绑定到交换器

channel.queue_bind(exchange='logs',queue='queue1')

channel.queue_bind(exchange='logs',queue='queue2')

#发送一个消息到交换器

message="HelloWorld!"

channel.basic_publish(exchange='logs',routing_key='',body=message)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个fanout类型的交换器logs,然后创建了两个队列queue1和queue2,并将这两个队列分别绑定到logs交换器上。当生产者向logs交换器发送消息时,这个消息会被广播到所有绑定的队列上,然后队列上的消费者就可以接收到消息。4.8Direct模式详解Direct模式允许你根据消息的路由键将消息发送到特定的队列上。4.8.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个direct类型的交换器

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#声明两个队列

channel.queue_declare(queue='error')

channel.queue_declare(queue='info')

#将队列绑定到交换器,指定路由键

channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')

channel.queue_bind(exchange='direct_logs',queue='info',routing_key='info')

#发送一个消息到交换器,指定路由键

message="Info:HelloWorld!"

channel.basic_publish(exchange='direct_logs',

routing_key='info',

body=message)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个direct类型的交换器direct_logs,然后创建了两个队列error和info,并将这两个队列分别绑定到direct_logs交换器上,指定路由键为error和info。当生产者向direct_logs交换器发送消息时,需要指定路由键,交换器会根据路由键将消息发送到特定的队列上。4.9Topic模式详解Topic模式允许你根据消息的主题将消息发送到特定的队列上。主题模式是Direct模式的扩展,它允许你使用通配符来匹配路由键。4.9.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个topic类型的交换器

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

#声明两个队列

channel.queue_declare(queue='kern.error')

channel.queue_declare(queue='')

#将队列绑定到交换器,指定路由键

channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')

channel.queue_bind(exchange='topic_logs',queue='',routing_key='')

#发送一个消息到交换器,指定路由键

message="Info:HelloWorld!"

channel.basic_publish(exchange='topic_logs',

routing_key='',

body=message)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个topic类型的交换器topic_logs,然后创建了两个队列kern.error和,并将这两个队列分别绑定到topic_logs交换器上,指定路由键为kern.*和。当生产者向topic_logs交换器发送消息时,需要指定路由键,交换器会根据路由键将消息发送到特定的队列上。4.10Headers模式详解Headers模式允许你根据消息的头部信息将消息发送到特定的队列上。这种模式在实际应用中比较少见,因为它需要在消息中包含额外的头部信息,这会增加消息的大小。4.10.1代码示例importpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个headers类型的交换器

channel.exchange_declare(exchange='headers_logs',exchange_type='headers')

#声明两个队列

channel.queue_declare(queue='error')

channel.queue_declare(queue='info')

#将队列绑定到交换器,指定头部信息

channel.queue_bind(exchange='headers_logs',queue='error',arguments={'x-match':'any','type':'error'})

channel.queue_bind(exchange='headers_logs',queue='info',arguments={'x-match':'any','type':'info'})

#发送一个消息到交换器,指定头部信息

message="Info:HelloWorld!"

properties=pika.BasicProperties(headers={'type':'info'})

channel.basic_publish(exchange='headers_logs',

routing_key='',

body=message,

properties=properties)

print("[x]Sent%r"%message)

connection.close()在这个例子中,我们创建了一个headers类型的交换器headers_logs,然后创建了两个队列error和info,并将这两个队列分别绑定到headers_logs交换器上,指定头部信息为type:error和type:info。当生产者向headers_logs交换器发送消息时,需要在消息中包含头部信息,交换器会根据头部信息将消息发送到特定的队列上。5高级特性与最佳实践5.1消息持久化消息持久化是RabbitMQ中一个重要的特性,它确保即使在RabbitMQ服务重启或崩溃后,消息也不会丢失。要实现消息持久化,需要在发送消息时将delivery_mode属性设置为2,这将使消息存储在磁盘上。5.1.1示例代码importpika

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列,设置持久化

channel.queue_declare(queue='durable_queue',durable=True)

#发送消息,设置消息持久化

message="Hello,persistentworld!"

channel.basic_publish(exchange='',

routing_key='durable_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2,#makemessagepersistent

))

print("[x]Sent%r"%message)

connection.close()5.1.2解释在上述代码中,我们首先声明了一个持久化的队列durable_queue,然后在发送消息时,通过properties参数将delivery_mode设置为2,这使得消息在磁盘上持久化,从而在RabbitMQ重启后仍然可以找到这些消息。5.2消息确认机制消息确认机制是RabbitMQ中用于确保消息被正确处理的特性。当消费者接收到消息并完成处理后,它会向RabbitMQ发送一个确认信号。如果RabbitMQ在指定时间内没有收到确认信号,它会将消息重新发送给其他消费者。5.2.1示例代码importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#模拟长时间处理

time.sleep(5)

#处理完成后确认消息

ch.basic_ack(delivery_tag=method.delivery_tag)

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='ack_queue')

#开启消息确认

channel.basic_consume(queue='ack_queue',

on_message_callback=callback,

auto_ack=False)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()5.2.2解释在本例中,我们定义了一个callback函数来处理接收到的消息。处理完成后,我们通过basic_ack函数向RabbitMQ发送确认信号。如果消息处理失败或消费者断开连接,RabbitMQ会将消息重新发送给其他消费者。5.3消息延迟与死信队列消息延迟和死信队列是RabbitMQ中用于处理需要延迟发送或在一定条件下无法处理的消息的特性。通过设置消息的x-delay属性或使用x-dead-letter-exchange和x-dead-letter-routing-key属性,可以将消息发送到特定的死信队列中。5.3.1示例代码importpika

importtime

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列,设置死信交换机和死信路由键

channel.queue_declare(queue='delay_queue',arguments={'x-dead-letter-exchange':'dlx_exchange','x-dead-letter-routing-key':'dlx_routing_key'})

channel.queu

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论