您当前的位置: 首页 >  学无止境 >  文章详情

微服务架构之Spring Cloud Stream详解

时间: 2023-10-27 【学无止境】 阅读量:共485人围观

简介 Spring Cloud Stream是构建消息驱动的微服务应用程序的框架。Spring Cloud Stream基于Spring Boot建立独立的生产级Spring应用程序,并使用Spring Integration提供与消息代理的连接。SpringCloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

什么是SpringCloudStream?

官方定义 Spring Cloud stream 是一个构建消息驱动微服务的框架。

  1. 应用程序通过 inputs 或者 outputs 来与 Spring Cloud Stream中binder对象交互。

  2. 通过我们配置来binding(绑定),而 Spring Cloud Stream 的 binder对象负责与消息中间件交互.

  3. 所以,我们只需要搞清楚如何与 Spring Cloud Stream 交互就可以方便使用消息驱动的方式。

目前仅支持RabbitMQ、Kafka

通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。
使用框架

  • RabbitMQ和Kafka是两个不同的框架,两个消息模型上也存在着差异,并且代码上用法也不一样。Spring Cloud Stream就是不再关注具体MQ的细节,可以在不改代码的基础上,来完成Rabbit和Kafka两个不同的消息中间件的切换(这里的切换指的是原本用的RabbitMQ,但是用着用着发现kafka比较符合,所以想要换框架)。

总结成一句话:屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型

设计思想

42275e22a3d744599e8de4a44c7d4ac2.png

  • Message:生产者/消费者之间靠消息媒介传递信息内容
  • MessageChannel:消息必须走特定的通道
  • 队列:假如发消息会先发到消息队列当中
  • 消息队列的消息如何被消费呢:订阅的人可以进行消费

cloud Stream设计

  • 通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

    在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务可以关注更多自己的业务流程。

8ab5014afa8c4b9e9a317a7e04bc88e61.png

  • Binder可以生成Binding,Binding用来绑定消息容器的生产者和消费者,它有两种类型,INPUT和OUTPUT,INPUT对应于消费者,OUTPUT对应于生产者。

通过stream连接RabbitMQ

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>

通过stream连接Kafka

<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>

Stream中的消息通信方式遵循了发布-订阅模式,Topic主题进行广播,在RabbitMQ就是Exchange,在Kakfa中就是Topic。

微信图片_20231027161444.png

  • Binder: 很方便的连接中间件,屏蔽差异
  • Channel: 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channe对队列进行配置
  • Source(源:发送者)和Sink(水槽:接受者): 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入。

应用模型各组成

  • Middleware 中间件,目前只支持 RabbitMQ 和 Kafka
  • Binder Binder 是应用于消息中间件之间的封装,可以动态的改变消息类型
  • @Input 注解标识输入通道,通过改输入通道接收到的消息进入应用程序
  • @Output 注解标识输出通道,发布的消息通过该通道离开应用程序
  • @StreamListener 监听队列,用于消费者队列的消息接收
  • @EnableBinding 指信道 channel 和 exchange 绑定在一起

生产者配置bootstrap.yml

server: port: 8001 spring: application: name: cloud-stream-provider cloud: stream: binders: # 在此处配置需要绑定的rabbitMq的服务信息 defaultRabbit: ## 表示定义的名称,用于binding的整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / bindings: ## 服务的整合处理 output: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input destination: qyExchange # 表示要是用的exchange名称定义 content-type: application/json # 设置消息类型,本次为json,文本则设置为text/plain ## 设置要绑定的消息服务的具体设置 binder: defaultRabbit eureka: client: register-with-eureka: true #向注册中心注册自己 fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡 service-url: defaultZone: http://eureka7001.com:7001/eureka instance: instance-id: provider8001 #主机名称修改 prefer-ip-address: true #访问路径可以显示ip

注意:生产者配置的是output

启动类

@SpringBootApplication @EnableEurekaClient public class StreamProviderApplication { public static void main(String[] args) { SpringApplication.run(StreamProviderApplication.class, args); } }

生产者业务类
cloud-stream-provider工程业务类处理,模拟发送消息的业务

// controller @RestController public class SendMessageController { @Resource private IMessageProviderService messageProviderService; @GetMapping(value = "/sendMessage") public String sendMessage() { return messageProviderService.send(); } } // Service接口 public interface IMessageProviderService { String send() ; } // Service接口实现类 @Slf4j @EnableBinding(Source.class) //定义消息的推送管道,即:源 //将@EnableBinding注释应用于应用程序的配置类之一。@EnableBinding注释本身使用@Configuration进行元注释 /*此处不再需要引入 spring 注解 @Service,这里的业务实现类是与RabbitMQ配合的,使用的 SpringCloud Stream 的注解*/ public class MessageProviderServiceImpl implements IMessageProviderService { @Resource //在Spring Cloud Stream 1.0中,唯一支持的可绑定组件是Spring消息传递MessageChannel及其扩展名SubscribableChannel和PollableChannel private MessageChannel output; //消息发送管道 @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); log.info("*****serial:" + serial); return "RabbitMQ 消息发送方:" + serial; } }

Stream消息驱动之消费者
消费者配置bootstrap.yml

server: port: 8002 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的 rabbitmq 的服务信息 defaultRabbit: # 表示定义的名称,用于 binding 整合 type: rabbit # 消息组件类型 environment: # 设置 rabbitmq 的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称,消息发送方使用的output、消息接收方使用的是input destination: qyExchange # 表示要使用的 Exchange 名称定义 content-type: application/json #设置消息类型,本次为json,文本则设置 text/plain binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: consumerGroup eureka: client: register-with-eureka: true #向注册中心注册自己 fetch-registry: true #从EurekaServer抓取已有的注册信息,集群必须设置成true,才能配合ribbon负载均衡 service-url: defaultZone: http://eureka7001.com:7001/eureka instance: instance-id: consumer8002 #主机名称修改 prefer-ip-address: true #访问路径可以显示ip

注意:消费者配置的是input

消费者业务类

/** *增加订阅监听器 **/ @Component @EnableBinding(Sink.class) public class ReceiveMessageListenerController { @Value(value = "${server.port}") private String serverPort; @StreamListener(Sink.INPUT)//使用@StreamListener进行自动内容类型处理 //@StreamListener注释提供了一种更简单的处理入站邮件的模型,特别是在处理涉及内容类型管理和类型强制的用例时。 public void input(Message<String> message) { System.out.println("消费者 1 号,----->接收到的消息:" + message.getPayload() + "\t port:" + serverPort); } }

如何解决消息的重复消费?消息丢失?

当集群方式进行消息消费时,就会存在消息的重复消费问题。通过分组解决,只要是一个组的消费者,就处于竞争关系,一次只能有一个去消费,这就可以解决重复消费的问题了。而且分组(group)还解决了持久化的问题。

修改 8002(group1)、8003(group2) 的 yml 配置文件,添加group分组
在实际的生产过程中,一定要配置消息分组(group),以免造成服务宕机造成的消息丢失的问题

文章评论
总共 0 条评论
这篇文章还没有收到评论,赶紧来抢沙发吧~
Copyright (C) 2023-现在 若熙站点 保留所有权利 蜀ICP备 17034318号-4  公安备案号 50010302505321