消息队列:RabbitMQ:消息队列基础概念_第1页
消息队列:RabbitMQ:消息队列基础概念_第2页
消息队列:RabbitMQ:消息队列基础概念_第3页
消息队列:RabbitMQ:消息队列基础概念_第4页
消息队列:RabbitMQ:消息队列基础概念_第5页
已阅读5页,还剩10页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

消息队列:RabbitMQ:消息队列基础概念1消息队列基础1.1消息队列简介1.1.1消息队列的历史消息队列的概念起源于20世纪70年代的IBM,最初是为了解决大型机之间的通信问题。随着计算机技术的发展,消息队列逐渐被应用于分布式系统中,成为解决异步通信、服务解耦、流量削峰等场景的重要工具。在互联网时代,消息队列因其高可用、高并发、高效率的特性,被广泛应用于各种复杂系统架构中,如电子商务、金融交易、大数据处理等。1.1.2消息队列的作用消息队列在现代软件架构中扮演着关键角色,主要作用包括:异步处理:允许发送者和接收者异步通信,提高系统响应速度。服务解耦:通过消息队列,服务之间可以独立开发、部署和扩展,降低系统间的耦合度。流量削峰:在高并发场景下,消息队列可以作为缓冲,避免后端系统因瞬时大量请求而崩溃。数据持久化:消息队列可以保证消息的持久化存储,即使接收者暂时不可用,消息也不会丢失。消息路由:支持消息的灵活路由,可以根据不同的规则将消息发送到不同的接收者。1.2RabbitMQ介绍1.2.1RabbitMQ的历史背景RabbitMQ最初由LShift公司于2007年开发,基于Erlang语言实现。它遵循AMQP(AdvancedMessageQueuingProtocol)标准,是一个开源的消息队列服务。2010年,RabbitMQ被PivotalSoftware收购,随后成为CloudFoundry平台的一部分,进一步推动了其在企业级应用中的普及。RabbitMQ因其稳定性和可扩展性,被全球众多大型企业采用,包括Cisco、eBay、Adobe等。1.2.2RabbitMQ的特点与优势RabbitMQ作为一款成熟的消息队列服务,具有以下显著特点和优势:高可用性:支持集群部署,可以实现消息的自动重路由和故障转移,确保服务的连续性和消息的可靠性。灵活的消息路由:支持多种消息路由模式,包括直接路由(Direct)、主题路由(Topic)、头部分发(Headers)等,满足不同场景的需求。消息持久化:可以将消息存储在磁盘上,即使服务器重启,消息也不会丢失。多语言支持:提供了多种语言的客户端库,包括Java、Python、C#等,方便不同开发团队的集成。易于管理和监控:提供了Web界面和API,可以方便地管理和监控队列、交换机、连接等状态。1.3示例:使用Python与RabbitMQ交互下面是一个使用Python与RabbitMQ交互的简单示例,包括发送和接收消息的基本流程。1.3.1发送消息importpika

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个队列

channel.queue_declare(queue='hello')

#发送消息

channel.basic_publish(exchange='',

routing_key='hello',

body='HelloRabbitMQ!')

print("[x]Sent'HelloRabbitMQ!'")

connection.close()1.3.2接收消息importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个队列

channel.queue_declare(queue='hello')

#设置队列的回调函数

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()在这个示例中,我们首先使用pika库建立与RabbitMQ服务器的连接。然后,我们声明一个名为hello的队列。在发送消息的示例中,我们使用basic_publish方法将消息发送到队列中。在接收消息的示例中,我们定义了一个回调函数callback,当队列中有消息时,这个函数会被调用,处理接收到的消息。通过basic_consume方法,我们设置队列的回调函数,并开始消费队列中的消息。通过这个示例,我们可以看到RabbitMQ的基本使用流程,包括连接、队列声明、消息发送和接收。RabbitMQ的强大功能和灵活性使其成为处理复杂消息传递场景的理想选择。2RabbitMQ核心概念2.1交换机2.1.1交换机的作用交换机在RabbitMQ中扮演着消息分发的角色,它接收来自生产者的消息,然后根据配置的规则将消息发送到一个或多个队列中。交换机的存在使得RabbitMQ能够实现更复杂的消息路由逻辑,而不仅仅是简单的点对点通信。2.1.2交换机的类型RabbitMQ支持多种类型的交换机,每种类型都有其特定的路由规则:DirectExchange:直接交换机,消息根据绑定的路由键(routingkey)直接发送到队列。FanoutExchange:扇出交换机,将所有接收到的消息广播到所有绑定的队列,类似于发布/订阅模式。TopicExchange:主题交换机,支持模式匹配的路由键,可以实现更灵活的消息路由。HeadersExchange:头信息交换机,不使用路由键,而是通过消息头信息进行路由。示例:使用DirectExchangeimportpika

#连接到RabbitMQ服务器

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个直接类型的交换机

channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

#声明队列并绑定到交换机

channel.queue_declare(queue='error')

channel.queue_bind(exchange='direct_logs',queue='error',routing_key='error')

