activeMQ队列与主题订阅实例.doc_第1页
activeMQ队列与主题订阅实例.doc_第2页
activeMQ队列与主题订阅实例.doc_第3页
activeMQ队列与主题订阅实例.doc_第4页
activeMQ队列与主题订阅实例.doc_第5页
已阅读5页,还剩9页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

ActiveMQ队列与主题订阅实例概述Java Message Service(JMS)是SUN 提出的旨在统一各种MOM 系统接口的规范,它包含点对点(Point to Point,PTP)和发布/订阅(Publish/Subscribe,pub/sub)两种消息模型,提供可靠消息传输、事务和消息过滤等机制。JAVA 消息服务(JMS)定义了Java 中访问消息中间件的接口。JMS 只是接口,并没有给予实现,实现JMS 接口的消息中间件称为JMS Provider,例如ActiveMQ。术语JMS Provider:实现JMS 接口的消息中间件;PTP:Point to Point,即点对点的消息模型;Pub/Sub:Publish/Subscribe,即发布/订阅的消息模型;Queue:队列目标;Topic:主题目标;ConnectionFactory:连接工厂,JMS 用它创建连接;Connection:JMS 客户端到JMS Provider 的连接;Destination:消息的目的地;Session:会话,一个发送或接收消息的线程;MessageProducer:由Session 对象创建的用来发送消息的对象;MessageConsumer:由Session 对象创建的用来接收消息的对象;Acknowledge:签收;Transaction:事务。JMS 编程域JMS 支持两种截然不同的消息传送模型:PTP(即点对点模型)和Pub/Sub(即发布/订阅模型),分别称作:PTP Domain 和Pub/Sub Domain。PTP(使用Queue 即队列目标) 消息从一个生产者传送至一个消费者。在此传送模型中,目标是一个队列。消息首先被传送至队列目标,然后根据队列传送策略,从该队列将消息传送至向此队列进行注册的某一个消费者,一次只传送一条消息。可以向队列目标发送消息的生产者的数量没有限制,但每条消息只能发送至、并由一个消费者成功使用。如果没有已经向队列目标注册的消费者,队列将保留它收到的消息,并在某个消费者向该队列进行注册时将消息传送给该消费者。Pub/Sub(使用Topic 即主题目标) 消息从一个生产者传送至任意数量的消费者。在此传送模型中,目标是一个主题。消息首先被传送至主题目标,然后传送至所有已订阅此主题的活动消费者。可以向主题目标发送消息的生产者的数量没有限制,并且每个消息可以发送至任意数量的订阅消费者。主题目标也支持持久订阅的概念。持久订阅表示消费者已向主题目标进行注册,但在消息传送时此消费者可以处于非活动状态。当此消费者再次处于活动状态时,它将接收此信息。如果没有已经向主题目标注册的消费者,主题不保留其接收到的消息,除非有非活动消费者注册了持久订阅。实例1(queue方式)先上工程图Queue包中Producer.javapackage queue;import java.util.Date;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.DeliveryMode;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageProducer;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Producer /* * param args */public static void main(String args) /*method 1String user = ActiveMQConnection.DEFAULT_USER; String password = ActiveMQConnection.DEFAULT_PASSWORD; String url = ActiveMQConnection.DEFAULT_BROKER_URL; String subject = TOOL.DEFAULT;ConnectionFactory contectionFactory = new ActiveMQConnectionFactory(user,password,url);try Connection connection = contectionFactory.createConnection();connection.start();Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(subject);MessageProducer producer = session.createProducer(destination);for(int i = 0;i=20;i+)MapMessage message = session.createMapMessage();message.setLong(count, new Date().getTime();Thread.sleep(1000);producer.send(message);System.out.println(-发送消息:+new Date();mit();session.close();connection.close(); catch (JMSException e) e.printStackTrace(); catch (InterruptedException e) e.printStackTrace();*/send();/* * send() */public static void send() try / 创建一个连接工厂 String url = tcp:/localhost:61616; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); / 设置用户名和密码,这个用户名和密码在conf目录下的perties文件中,也可以在activemq.xml中配置 connectionFactory.setUserName(system); connectionFactory.setPassword(manager); / 创建连接 Connection connection = connectionFactory.createConnection(); connection.start(); / 创建Session,参数解释: / 第一个参数是否使用事务:当消息发送者向消息提供者(即消息代理)发送消息时,消息发送者等待消息代理的确认,没有回应则抛出异常,消息发送程序负责处理这个错误。 / 第二个参数消息的确认模式: / AUTO_ACKNOWLEDGE : 指定消息提供者在每次收到消息时自动发送确认。消息只向目标发送一次,但传输过程中可能因为错误而丢失消息。 / CLIENT_ACKNOWLEDGE : 由消息接收者确认收到消息,通过调用消息的acknowledge()方法(会通知消息提供者收到了消息) / DUPS_OK_ACKNOWLEDGE : 指定消息提供者在消息接收者没有确认发送时重新发送消息(这种确认模式不在乎接收者收到重复的消息)。 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); / 创建目标,就创建主题也可以创建队列 Destination destination = session.createQueue(test); / 创建消息生产者 MessageProducer producer = session.createProducer(destination); / 设置持久化,DeliveryMode.PERSISTENT和DeliveryMode.NON_PERSISTENT producer.setDeliveryMode(DeliveryMode.PERSISTENT); / 创建消息 String text = Hello ActiveMQ!; TextMessage message = session.createTextMessage(text); / 发送消息到ActiveMQ producer.send(message); System.out.println(Message is sent!); / 关闭资源 session.close(); connection.close(); catch (Exception e) e.printStackTrace(); Consumer.javapackage queue;import java.util.Date;import javax.jms.Connection;import javax.jms.ConnectionFactory;import javax.jms.Destination;import javax.jms.JMSException;import javax.jms.MapMessage;import javax.jms.Message;import javax.jms.MessageConsumer;import javax.jms.MessageListener;import javax.jms.Session;import javax.jms.TextMessage;import org.apache.activemq.ActiveMQConnection;import org.apache.activemq.ActiveMQConnectionFactory;public class Consumer /* * param args */public static void main(String args) /*String user = ActiveMQConnection.DEFAULT_USER;String password = ActiveMQConnection.DEFAULT_PASSWORD;String url = ActiveMQConnection.DEFAULT_BROKER_URL;String subject = TOOL.DEFAULT;ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);Connection connection;try connection = connectionFactory.createConnection();connection.start();final Session session = connection.createSession(Boolean.TRUE,Session.AUTO_ACKNOWLEDGE);Destination destination = session.createQueue(subject);MessageConsumer message = session.createConsumer(destination);message.setMessageListener(new MessageListener() public void onMessage(Message msg) MapMessage message = (MapMessage) msg;try System.out.println(-收到消息:+ new Date(message.getLong(count);mit(); catch (JMSException e) e.printStackTrace(););Thread.sleep(30000);session.close();connection.close(); catch (JMSException e) e.printStackTrace(); catch (InterruptedException e) e.printStackTrace();*/get();public static void get() try String url = tcp:/localhost:61616; ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url); / 设置用户名和密码,这个用户名和密码在conf目录下的perties文件中,也可以在activemq.xml中配置 connectionFactory.setUserName(system); connectionFactory.setPassword(manager); / 创建连接 Connection connection = connectionFactory.createConnection(); connection.start(); / 创建Session Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); / 创建目标,就创建主题也可以创建队列 Destination destination = session.createQueue(test); / 创建消息消费者 MessageConsumer consumer = session.createConsumer(destination); /* consumer.setMessageListener(new MessageListener()Overridepublic void onMessage(Message message) / TODO Auto-generated method stub );*/ / 接收消息,参数:接收消息的超时时间,为0的话则不超时,receive返回下一个消息,但是超时了或者消费者被关闭,返回null Message message = consumer.receive(1000); if (message instanceof TextMessage) TextMessage textMessage = (TextMessage) message; String text = textMessage.getText(); System.out.println(Received: + text); else System.out.println(Received: + message); consumer.close(); session.close(); connection.close(); catch (Exception e) e.printStackTrace(); Topic实例package ducer;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class TopicPublisher public static void main(String args) throws JMSException ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp:/localhost:61616);Connection connection = factory.createConnection();connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(myTopic.messages);/Topic control=session.createTopic(myTopic.control);MessageProducer producer = session.createProducer(topic);producer.setDeliveryMode(DeliveryMode.PERSISTENT);int i=0;while(i10) TextMessage message = session.createTextMessage();message.setText(message_ + i+:+System.currentTimeMillis();producer.send(message);System.out.println(Sent message: + message.getText();try Thread.sleep(1000); catch (InterruptedException e) e.printStackTrace();i+;session.close();connection.stop();connection.close();package topic.consumer;import org.apache.activemq.ActiveMQConnectionFactory;import javax.jms.*;public class TopicSubscriber public static void main(String args) throws JMSException ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(tcp:/localhost:61616);Connection connection = factory.createConnection();connection.setClientID(1);connection.start();Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);Topic topic = session.createTopic(myTopic.messages);MessageConsumer consumer = session.createConsumer(topic);consumer.setMessageListener(new MessageListener() public void onMessage(Message message) TextMessage tm = (TextMessage) message;try System.out.println(Received message: + tm.getText(); catch (JMSException e) e.printStackTrace(););/session.close();/connection.stop();/connection.close();实例三queue与spring结合全部类文件 Spring bean配置文件Beans-core.xml配置内容如果设置成目标为topic,更改bean id=destination的

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论