2023在Spring生态中玩转RockMQ_第1页
2023在Spring生态中玩转RockMQ_第2页
2023在Spring生态中玩转RockMQ_第3页
2023在Spring生态中玩转RockMQ_第4页
2023在Spring生态中玩转RockMQ_第5页
已阅读5页,还剩81页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

在spring生态中玩转TOC\o"1-1"\h\z\u开篇:在Spring生态中玩转 RocketMQSpring最初的故事:罗美琪和春波特的故事 RocketMQ-Spring毕业两周年,为什么能成为Spring生态中最受欢迎的messaging实 方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQ消 方法二:SpringCloudStream体系及原理介绍:spring-cloud-stream-binder- 方法三:SpringCloudBus消息总线介 开篇:在Spring开篇:在Spring生态中玩转 开篇:在Spring开篇:在Spring生态中玩转ApacheRocketMQ11RocketMQSpringMessaing在Spring生态中使用RocketMQ到底有多少种方式?他们各自适用于什么场景?各spring6ApacheRocketMQrocketmq-spring毕业后的两年,是如何成为Spring生态中最受欢迎的messaging实现的?最后将通过图文和实操地方式带来给位开发者玩转在Spring生态中使用RocketMQ一、RocketMQ与Spring架,SpringMessagingSpringCloudStreamSpringBoot二、SpringSpringMessagingSpringFramework4SpringJmsTemplate的简单的使用JMS步接收消息的一整套完整的基础架构,SpringAMQPSpringBoot准,对消息发送端和消息接收端的模式进行规定,比如消息Messaging对应的模型就包括一个消息体Payload和消息头HeaderBeanSpringBoot动的POJO)SpringBootApacheRocketMQ生态中,RocketMQ-Spring-Boot-Starter(下文简称RocketMQ-Spring)就是一个支持SpringMessagingAPI标准的项目。该项目把RocketMQ的客户端使用SpringBoot的方式进行了封装,可以让用户通过简单的annotationSpringMessagingAPI持扩展出RocketMQAPIRocketMQ-Spring期,RocketMQSpringRocketMQ-Spring引出一段罗美琪(RocketMQ)和春波特(SpringBoot)故事的佳话[1Spring布道师JoshLong向国外同学介绍如何使用RocketMQSpring收发消息[2]。RocketMQ-SpringSpring-KafkaSpring-AMQP(注:两者均由Spring)SpringMessaging三、SpringCloudSpringCloudStreamSpringIntegrationSpringCloudStream@Input)和输编程,而不需要关心运行时具体的Binder在运行时,SpringCloudStream能够自动探测并使用在classpath下找到的构建时包含进不同的Binder。在更加复杂的使用场景中,也可以在应用中打包多个BinderBinder,甚至在运行时为不同的通道使用不同的Binder。Binder抽象使得SpringCloudStream应用可以灵活的连接到中间件,加之SpringCloudStreamSpringBootSpringBoot(包括应用启动参数、环境destination(例如,RocketMQtopicRabbitMQexchange)。SpringCloudStream屏蔽了底层消息中间件的实现细节,希望以统一的一套APIBinderSpringRabbitbinderKafkaBinder。SpringCloudAlibabaRocketMQBinder[3],其主要实现原理是把发送消息最终代理给了RocketMQ-Spring的RocketMQTemplate,在消费端则内部会启动RocketMQ-SpringConsumerContainer来接收消息。以此为基础,SpringCloudAlibabaSpringCloudBusRocketMQRocketMQSpringCloud体系内的消息总线,来连接分布式系统的所有节点。通过SpringCloudStreamRocketMQBinder,RocketMQSpringCloudSpringCloudDataFlow、SpringCloudFuntionRocketMQSpringServerless(SpringCloudStreamRocketMQBinderSpringCloudBusRocketMQSpringCloud最活跃的实现。四、如何在Spring生态中选择RocketMQSpringRocketMQSpringRocketMQ-Spring、SpringCloudStreamRocketMQBinder、SpringCloudBusRocketMQ、SpringDataFlowSpringCloudFunction。它们RocketMQ-生态用到RocketMQ并且支持SpringMessagingAPIRocketMQJavaSDK适合在SpringBoot中使用RocketMQ望能用到RocketMQjavaSpring和自动配置简化SpringCloudStreamRocketMQ屏蔽底层MQ实现细节,上层SpringCloudStream的API是统一的。如果想从Kafka切到RocketMQ,直接改个配置即可。SpringCloudSpringCloudDataFlow,这上面的流计算都是SpringCloudStream;SpringCloudBus消息总线内部也是用的SpringCloudStre在代码层面能完全屏蔽底层消息且希望能项目能SpringCloud生态(SpringCloudDataFlow、SpringCloudFuntcion3.SpringCloudStreamSpringCloudBusRocketMQ将RocketMQ作为事件的“传输器”,通过发Spring希望用RocketMQ做消息总线的用置中心客户端刷DataFlow以Source/Processor/SinkRocketMQ作为流处理过程中的中间存储SpringJavaFunctionServerlessRocketMQ作为业务消息的首选,在消息和流处理领域被广泛应用。而微服务生态Spring框架也是业务开发中最被,两者的完美契合使得RocketMQSpringMessaing实现中最受欢迎的消息实现。书的后半部分讲给各位开发者详细讲述在SpringRocketMQ RocketMQSpring最初的故事:罗美琪和春波特的故事RocketMQSpringRocketMQSpring最初的故事:罗美琪和春波特的故事 RocketMQSpring最初的故事:罗美琪rocketmq-spring6ApacheRocketMQBoot的方式进行了封装,可以让用户通过简单的annotationSpringMessagingAPISpring社区的原创人员对我们的代码进行了ReviewslackSpringSpringBootReviewSpringBootRocketMQSpring(SpringBoot)的故事。故事的开始是这样的,罗美琪美眉有一套RocketMQ的客户端代码,负责发送消息SpringBootSpringRocketMQSpringSpringBean,并且相关属性要能够根据配置文件的配置自动设置,命名它为:RocketMQTemplate,同时让它封装发@Resourceprivate@ResourceprivateRocketMQTemplateSendResultsendResult=rocketMQTemplate.syncSend(xxxTopic,"Hello,@Service@RocketMQMessageListener(topic=@Service@RocketMQMessageListener(topic="xxx",consumerGroup="xxx_consumer")publicclassStringConsumerimplementsRocketMQListener<String>{@OverridepublicvoidonMessage(Stringmessage){@RocketMQMessageListener定义消息消费的配置参数(如:消费的topic,化,详见ListenerContainerConfiguration类及其实现SmartInitializingSingletonAutoConfiguration@ConfigurationRocketMQ需要的SpringBean,如上面所提到的RocketMQTemplate和能够处理消费回调Listener的容器,每个Listener对应一个容器SpringBean来启动MQPushConsumer,ListenerRocketMQAutoConfiguration.java(编者注:这个是最终发布的类,没有review)上面定义的Configuration类,它本身并不会“自动”配置,需要由META-INF/层用户不需要关心自动配置类的细节和开关,只要classpathMETA-INFConfigurationConfiguration@EnableConfiguraitonPropertiesConfigurationProperties,可参考RocketMQProperties.java上层用户可以根据这个类里定义的属性来配置相关的属性文件(即META-INF/perties或META-INF/application.yaml)RocketMQSpringBootstarterReview,接/spring-projects/spring-boot/wiki/Building-On-Spring-https://docs.spring.io/spring-boot/docs/current/reference/html/boot--developing-auto-然后解释道:"在SpringBoot中包含两个概念:auto-configuration和POMsauto-configurationSpringBean。它放在用户的CLASSPATH中结合在CLASSPATH中的其它依赖就可以提供相关的功能;mavenprojectPOM文件,不需要包含任classesresources.|---rocketmq-spring-boot- |---rocketmq-spring- auto-configuraiton|---rocketmq-spring- starterpom.xml|---rocketmq-spring- starter"很好,这样的模块结构就清晰多了",春波特小哥哥点头,"但是这个AutoConfiguration文件里的一些标签的用法并不正确,帮你注释一下,另外,考虑到Spring8SpringBoot1.X将不再提供支持,所以建议实现直接支持SpringBoot2.X"@Order~~春波特:OrderruntimeBeanpublicclassRocketMQAutoConfiguration@ConditionalOnClass(DefaultMQProducer.class)~~春波特:要用(nameclasspathroup~~春波特:nameServername-server@Order(1)~~春波特:删掉 publicDefaultMQProduceriesrocketMQProperties)@ConditionalOnMissingBean(name="rocketMQMessageObjectMapper")~~春波特:议与具体的实例名绑定,设计的意图是使用系统中已经存在的ObjectMapper,如果没有,则在这里实publicObjectMapperrocketMQMessageObjectMapper()returnnew@Bean(destroyMethod=@ConditionalOnMissingBean(name="rocketMQTemplate")~~春波特:与上面一样@Order(2~~春波特:publicRocketMQTemplaterocketMQTemplate(DefaultMQProducer@Autowired(required= ~~春波特:@Qualifier("rocketMQMessageObjectMapper~~春波特:ObjectMapperobjectMapper)RocketMQTemplaterocketMQTemplate=newif(Objects.nonNull(objectMapper))@Bean(name=@Role(BeanDefinition.ROLE_INFRASTRUCTURE)~~春波特:这个orBeanPostProcessor,(TransactionBeanpublicRocketMQTransactionAnnotationProcessorreturnnew@Configuration~~春波特Configuration@Import在主Configuration@ConditionalOnProperty(prefixspring.rocketmqvaluenameServer~~春波特ame-InitializingBean@Resource~~春波特:annotation,fieldinjectionprivateStandardEnvironment@Autowired(requiredfalse)~~春波特@Qualifier("rocketMQMessageObjectMapper")ObjectMapperobjectMapper){春波特:@Qualifier性名的松散规则(relaxedrules)。Iftheydon'twebasicallyregisterthepost-processoratthesame"time"asalltheotherbeansinthatclassandthecontractofBPPisthatitmustberegisteredveryearlyon.ThismaynotmakeadifferenceforthisparticularclassbutflaggingitasstaticasthesideeffecttomakeclearyourBPPimplementationisnotsupposedtodragotherbeansviadependencypublicpublicclassListenerContainerConfigurationimplementsApplicationContextAware,SmartprivateObjectMapperobjectMapper=newObjectMapper();~~春波特:性能上考虑,不要RocketMQListenerrocketMQListener=(RocketMQListener)bean;nerannotation=validate(annotation);~~春波特:BeanSpring4.x以考虑使用Spring5.0GenericApplicationContext.registerBean,通过supplierew来构造Bean实例[beanBuilder.addPropertyValue(PROP_NAMESERVER,StringcontainerBeanName=String.format("%s_%s",lass.getName(),DefaultRocketMQListenerContainercontainer=eDefaultRocketMQListenerContainer.class);~~春波特:你这里的启动方法是通过if(!container.isStarted())try}catch(Exceptione)log.error("startedlog.error("startedcontainerfailed.{}",container,thrownew注[3]:使用GenericApplicationContext.registerBeanpublicfinal<T>voidClass<T>beanClass,Supplier<T>supplier,BeanDefinitionCustomizer…SpringBootSpringAssertJavaassertSpringBootAssert:importimportprivateApplicationContextRunnerrunner=new@Test(expected=NoSuchBeanDefinitionException.class)publicrun((context)->{processor注解处理器,这样它能够生成辅助元数据文件,加快启动时间。详情见-autoconfigure/**…*)和单行//...)员变量,方法或者代码逻辑应该提供多行注释;有些简单的代码逻辑注释也可以使用单;在变量和方法命名时尽量用词准确,并且尽量不要使用缩写,如:sendMsgTimeout,sendMessageTimeout;包名supportssupport。Lombokconstructor,IDELombokSpringLombokIDE如果一个包目录下没有任何class,建议要去掉这个包目录。例如org.apache.rocketmq.spring.starter在spring目录下没有具体的class应该去掉这层目录(编者注:我们最终把package改为org.apache.rocketmq.spring,Enumorg.apache.rocketmq.spring.enums下,这个包命名Enumenumspackageprivate:TransactionHandlerstatic+final一个类的static方法不要结合final,除非这个这个类本身是final并且声明private使用@Autowared,@Resource4]注[4]:下面的截图是有FieldInjection罗美琪根据上述的要求调整了代码,使代码质量有了很大的提高,并且总结了SpringBootautoconfigurationBean@Conditionalsetter方法来设置变量,避免使用FieldInjection方式;多个ConfigurationBean可以使用@ImportSpring20提供的AutoConfigruation注意一些细节:staticBeanPostProcessor;Lifecycle约束条件,信心满满地提交了最终的代码,又可以邀请RocketMQ社区的小伙伴们一起RocketMQspringSpringrocketmq-spring-starterSpringInitializr,让用户可以直接在/bootstrap.html网站上RocketMQ-Spring毕业两周年,为什么能成为RocketMQ-Spring毕业两周年,为什么能成为Spring生态中最受欢迎的messaging实 RocketMQ-SpringRocketMQ-SpringSpring生态中最受欢迎的messagingRocketMQ-Spring毕业两周年,为什么能成为Spring生态中最受欢迎的messaging实现2019年1月,孵化6个月的RocketMQ-Spring作为ApacheRocketMQ的子项目正式毕业,发布了第一个Release版本2.0.1。该项目是把RocketMQ的客户端使用SpringBoot的方式进行了封装,可以让用户通过简单的annotation和标准的SpringMessagingAPI编写代码来进行消息的发送和消费。当时RocketMQ社区同学请Spring社区的同学对RocketMQSpring代码进行review,引出一段罗美琪(RocketMQ)和春波特(SpringBoot)的故事。时隔两年,RocketMQ-Spring正式发布2.2.0。在这期间,RocketMQ-Spring迭代了数个版本,以RocketMQ-Spring为基础实现的SpringCloudStreamRocketMQBinder、SpringCloudBusRocketMQ登上了Spring的官网,Spring布道师baeldung向国外同学介绍如何使用RocketMQ-Spring,越来越多国内外的同学开始使用RocketMQ-Spring收发消息,RocketMQ-Spring仓库的star数也在短短两年时间内超越了Spring-Kafka和Spring-AMQP(注:两者均由Spring社区维护),成为ApacheRocketMQ最受欢迎的生态项目之一。RocketMQ-Spring的受欢迎一方面得益于支持丰富业务场景的RocketMQ与微服务生态Spring的完美契合,另一方面也与RocketMQ-Spring本身严格遵循SpringMessagingAPISpringMessagingAPISpringMessagingAPI对消息发送端和消息接收端的模式进行规定,不同的消息中间件提供商可以在这个模式下提供自己的Spring实现:在消息发送端需要实现的是一个XXXTemplate形式的JavaBean,结合SpringBoot的自动接口(实现方式通常会使用一个注解来声明一个消息驱动的POJO),提供回调方法来SpringBoot的自动化选项和一些定制化的属RocketMQ-SpringSpringMessagingAPIRocketMQ自身的功能特点提供了相应的API。在消息的发送端,RocketMQ-Spring通过实现RocketMQTemplate完成消息的发送。如下图所示,RocketMQTemplate继承AbstractMessageSendingTemplate抽象类,来支持SpringMessagingAPI标准的消doSenddoSendsyncSend,由DefaultMQProducer实现。除SpringMessagingAPI规范中的方法,RocketMQTemplate还实现了RocketMQ于原生客户端需要自己去构建RocketMQMessage(比如将对象序列化成byte数组放入Message对象),RocketMQTemplate可以直接将对象、字符串或者byte数组作为参数发送出去(对象序列化操作由RocketMQ-Spring内置完成),在消费端约定好对应的Schema即可正常收发。RocketMQTemplateRocketMQTemplateSendvoidasyncSend(Stringdestination,Message<?>message,SendCallbacksendCallback)voidasyncSend(Stringdestination,Message<?>message,SendCallbacksendCallback)在消费端,需要实现一个包含@RocketMQMessageListener注解的类(需要实现RocketMQListener接口,并实现onMessage方法,在注解中进行topic、consumerGroup等属性配置),这个Listener会一对一的被放置到DefaultRocketMQListenerContainer容器对象中,容器对象会根据消费的方式(并发或顺序),将RocketMQListener封装到具体的RocketMQ内部的并发或者顺序接口实现。在容器中创建RocketMQDefaultPushConsumer对象,启动并监听定制的Topic消息,完成约定Schema对象的转换,回调到Listener的onMessage方法。@RocketMQMessageListener(topic="${demo.rocketmq.topic}",consumerGroup=merimplementsRocketMQListener<String>publicvoidonMessage(Stringmessage)System.out.printf("-------StringConsumerreceived:%s\n",除此Push接口之外,在最新的2.2.0版本中,RocketMQ-Spring实现了RocketMQLitePullConsumer。通过在配置文件中进行consumer的配置,利用RocketMQTemplate的Recevie方法即可主动Pull消息。配置文件配置文件resource/perties:PullConsumerwhile(!isStop)RocketMQSpring消息类型支持方面与RocketMQ原生客户端完全对齐,包括同步/异步/one-way、顺序、延迟、批量、事务以及Request-Reply消息。在这里,主要介绍较为特殊的事务消息和request-reply消息。RocketMQ的事务消息不同于SpringMessaging中的事务消息,依然采用RocketMQ原生事务消息的方案。如下所示,发送事务消息时需要实现一个包含@RocketMQTransactionListener注解的类,并实现executeLocalTransaction和checkLocalTransaction方法,从而来完成执行本地事务以及检查本地事务执行结果。////BuildaSpringMessageforsendingintransactionMessagemsg=//mustbesamewiththe@RocketMQTransactionListener'smemberfield'transName'//DefinetransactionlistenerwiththeannotationpublicRocketMQLocalTransactionStateexecuteLocalTransaction(Messagemsg,ObjectreturnRocketMQLocalTransactionState.UNKNOWN;publicRocketMQLocalTransactionStatecheckLocalTransaction(Messagemsg)//...checktransactionstatusandreturnbollback,commitorunknownreturnRocketMQLocalTransactionState.COMMIT;2.1.0RocketMQ-Spring中每一个group对应一个TransactionProducer,而在新版本中改为每一个RocketMQTemplate对应一个TransationProducer,从而解决了并发使用多个事务消息的问题。当用户需要在单进程使用多个事务消息时,可以使用ExtRocketMQTemplate来完成(一般情况下,推荐一个进程使用一个RocketMQTemplate,ExtRocketMQTemplate可以使用在同进程中需要使用多个Producer/LitePullConsumer的场景,可以为ExtRocketMQTemplate指定与标准模版RocketMQTemplate不同的nameserver、group等配置),并在对应的RocketMQTransactionListener注解中指定rocketMQTemplateBeanName为ExtRocketMQTemplate的BeanName。Request-Reply在2.1.0版本中,RocketMQ-Spring开始支持Request-Reply消息。Request-Reply消息指的是上游服务投递消息后进入等待被通知的状态,直到消费端返回结果并返回给发送端。在RocketMQ-Spring中,发送端通过RocketMQTemplate的sendAndReceivce方法进行发送,如下所示,主要有同步和异步两种方式。异步方式中通过实现RocketMQLocalRequestCallback进行回调。//异步发送request并且等待UserrocketMQTemplate.sendAndReceive("objectRequestTopic",newUser("requestUserName",(byte)9),newRocketMQLocalRequestCallback<User>(){@OverridepublicvoidonSuccess(Usermessage){@OverridepublicvoidonException(Throwablee)在消费端,仍然需要实现一个包含@RocketMQMessageListener注解的类,但需RocketMQReplyListener<T,R(<T>接口),其中T表示接收值的类型,R表示返回值的类型,接口需要实现带返回onMessageProducer。@RocketMQMessageListener(topic="stringRequestTopic",consumerGroup="stringRequepublicclassStringConsumerWithReplyStringimplementsRocketMQReplyListener<String,String>{publicStringonMessage(Stringmessage)return"replyRocketMQ-Spring遵循Spring约定大于配置(Conventionoverconfiguration)的理念,通过启动器(SpringBootStarter)的方式,在pom文件引入依赖(SpringBoot中集成所有RocketMQ客户端的所有功能,通过简单的注解使用即可完RocketMQ-SpringGithubWiki中有更加详细的用法和常见问题解据统计,从RocketMQ-Spring发布第一个正式版本以来,RocketMQ-Spring完成16个bug修复,37个imporvement,其中包括事务消息重构,消息过滤、消息序列化、多实例RocketMQTemplate优化等重要优化,欢迎更多的小伙伴能参与到RocketMQ社区的建设中来,罗美琪(RocketMQ)和春波特(SpringBoot)的故事还 方法一:使用rocketmq-spring-boot-starter来配置、发送和消费RocketMQRocketMQ作者简介:辽天,阿里巴巴技术专家,ApacheRocketMQ内核控,拥有多年分布注RocketMQ内核优化以及Messaging生态建设。JavaBeansXMLJava对象(PlainOldJavaObjects)的Spring技术应运而生,依赖注入(DependencyInjection),控制反转(随着pringAnnotaton)ML2014年4月1日,SpringBoot1.0.0正式发布,它基于“约定大于配置”(Conventionoveronfiguration)pring应用,并能通过简单地与各种启动器如spring-boot-web-starter)结合,让应用直接以命ApacheRocketMQBrokerPoducer)BoerConume)Boer为了利用SpringBootRocketMQ端,ApacheRocketMQspring-boot-starterRocketMQ顺便在这里讨论一下在Spring中关于消息的两个主要的框架,即Springesagng和pringCoudtreamprngBootSpringSpringMessagingSpringFramework4SpringJmsTemplate的简单的使用JMS步接收消息的一整套完整的基础架构,SpringAMQPSpringBoot单纯对于客户端而言,SpringMessagingAPISpringXXXTemplateJavaBeanSpringBootXXXMessageListener(实现方式通常会使用一个注解来声明一个POJO)SpringBootSpringMessaging及针对不同的消息产品的使用,推荐阅SpringMessagingRocketMQspring-boot-starterRocketMQAPISpringCloudSpringCloudStreamSpringIntegration该图片引自springcloudSpringCloudStream@Input)和输编程,而不需要关心运行时具体的Binder绑定的消息中间件。在运行时,SpringCloudStreamclasspathBinder。Binder。在更加复杂的使用场景中,也可以在应用中打包多个BinderBinderBinder。BinderSpringCloudStream应用可以灵活的连接到中间件,加之SpringCloudStreamSpringBootSpringBoo(包括应用启动参数、环境变量和application.yml或者perties文件),部署人员可以在运行时动destination(例如,KafkatopicRabbitMQexchange)。SpringCloudSteamRocketMQBinder,我们计PR或三、spring-boot-starterSpringspring-boot-starter封装给开发者,让开发者非常方便集成和使用,这里我们详细的介绍一下RocketMQ(客户端)的starterspring-boot-starterpom.xml<artifactId>spring-boot-starter-它分为两个部分:A、SpringB、RocketMQRocketMQPropertiesBeanspringboot更具文中中所指定的自动化配置类来自动初始化相关的BeanComponentService,它的内容如下:在RocketMQAutoConfiguration类的具体实现中,定义开放给用户直接使用的Bean.包括:RocketMQPropertiesRocketMQTemplateListenerContainerConfigurationBean类,这个类要求:由@RocketMQMessageListener注解标注;实现RocketMQListener最后具体的RocketMQSpringMessagingRocketMQTemplatePOJOSpringMessaging的发送模板兼容,在RocketMQTemplateAbstractMessageSendingTemplate抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给doSend()方法doSend()RocoketMQ向和顺序等方法直接添加到RoketMQTempalte中,这些方法直接代理调用到RocketMQProducerAPIRocketMQTemplate里加入了一个发送事务消息的方法sendMessageInTransactioProducerTransactionListener实现类,以便在发送消息后能够对TransactionListener在消费端Spring-Boot应用启动后,会扫描所有包含@RocketMQMessageListenerRocketMQListener接口,并实现onMessage()方法),这个ListenerDefaultRocketMQListenerContainer或顺序)RocketMQListenerRocketMQRocketMQConsumerTopicListeneronMessage()方法。RocketMQ服务端的准备NameServerRocketMQSpring-BootRocketMQ并启动。可以参考RocketMQ主站的快速开始来进行操作。确保启动NameServer和Brokerbashbashbin/mqadminupdateTopic-cDefaultCluster-tstring-gitmvncleaninstallmvncleanmavenpom.xmlJavaJava如果需要了解更多的调用方式,如:异步发送,对象消息体,指定tag标签以及指定事 方法二:SpringCloudStream体系及PhotobyMedBadrChemmaouionSpringCloudStream在SpringCloud体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在SpringCloud应用程序中的开发。SpringCloudStream(后面以SCS代替SpringCloudStream)本身内容很多,而且它还有很多外部的依赖,想要熟悉SCS,必须要先了解SpringMessagingSpringIntegration这两个项目,接下来,文章将从围绕以下三点进行展开:SpringSpring什么是SCS一、SpringSpringMessaging是SpringFramework中的一个模块,其作用就是统一消息的 对应的模型就包括一个消息体Payload和消息头packagepackageorg.springframework.messaging;publicinterfaceMessage<T>{TgetPayload();MessageHeadersgetHeaders();消息通 用于接收消息,调用send方法可以将消息发送至publicinterfaceMessageChannel{longINDEFINITE_TIMEOUT=-defaultbooleansend(Message<?>message){returnsend(message,INDEFINITE_TIMEOUT);booleansend(Message<?>message,longSubscribableChannel由消息通道的子接口可订阅的消息通 实现,SubscribableChannel消息处理器所订阅publicpublicinterfaceSubscribableChannelextendsMessageChannel{booleansubscribe(MessageHandlerhandler);booleanunsubscribe(MessageHandler 真正地消费/处理消息voidhandleMessage(Message<?>message)throwsSpringMessaging消息接收参数及返回值处理:消息接收参数处理器@Payload@Headeresolver配 等注解使用;消息接收后的返回值处理@Payload@Headeresolver 二、SpringSpringIntegrationSpring编程模型的扩展用来支持企业集成模式(EnterpriseIntegrationPatternsSpringMessagingMessaMessageRout、消息过滤Filter、消息转

PublishSubscribeChannelExecutorChannelDirectChannelPublishSubscribeChannelExecutorChannelDirectChannel SubscribableChannelmessageChannel=newDirectChannel();//1messageChannel.subscribe(msg->{//2System.out.println("receive:SubscribableChannelmessageChannel=newDirectChannel();//1messageChannel.subscribe(msg->{//2System.out.println("receive:"+msg.getPayload()); 发送一条消息到这个消息通道,消息最终被消息通道里 最后控制台打印出receivemsgfrom 的消息通 单播的分发器,只能选择一个消息通道。那么如何选择呢?内部提供了 SubscribableChannelmessageChannel=newDirectChannel();messageChannel.subscribe(msg->{SubscribableChannelmessageChannel=newDirectChannel();messageChannel.subscribe(msg->{messageChannel.subscribe(msg->{System.out.println("receive2:"+msg.getPayload());messageChannel.send(MessageBuilder.withPayload("msgfrom receive1:receive1:msgfromreceive2:msgfrom SubscribableChannelmessageChannel=newPublishSubscribeChannel();messageChannel.subscribe(msg->{SubscribableChannelmessageChannel=newPublishSubscribeChannel();messageChannel.subscribe(msg->{System.out.println("receive1:"+messageChannel.subscribe(msg->{System.out.println("receive2:"+msg.getPayload());messageChannel.send(MessageBuilder.withPayload("msgfrom receive1:receive1:msgfromalibabareceive2:msgfromalibabareceive1:msgfromreceive2:msgfrom三、SpringCloudSCSSCS在SpringIntegration的基础上进行了封装,提出了Binder,等概念/bindings,SCS与SpringBootActuator整合,提供/bindings,SCS与SpringBootExternalizedConfiguration整合提供 等外部化配置类SCSSCSSpringIntegrationSpringBootSpringCloudBus的基础。它屏蔽了底层消息中间件的实现细节,希望以统一的一套API来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的Binder完成。Binding2 RabbitBinder和KafkaBinder,SpringCloudAlibaba内部已经实现了RocketMQBinder。从图中可以看出,Binding是连接应用程序跟消息中间件的桥梁,用于消息的消费和生产。我们来看一个最简单的使用RocketMQBinder的例子,然后分析一下它的底publicclassSendAndReceiveApplication{SpringApplication.run(SendAndReceiveApplication.class,args);@Bean//publicCustomRunnercustomRunner(){returnnewCustomRunner();publicstaticclassCustomRunnerimplementsCommandLineRunner{intcount=5;for(intindex=1;index<=count;index++){@StreamListener(Sink.INPUT)//4publicvoidreceiveByStreamListener1(StringreceiveMsg){这段代码很简单,没有涉及到RocketMQ相关的代码,消息的发送和接收都是基于SCS体系完成的。如果想切换成RabbitMQ或Kafka,只需修改配置文件即可,代码我们来分析下这段代码的原理

是SCS内部提供的。SCS

outputinput方法返回的MessageChannel output和input方法修饰注解对应的value是配置文件中binding的namepublicinterfaceSource{StringOUTPUT="output";publicinterfaceSource{StringOUTPUT="output";StringINPUT="input";配置文件里bindings的name为output和input,对 spring.cloud.stream.bindings.input.group=test-构造CommandLineRunner,程序启动的时候会执 Source里的output发送消息 消息通道之后会被 MessageHandler不同的消息中间件对应的MessageHandlerSpringMessageMessagebroker; inputbindingnameinput不同的消息中间件对应的方法会使用Consumer订阅消息,订阅到消息后内部会把中间件对应的Message模型转换成SpringMessage;消息转换之后会把SpringMessage发送至name为input对应 订阅了nameBinder实现以及MQ基本的订阅发布功能):SCS章节的最后,我们来看一段SCSpublicvoidreceiveByHeader(Messagemsg){System.out.println("receivebyheaders['index']=='1':"+publicvoidreceivePerson(@PayloadPersonperson){@StreamListener(value=Sink.INPUT)@StreamListener(value=publicvoidreceiveHeaderAndMsg(@Header("index")Stringindex,Messagemsg){System.out.println("receivebyHeaderAndMsgbyStreamListener.content:"+msg);有没有发现这段代码跟SpringMVCController中接收请求的代码很像?实际上SpringMessaging上图是SCS体系相关类说明的总结,关于SCS以及RocketMQBinder更多相RocketMQBinderDemos(Demos),包含了消息的聚合、分割、过滤;消息异常处理;消息标签、SQLSpringCloudBusSpringCloud作用,并逐步展开,分析SpringCloudAlibaba中的RocketMQBinder是如何实现SpringCloudStream标准的。SpringCloudAlibaba方法三:SpringCloud方法三:SpringCloudBus消息总线介 方法三:Spring方法三:SpringCloudBus方法三:SpringCloudBus消息总线介本期我们来了解下SpringCloud体系中的另外一个组件SpringCloudBus(建议先熟悉SpringCloudStream,不然无法理解SpringCloudBus内部的代码)。SpringCloudBus对自己的定位是SpringCloud体系内的消息总线,使用messagebroker来连接分布式系统的所有节点。Bus官方的Reference文档比较简SpringCloudBus一、Bus实例演示在分析Bus的实现之前,我们先来看两个使用SpringCloudBusBus的例子比较简单,因为Bus的AutoConfiguration层都有了默认的配置,只需要引入消息中间件对应的SpringCloudStream以及SpringCloudBus依赖即可,之后所有启动的应用都会使用同一个Topic进行消息的接收和发送。BusDemogithub(/fangjian0423rocketmqbinderdemotreemasterrocketmqbusdemo)Demo会模拟启动5个节点,只需要对其中任意的一个实例新增配置项,所有节点都curlcurl-XGET所有节点返回的结果都是unknown,因为所有节点的配置中没 keyEnvironmentBusEndpointBus内部提供 这个Endpoint通过EnvironmentBusEndpoint对应的:/进行配置项的新增(比如访问node1'content-type: $curl-XPOST'http://localhost:10001/actuator/bus-env?name=hangzhou&value=alibaba'keyhangzhouvaluealibaba。这个配置项是通过Bus提供 DD画的一张图片,SpringCloudConfig配合Bus完成所有节比如在node1上指定destination为rocketmq-bus-node2(node2配置了为 获取配置(由于在node1上发送消息,Bus也会对发送方的节node1进行配置修改):$$curl-XPOST'http://localhost:10001/actuator/bus-env/rocketmq-bus-node2?name=hang可以看到,只有node1和node2修改了配置,其余的3二、Bus的实现1BusBusRemoteApplicationEventSpringEnvironmentChangeRemoteApplicationEvent:一个Map类型的数据并更新到Spring上下文 中的事件。文AckRemoteApplicationEvent:远程确认事件。Bus UnknownRemoteApplicationEvent:远程未知事件。Bus内部消息体进行转换远-Bus内部还存在一个 -Trace 进行操作,比如 implementsApplicationListener<EnvironmentChangeRemoteApplicationEvent>{

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论