#发送消息

channel.basic_publish(exchange='direct_logs',routing_key='error',body='Criticalerroroccurred')

#关闭连接

connection.close()在这个例子中,我们创建了一个名为direct_logs的直接类型交换机,并声明了一个名为error的队列,然后将队列绑定到交换机上,使用路由键error。当消息被发送到direct_logs交换机时,只有绑定的error队列会接收到消息。2.2队列2.2.1队列的创建与管理队列是消息的容器,生产者将消息发送到队列,消费者从队列中读取消息。队列可以被创建、绑定到交换机、以及被删除。创建队列importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='hello')

#发送消息到队列

channel.basic_publish(exchange='',routing_key='hello',body='HelloWorld!')

#关闭连接

connection.close()在这个例子中,我们创建了一个名为hello的队列,并发送了一条消息HelloWorld!到这个队列。2.2.2队列的持久化策略RabbitMQ提供了队列的持久化策略,以确保消息在服务器重启后不会丢失。持久化可以通过声明队列时设置durable=True来实现。示例:持久化队列importpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个持久化队列

channel.queue_declare(queue='hello',durable=True)

#发送持久化消息

channel.basic_publish(exchange='',routing_key='hello',body='HelloWorld!',properties=pika.BasicProperties(delivery_mode=2))

#关闭连接

connection.close()在这个例子中,我们声明了一个持久化的队列hello,并通过设置delivery_mode=2来发送持久化消息。2.3消息路由2.3.1消息的发布与订阅模式发布与订阅模式允许消息被广播到所有订阅的队列,这在需要将消息发送给多个消费者时非常有用。示例:FanoutExchangeimportpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个扇出类型的交换机

channel.exchange_declare(exchange='logs',exchange_type='fanout')

#声明队列并绑定到交换机

channel.queue_declare(queue='log1')

channel.queue_bind(exchange='logs',queue='log1')

channel.queue_declare(queue='log2')

channel.queue_bind(exchange='logs',queue='log2')

#发送消息

channel.basic_publish(exchange='logs',routing_key='',body='Logmessage')

#关闭连接

connection.close()在这个例子中,我们创建了一个名为logs的扇出类型交换机,并声明了两个队列log1和log2,然后将这两个队列都绑定到交换机上。当消息被发送到logs交换机时,所有绑定的队列都会接收到这条消息。2.3.2消息的路由模式路由模式允许消息根据特定的路由键被发送到特定的队列,这在需要根据消息内容进行精确路由时非常有用。示例:使用TopicExchangeimportpika

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明一个主题类型的交换机

channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

#声明队列并绑定到交换机

channel.queue_declare(queue='kern.error')

channel.queue_bind(exchange='topic_logs',queue='kern.error',routing_key='kern.*')

channel.queue_declare(queue='work')

channel.queue_bind(exchange='topic_logs',queue='work',routing_key='*.network')

#发送消息

channel.basic_publish(exchange='topic_logs',routing_key='kern.critical',body='Kernelcriticalerror')

#关闭连接

connection.close()在这个例子中,我们创建了一个名为topic_logs的主题类型交换机,并声明了两个队列kern.error和work,然后将队列绑定到交换机上,使用模式匹配的路由键。当消息被发送到topic_logs交换机时,只有匹配到的队列会接收到消息。例如,路由键kern.critical会匹配到kern.error队列,而work队列则不会接收到这条消息。3RabbitMQ工作模式3.1简单模式3.1.1发送者与接收者在RabbitMQ的简单模式中,发送者直接将消息发送到队列,接收者从队列中获取消息。这种模式是最基础的,适用于一对一的通信场景。代码示例发送者代码示例:importpika

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='hello')

#发送消息

channel.basic_publish(exchange='',

routing_key='hello',

body='HelloWorld!')

print("[x]Sent'HelloWorld!'")

#关闭连接

connection.close()接收者代码示例:importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='hello')

#开始消费消息

channel.basic_consume(queue='hello',

on_message_callback=callback,

auto_ack=True)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()3.1.2工作队列模式负载均衡的概念工作队列模式是RabbitMQ中用于实现负载均衡的一种方式。在这种模式下,多个接收者可以同时从队列中获取消息,RabbitMQ会将消息均匀地分发给不同的接收者,确保工作负载的均衡分配。工作队列的实现发送者代码示例:importpika

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='task_queue',durable=True)

#发送消息

message='Aheavytask'

channel.basic_publish(exchange='',

routing_key='task_queue',

body=message,

properties=pika.BasicProperties(

delivery_mode=2,#makemessagepersistent

))

print("[x]Sent%r"%message)

#关闭连接

connection.close()接收者代码示例:importpika

importtime

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

time.sleep(body.count(b'.'))

print("[x]Done")

ch.basic_ack(delivery_tag=method.delivery_tag)

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='task_queue',durable=True)

#开始消费消息

channel.basic_qos(prefetch_count=1)

