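Spring Cloud Stream
Spring Cloud Stream is a lightweight framework in the Spring Cloud ecosystem for building highly scalable, event-driven (message-driven) microservices. Its goal is to simplify messaging development in Spring Cloud applications.

An application interacts with a Spring Cloud Stream binder through inputs and outputs.
The binder, in turn, handles the interaction with the message middleware.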
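
To make the input/output/binder relationship more concrete, here is a minimal plain-Java sketch. It is not Spring Cloud Stream code: InMemoryBinder, bindInput and bindOutput are hypothetical names invented for illustration, and the "middleware" is just an in-memory map. A real binder would forward the payloads to RabbitMQ, Kafka, and so on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

class BinderSketch {

    // Hypothetical stand-in for a binder: maps destination names to subscribers.
    static final class InMemoryBinder {
        private final Map<String, List<Consumer<Object>>> destinations = new ConcurrentHashMap<>();

        // "Output" side: returns a sender the application uses to publish to a destination.
        Consumer<Object> bindOutput(String destination) {
            return payload -> destinations
                    .getOrDefault(destination, List.of())
                    .forEach(subscriber -> subscriber.accept(payload));
        }

        // "Input" side: registers a handler that receives everything sent to the destination.
        void bindInput(String destination, Consumer<Object> handler) {
            destinations.computeIfAbsent(destination, key -> new CopyOnWriteArrayList<>()).add(handler);
        }
    }

    // Sends three payloads through one destination and returns what the input side received.
    static List<Object> run() {
        InMemoryBinder binder = new InMemoryBinder();
        List<Object> received = new ArrayList<>();
        binder.bindInput("newOrders", received::add);
        Consumer<Object> output = binder.bindOutput("newOrders");
        output.accept(1);
        output.accept(2);
        output.accept(3);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The application code only sees the sender and the handler; swapping the middleware would mean swapping the binder, which is the point of the abstraction.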
Binder implementations
RabbitMQ
Apache Kafka
Kafka Streams
Amazon Kinesis
RocketMQ
Accessing RabbitMQ through Spring Cloud Stream
1.RabbitMQ
RabbitMQ is the most widely deployed open source message broker
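RabbitMQ is an advanced message broker based on the AMQP protocol. Its main technical features are availability, security, clustering, multi-protocol support, a visual management client, and an active community.
2.Installing RabbitMQ with Docker
Use the image that bundles the web management plugin: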
docker pull rabbitmq:management
Run it:
docker run --name rabbitmq -d -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=spring -e RABBITMQ_DEFAULT_PASS=spring rabbitmq:management
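5672: port used by applications (AMQP)
15672: port of the web management console
The management console is then available at http://<Docker host IP>:15672
e.g.:
http://192.168.99.100:15672/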
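
Before wiring up the services, it can help to confirm that both published ports are reachable. The following is a small sketch using only the JDK; the class name PortCheck, the helper isOpen and the 2-second timeout are illustrative choices, and the default host is the example IP used above.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

class PortCheck {

    // Returns true if a TCP connection to host:port succeeds within timeoutMillis.
    static boolean isOpen(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the Docker host IP from the example; pass another host as the first argument.
        String host = args.length > 0 ? args[0] : "192.168.99.100";
        System.out.println("AMQP 5672 open: " + isOpen(host, 5672, 2000));
        System.out.println("Management 15672 open: " + isOpen(host, 15672, 2000));
    }
}
```

An open port only shows that something is listening; it does not verify credentials.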

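Notes:
Without further settings, the image creates a default guest user whose password is also guest.
Here the container was started with the username spring and the password spring instead.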
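
One way to check that the spring/spring credentials were applied is to call the management plugin's HTTP API with HTTP Basic authentication. The sketch below assumes Java 11+ (java.net.http) and the example host IP; the class and method names are made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class ManagementApiCheck {

    // Builds the value of an HTTP Basic "Authorization" header.
    static String basicAuth(String user, String password) {
        String raw = user + ":" + password;
        return "Basic " + Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    // Prepares a GET request against the management API overview endpoint.
    static HttpRequest overviewRequest(String host, String user, String password) {
        return HttpRequest.newBuilder(URI.create("http://" + host + ":15672/api/overview"))
                .header("Authorization", basicAuth(user, password))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Assumption: host and credentials from the docker run command above.
        HttpRequest request = overviewRequest("192.168.99.100", "spring", "spring");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // 200 means the credentials were accepted; 401 means they were rejected.
        System.out.println(response.statusCode());
    }
}
```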

An online RabbitMQ simulator: http://tryrabbitmq.com

3.Example
(1)service-provider
Add the dependencies
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Configuration
server.port=8010
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
spring.application.name=service-provider
spring.cloud.consul.host=192.168.99.100
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.health-check-path=/actuator/health
spring.cloud.consul.discovery.service-name=${spring.application.name}
spring.cloud.consul.discovery.heartbeat.enabled=true
spring.cloud.consul.discovery.prefer-ip-address=true
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=spring
spring.cloud.stream.bindings.finishedOrders.group=service-provider
Main class
The @EnableBinding annotation takes Barista so that Spring loads the channel bindings it declares.
package com.xyz.provider;

import com.xyz.provider.integration.Barista;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableDiscoveryClient
@SpringBootApplication
@EnableBinding(Barista.class) // register the channels declared in Barista
public class ProviderApplication {
    public static void main(String[] args) {
        SpringApplication.run(ProviderApplication.class, args);
    }
}
Binding interface that declares the channels
@Input with a SubscribableChannel binds finishedOrders, where incoming messages are delivered; @Output with a MessageChannel binds newOrders, to which messages are published.
package com.xyz.provider.integration;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Barista {
    String NEW_ORDERS = "newOrders";
    String FINISHED_ORDERS = "finishedOrders";

    // Inbound channel: finished orders coming back from the consumer service
    @Input(FINISHED_ORDERS)
    SubscribableChannel finishedOrders();

    // Outbound channel: new orders published by this service
    @Output(NEW_ORDERS)
    MessageChannel newOrders();
}
Message consumer class
package com.xyz.provider.integration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class OrderListener {
    // Invoked for every message that arrives on finishedOrders
    @StreamListener(Barista.FINISHED_ORDERS)
    public void listenFinishedOrders(Integer num) {
        log.info("We've finished an order [{}].", num);
    }
}
Controller
package com.xyz.provider.controller;

import com.xyz.provider.service.OrderService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@Slf4j
public class DemoController {
    @Autowired
    private OrderService orderService;

    // Example: GET /rabbitmq?num=1
    @RequestMapping("/rabbitmq")
    public String rabbitmq(Integer num) {
        // The {} placeholder is required for num to appear in the log line
        log.info("msg num: {}", num);
        orderService.updateNum(num);
        return "ok";
    }
}
Service class that publishes messages
package com.xyz.provider.service;

import com.xyz.provider.integration.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
@Transactional
@Slf4j
public class OrderService {
    @Autowired
    private Barista barista;

    public boolean updateNum(Integer num) {
        num++;
        System.out.println(num);
        // Publish the incremented value to the newOrders channel
        barista.newOrders().send(MessageBuilder.withPayload(num).build());
        return true;
    }
}
(2)service-comsumer
Dependencies
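<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- needed for @Slf4j in OrderListener -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-consul-discovery</artifactId>
</dependency>
<!-- needed for @EnableFeignClients in the main class -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-openfeign</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Configuration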
server.port=8015
spring.application.name=service-comsumer
management.endpoints.web.exposure.include=*
management.endpoint.health.show-details=always
spring.cloud.consul.host=192.168.99.100
spring.cloud.consul.port=8500
spring.cloud.consul.discovery.health-check-path=/actuator/health
spring.cloud.consul.discovery.service-name=${spring.application.name}
spring.cloud.consul.discovery.heartbeat.enabled=true
spring.rabbitmq.host=192.168.99.100
spring.rabbitmq.port=5672
spring.rabbitmq.username=spring
spring.rabbitmq.password=spring
spring.cloud.stream.bindings.newOrders.group=service-comsumer
Main class
The @EnableBinding annotation takes Waiter so that Spring loads the channel bindings it declares.
package com.xyz.comsumer;

import com.xyz.comsumer.integration.Waiter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.openfeign.EnableFeignClients;
import org.springframework.cloud.stream.annotation.EnableBinding;

@EnableFeignClients
@SpringBootApplication
@EnableBinding(Waiter.class) // register the channels declared in Waiter
public class ComsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(ComsumerApplication.class, args);
    }
}
Message consumer class
It receives a message from newOrders, uses MessageBuilder to create a message with an Integer payload,
and publishes that message with the .send() method of the MessageChannel.
package com.xyz.comsumer.integration;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

@Component
@Slf4j
@Transactional
public class OrderListener {
    // Output channel declared in Waiter, injected by its bean name
    @Autowired
    @Qualifier(Waiter.FINISHED_ORDERS)
    private MessageChannel finishedOrdersMessageChannel;

    // Invoked for every message that arrives on newOrders
    @StreamListener(Waiter.NEW_ORDERS)
    public void processNewOrder(Integer num) {
        num++;
        // The {} placeholder is required for num to appear in the log line
        log.info("Receive a new order [{}].", num);
        System.out.println(num);
        finishedOrdersMessageChannel.send(MessageBuilder.withPayload(num).build());
    }
}
Binding interface that declares the channels
package com.xyz.comsumer.integration;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Waiter {
    String NEW_ORDERS = "newOrders";
    String FINISHED_ORDERS = "finishedOrders";

    // Inbound channel: new orders published by service-provider
    @Input(NEW_ORDERS)
    SubscribableChannel newOrders();

    // Outbound channel: finished orders sent back to service-provider
    @Output(FINISHED_ORDERS)
    MessageChannel finishedOrders();
}
Start RabbitMQ
Start Consul
Start service-provider
Start service-comsumer
GET http://172.27.0.17:8010/rabbitmq?num=1
The response is "ok".
The messages are also printed on the consoles of both services.
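
As a sanity check on what the consoles should show for num=1, the arithmetic of the round trip can be replayed in plain Java. This sketch does not touch the broker; OrderFlowTrace and its methods are hypothetical helpers that only mirror the two increments performed in OrderService.updateNum and in the consumer's processNewOrder.

```java
import java.util.ArrayList;
import java.util.List;

class OrderFlowTrace {

    // Mirrors OrderService.updateNum: increment, then publish to newOrders.
    static int providerSend(int num) {
        return num + 1;
    }

    // Mirrors the consumer's processNewOrder: increment, then publish to finishedOrders.
    static int consumerProcess(int num) {
        return num + 1;
    }

    // Lists the value carried by each hop for a given request parameter.
    static List<String> trace(int requestNum) {
        List<String> lines = new ArrayList<>();
        int newOrder = providerSend(requestNum);
        lines.add("service-provider -> newOrders: " + newOrder);
        int finished = consumerProcess(newOrder);
        lines.add("service-comsumer -> finishedOrders: " + finished);
        lines.add("service-provider logs: We've finished an order [" + finished + "].");
        return lines;
    }

    public static void main(String[] args) {
        trace(1).forEach(System.out::println);
    }
}
```

If the provider never logs the finished order, the usual suspects are the channel names on the two sides not matching or the consumer not being connected to the same broker.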

Note:
If the following error occurs:
ERROR 13220 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.beans.factory.BeanDefinitionStoreException: Failed to process import candidates for configuration class [com.xyz.provider.ProviderApplication]; nested exception is java.lang.IllegalStateException: Failed to introspect annotated methods on class org.springframework.cloud.stream.config.BinderFactoryConfiguration
try switching the Spring Boot parent and the Spring Cloud dependency management to the following versions:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.1.4.RELEASE</version>
    <relativePath/>
</parent>
and
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Greenwich.SR1</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>