channel.basic_consume(queue='task_queue',

on_message_callback=callback)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()3.1.3发布确认模式发布确认的原理发布确认模式是RabbitMQ中用于确保消息可靠传输的一种机制。当发送者将消息发送到RabbitMQ时,它会等待RabbitMQ的确认。如果RabbitMQ确认消息已接收,发送者将继续发送下一条消息;如果未收到确认,发送者将重新发送消息。发布确认的代码实现发送者代码示例:importpika

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#开启发布确认

channel.confirm_delivery()

#发送消息

message='HelloWorld!'

channel.basic_publish(exchange='',

routing_key='confirm_queue',

body=message)

print("[x]Sent%r"%message)

#检查确认状态

whileTrue:

ifchannel._delivery_confirmation:

print("[x]Confirmreceived")

break

else:

print("[x]Waitingforconfirm")

time.sleep(1)

#关闭连接

connection.close()接收者代码示例:importpika

defcallback(ch,method,properties,body):

print("[x]Received%r"%body)

ch.basic_ack(delivery_tag=method.delivery_tag)

#建立与RabbitMQ的连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='confirm_queue')

#开始消费消息

channel.basic_consume(queue='confirm_queue',

on_message_callback=callback)

print('[*]Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()以上代码示例展示了如何在RabbitMQ中实现简单模式、工作队列模式和发布确认模式。通过这些示例,你可以理解RabbitMQ的基本工作原理和如何在实际应用中使用这些模式。4RabbitMQ高级特性4.1事务与确认4.1.1事务的使用在RabbitMQ中,事务提供了一种确保消息处理可靠性的机制。通过开启事务,可以确保消息在被确认前的任何操作(如发布、取消发布等)要么全部成功,要么全部失败,从而保证数据的一致性。示例代码importpika

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#开启事务

channel.confirm_delivery()

#发布消息

try:

channel.basic_publish(exchange='',routing_key='queue_name',body='HelloWorld!')

#提交事务

channel.tx_commit()

exceptExceptionase:

#回滚事务

channel.tx_rollback()

print(f"Erroroccurred:{e}")

#关闭连接

connection.close()解释在上述代码中,我们首先通过pika库建立了一个到RabbitMQ服务器的连接。然后,我们调用confirm_delivery方法开启事务模式,这意味着所有在事务模式下的操作都会被跟踪,直到事务被提交或回滚。在事务模式下,我们尝试发布一条消息到队列queue_name。如果消息发布成功,我们调用tx_commit方法提交事务;如果在发布过程中发生任何异常,我们调用tx_rollback方法回滚事务,确保不会有任何消息被错误地处理。4.1.2确认机制的深入理解确认机制是RabbitMQ中用于确保消息可靠传递的关键特性。当消息被发布到队列后,RabbitMQ会等待消费者确认消息的接收和处理。如果消费者在接收消息后没有发送确认,RabbitMQ会将消息重新发布,以防止数据丢失。示例代码importpika

defcallback(ch,method,properties,body):

print("Receivedmessage:",body.decode())

#模拟消息处理

#...

#确认消息

ch.basic_ack(delivery_tag=method.delivery_tag)

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明队列

channel.queue_declare(queue='queue_name')

#设置确认模式

channel.basic_qos(prefetch_count=1)

#开始消费

channel.basic_consume(queue='queue_name',on_message_callback=callback)

print('Waitingformessages.ToexitpressCTRL+C')

channel.start_consuming()解释在本例中,我们定义了一个callback函数,该函数在接收到消息时被调用。在callback函数中,我们首先打印接收到的消息,然后模拟消息的处理过程。处理完成后,我们通过调用basic_ack方法确认消息的接收和处理。basic_qos方法被用来设置确认模式,其中prefetch_count=1表示RabbitMQ在等待消费者确认前,只会发送一条消息给消费者,这有助于防止消费者在处理消息时因异常而丢失消息。4.2死信队列4.2.1死信队列的原理死信队列(DeadLetterQueue,DLQ)是RabbitMQ中用于处理无法被正常消费的消息的队列。当消息在原队列中达到最大重试次数、过期、或被显式标记为无法处理时,这些消息会被转移到DLQ中,以便进行进一步的处理或分析。4.2.2死信队列的配置要配置DLQ,需要在原队列的声明中设置arguments参数,其中包含x-dead-letter-exchange和x-dead-letter-routing-key,分别指定死信交换机和死信队列的路由键。示例代码importpika

#建立连接

connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))

channel=connection.channel()

#声明死信交换机

channel.exchange_declare(exchange='dlx_exchange',exchange_type='direct')

#声明死信队列

channel.queue_declare(queue='dlq_queue')

#将死信队列绑定到死信交换机

channel.queue_bind(exchange='dlx_exchange',queue='dlq_queue',routing_key='dlq_routing_key')

#声明原队列,并设置死信队列参数

channel.queue_declare(queue='original_queue',arguments={

'x-dead-letter-exchange':'dlx_exchange',

'x-dead-letter-routing-key':'dlq_routing_key'

})

#发布消息

channel.basic_publish(exchange='',routing_key='original_queue',body='HelloWorld!')

#关闭连接

conne

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论