Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.
朱尚禮 Sam
twitter:@samzhu0318
blog: SAM的程式筆記 (Sam's Programming Notes)
2016 Java Community Conference Taiwan Microservice & Cloud Native Speaker
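2018 iT 邦幫忙 Ironman Contest: 30 Days from Scratch, Building a Complete Microservice Architecture with Spring Boot and Spring Cloud
A Spring enthusiast
First Spring Boot post
June 17, 2014: Rapidly Building Web Applications with Spring-Boot
First Spring Cloud post
November 12, 2015: Microservice with SpringCloud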
On The Spring Blog
Spring Design Philosophy
Provide choice at every level. Spring lets you defer design decisions as late as possible. For example, you can switch persistence providers through configuration without changing your code. The same is true for many other infrastructure concerns and integration with third-party APIs.
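Real-world experience
In the era when NoSQL burst onto the scene
we had to build data access for MongoDB and Elasticsearch at the same time
on top of the existing RDBMS
It is all just CRUD, yet every store needed a different SDK and a different API
Then Spring Data arrived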
JPA
@Entity public class Order{ @Id private String orderId; private String billingAddress; private String shippingAddress; }
public interface OrderRepository extends PagingAndSortingRepository<Order, String> {}
MongoDB
@Document public class Order { @Id private String orderId; private String billingAddress; private String shippingAddress; }
public interface OrderRepository extends PagingAndSortingRepository<Order, String> {}
Elasticsearch
@Document(indexName = "ec", type = "order") public class Order { @Id private String orderId; private String billingAddress; private String shippingAddress; }
public interface OrderRepository extends PagingAndSortingRepository<Order, String> {}
From then on, I fetch data through a single API
orderRepository.findById(orderId)
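A message queue is a way for processes, or for threads within the same process, to communicate. The queue handles a series of inputs, usually coming from users.
Apache Kafka, RabbitMQ, Apache ActiveMQ, ZeroMQ, Apache RocketMQ (Alibaba), Redis, MQTT
Think that is a lot already?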
Amazon Kinesis, Google PubSub, Solace PubSub+, Azure Event Hubs
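Kafka: well suited to high-volume log collection
RabbitMQ: a middle-of-the-road choice, easy to set up, usable for chat systems
RocketMQ: guarantees delivery order, usable for transaction systems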
RabbitMQ
Topics
Any message sent to a Topic Exchange is forwarded to every Queue whose binding matches the topic given in the message's routing key.
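The matching rule above can be sketched in plain Java. This is a simplified model, not RabbitMQ's own code: the class name `TopicMatcher` is made up for illustration, and it assumes the usual AMQP topic conventions in which words are separated by dots, `*` stands for exactly one word and `#` stands for zero or more words.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of topic-exchange matching; illustrative only, not RabbitMQ's implementation. */
class TopicMatcher {

    /** Returns true when the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        List<String> p = Arrays.asList(pattern.split("\\."));
        List<String> k = Arrays.asList(routingKey.split("\\."));
        return matches(p, 0, k, 0);
    }

    private static boolean matches(List<String> p, int i, List<String> k, int j) {
        if (i == p.size()) {
            return j == k.size();      // pattern used up: the key must be used up too
        }
        String word = p.get(i);
        if (word.equals("#")) {
            // '#' stands for zero or more words: try every possible split point
            for (int next = j; next <= k.size(); next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.size()) {
            return false;              // key used up but the pattern still expects a word
        }
        // '*' stands for exactly one word; any other word must match literally
        return (word.equals("*") || word.equals(k.get(j))) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("chat.message.*", "chat.message.v1")); // true
        System.out.println(matches("chat.#", "chat.message.v1"));         // true
        System.out.println(matches("chat.*", "chat.message.v1"));         // false
    }
}
```

A queue bound with `chat.#` therefore receives everything published under `chat`, while `chat.*` only accepts keys with exactly one more word.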
Kafka
If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.
RocketMQ
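A Consumer Group represents one message-consuming application. A Consumer Group contains multiple Consumer instances, which may be several machines, several processes, or several Consumer objects inside one process. The Consumers in a group share the messages evenly; if broadcast mode is set, every instance in the Consumer Group consumes the full set of messages.
But these three systems are not compatible with each other, and their client libraries are all used differently
So you have to learn the APIs of all three at once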
MQ | Protocol | Library |
---|---|---|
Kafka | Custom RPC | kafka-clients-2.1.0.jar |
RabbitMQ | AMQP | amqp-client-5.5.1.jar |
RocketMQ | JMS, MQTT | rocketmq-client-4.3.2.jar |
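No more headaches
Time to talk about Spring Cloud Stream
Its function and purpose are the same as Spring Data's: Spring provides a higher-level abstraction, each vendor supplies the implementation, and our development cost goes down.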
Spring Cloud Stream
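The core building blocks of Spring Cloud Stream are:
Destination Binders: components responsible for providing integration with the external messaging systems.
Destination Bindings: the bridge between the external messaging systems and the application code (producer/consumer) provided by the end user.
Message: the canonical data structure used by producers and consumers to communicate with Destination Binders (and thus with other applications via external messaging systems).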
Binder Implementations
Spring Cloud Stream supports a variety of binder implementations and the following table includes the link to the GitHub projects.
RabbitMQ
Apache Kafka
Amazon Kinesis
Google PubSub (partner maintained)
Solace PubSub+ (partner maintained)
Azure Event Hubs (partner maintained)
2018.11.16 「小马哥技术周报」 (Xiao Ma Ge Tech Weekly), Issue 8: the Alibaba implementation of a Spring Cloud Stream Binder
sample code
dependency
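<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>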
Producer
Producer - application.yml
spring:
  autoconfigure:
    exclude: # the default RabbitTemplate auto-configuration fails at startup and is not needed, so exclude it
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        output: # channelName
          destination: chat.message.v1 # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          producer:
            required-groups: chatGroup # comma-separated list of groups to which the producer must ensure delivery, even if they start after it was created (for example, by pre-creating durable queues in RabbitMQ)
          binder: eventMq # binder used by this binding
      binders:
        eventMq:
          type: rabbit # messaging system to use: rabbit, kafka, and so on
          environment:
            spring:
              rabbitmq:
                host: 192.168.56.101
                port: 5672
Source.java
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

/**
 * Bindable interface with one output channel.
 *
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 * @author Dave Syer
 * @author Marius Bogoevici
 */
public interface Source {

    String OUTPUT = "output";

    @Output(Source.OUTPUT)
    MessageChannel output();

}
Producer - DemoApplication.java
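import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;

@EnableBinding(Source.class) // binds the "output" channel declared by Source
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }
}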
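Sending a message
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ApplicationLoader {

    @Autowired
    private Source source;

    // Send one message as soon as the application is ready
    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        Message<String> message = MessageBuilder.withPayload("Hi!!")
                .build();
        source.output().send(message);
    }
}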
Consumer
Consumer - application.yml
spring:
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input: # channelName
          destination: chat.message.v1
          content-type: application/json
          binder: eventMq
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.56.101
                port: 5672
The channelName is different: input on the consumer side, output on the producer side
Sink.java
package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

/**
 * Bindable interface with one input channel.
 *
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 * @author Dave Syer
 * @author Marius Bogoevici
 */
public interface Sink {

    String INPUT = "input";

    @Input(Sink.INPUT)
    SubscribableChannel input();

}
Receiving a message
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    // Invoked for every message that arrives on the "input" channel
    @StreamListener(target = Sink.INPUT)
    public void chatReceive(Message<String> chatMessage) {
        System.out.println("收到: " + chatMessage.getPayload());
    }
}
Let's test it
What Spring did for us
Exchanges
Bindings
Queues
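The queue chat.message.v1.anonymous.SD0R-ebmRUqPjI-BZsX9Vg is removed once the service stops
But there is always a BUT!
Multiple consumers
The duplicate consumption problem
Where the problem comes from
How Spring Cloud Stream solves it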
Consumer Groups
While the publish-subscribe model makes it easy to connect applications through shared topics, the ability to scale up by creating multiple instances of a given application is equally important. When doing so, different instances of an application are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message.
Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to and inspired by Kafka consumer groups.) Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. For the consumers shown in the following figure, this property would be set as spring.cloud.stream.bindings.<channelName>.group=hdfsWrite or spring.cloud.stream.bindings.<channelName>.group=average.
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#consumer-groups
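The competing-consumer behavior described above can be pictured with a toy in-memory model. This is only a sketch under stated assumptions: `ConsumerGroupDemo`, `Group` and `Destination` are hypothetical names, and round-robin dispatch inside a group is an assumption made for readability (a real broker decides for itself which instance gets a message). The group names `hdfsWrite` and `average` are the ones used in the quoted documentation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of consumer groups; illustrative only, not how a binder is implemented. */
class ConsumerGroupDemo {

    /** One group: its instances compete, so each message goes to exactly one of them. */
    static class Group {
        final List<List<String>> instances = new ArrayList<>();
        private int next = 0;

        Group(int instanceCount) {
            for (int i = 0; i < instanceCount; i++) {
                instances.add(new ArrayList<>());
            }
        }

        void deliver(String message) {
            instances.get(next).add(message);       // assumed round-robin inside the group
            next = (next + 1) % instances.size();
        }
    }

    /** A destination hands a copy of every message to each subscribed group. */
    static class Destination {
        final Map<String, Group> groups = new LinkedHashMap<>();

        void subscribe(String groupName, int instanceCount) {
            groups.put(groupName, new Group(instanceCount));
        }

        void publish(String message) {
            for (Group group : groups.values()) {
                group.deliver(message);
            }
        }
    }

    public static void main(String[] args) {
        Destination destination = new Destination();
        destination.subscribe("hdfsWrite", 2);   // two instances of the same application
        destination.subscribe("average", 1);     // a different application
        for (String m : new String[] {"m1", "m2", "m3", "m4"}) {
            destination.publish(m);
        }
        System.out.println(destination.groups.get("hdfsWrite").instances); // [[m1, m3], [m2, m4]]
        System.out.println(destination.groups.get("average").instances);   // [[m1, m2, m3, m4]]
    }
}
```

Every group sees every message once, while the instances inside a group split the work, which is what removes the duplicate consumption seen with anonymous queues.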
Consumer - application.yml
spring:
  autoconfigure:
    exclude:
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:
          destination: chat.message.v1
          content-type: application/json
          group: chatGroup # bind to a consumer group
          binder: eventMq
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
Binding a group
Exchange
Queue
Add the dependency
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-sleuth</artifactId>
</dependency>
Getting the spanTraceId
@StreamListener(target = MqChannel.gameChannel)
public void receiveGameCreateEvent(
        Message<GameCreateEvent> eventMessage) {
    // Sleuth propagates the trace identifiers as message headers
    log.info("Message spanTraceId={}", eventMessage.getHeaders().get("spanTraceId"));
    log.info("Message spanId={}", eventMessage.getHeaders().get("spanId"));
}
Result of reading the spanTraceId
2018-12-17 11:31:59.718 INFO [gameservice-v1,2fff644a3967a533,868d7e1a8ca99ff0,false] 2052 --- [.v1.gameGroup-1]
c.e.demo.consumer.GameEventReceiver : Message spanTraceId=2fff644a3967a533
2018-12-17 11:31:59.719 INFO [gameservice-v1,2fff644a3967a533,868d7e1a8ca99ff0,false] 2052 --- [.v1.gameGroup-1]
c.e.demo.consumer.GameEventReceiver : Message spanId=63a6cfa6e68c0253
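Add the dependency
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
Enable monitoring in application.yml
management:
  health:
    binders:
      enabled: true
  endpoints:
    web:
      exposure:
        include: 'channels,bindings,bindings-name'
Getting channel information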
curl -X GET http://localhost:7203/actuator/channels
{
"inputs": {
"gameChannel": {
"destination": "game.create.message.v1",
"group": "gameGroup",
"binder": "eventMq",
"consumer": {
"instanceCount": 1,
"instanceIndex": 0,
"maxAttempts": 1
}
}
},
"outputs": {
"resultChannel": {
"destination": "result.message",
"binder": "eventMq",
"producer": {
"requiredGroups": [
"webGroup",
"resultGroup"
]
}
}
}
}
Consumer - application.yml
spring:
  autoconfigure:
    exclude: # the default RabbitTemplate auto-configuration fails at startup and is not needed, so exclude it
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input: # channelName
          destination: chat.message.v1 # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup # bind to a consumer group
          consumer:
            max-attempts: 3 # defaults to 3; 1 disables retry
          binder: eventMq # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
Note the max-attempts setting
Retry result
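What max-attempts means can be sketched in plain Java: the handler is called up to that many times, the first call counts as attempt 1, and the failure is only passed on once every attempt has failed. This is a simplified illustration, not the binder's actual retry code; `RetrySketch` and `handleWithRetry` are hypothetical names, and any wait between attempts is left out.

```java
import java.util.function.Consumer;

/** Plain-Java sketch of "try up to N times, then give up"; not the binder's actual retry code. */
class RetrySketch {

    /**
     * Calls the handler up to maxAttempts times (the first call is attempt 1).
     * Returns the number of attempts used, or rethrows the last failure when all attempts fail.
     */
    static <T> int handleWithRetry(T payload, Consumer<T> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt;            // success: stop retrying
            } catch (RuntimeException e) {
                last = e;                  // remember the failure and try again
            }
        }
        throw last;                        // every attempt failed
    }

    public static void main(String[] args) {
        int[] calls = {0};
        Consumer<String> flaky = payload -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("attempt " + calls[0] + " failed");
            }
        };
        System.out.println(handleWithRetry("Hi!!", flaky, 3)); // 3

        try {
            // maxAttempts = 1 means a single call and no retry
            handleWithRetry("Hi!!", payload -> { throw new IllegalStateException("always fails"); }, 1);
        } catch (IllegalStateException e) {
            System.out.println("gave up: " + e.getMessage());  // gave up: always fails
        }
    }
}
```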
Consumer - application.yml
spring:
  autoconfigure:
    exclude: # the default RabbitTemplate auto-configuration fails at startup and is not needed, so exclude it
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input: # channelName
          destination: chat.message.v1 # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup # bind to a consumer group
          consumer:
            max-attempts: 3 # defaults to 3; 1 disables retry
          binder: eventMq # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
      rabbit:
        bindings:
          input: # must match the channelName, otherwise the settings have no effect
            producer:
              auto-bind-dlq: true # configure both the producer and the consumer side, otherwise a binding error occurs
            consumer:
              auto-bind-dlq: true # creates the exchange: DLX
              republish-to-dlq: true # when false and a dead-letter queue is configured, the message goes to the DLQ unchanged; when true, it is republished to the DLQ with the error information attached
              # dlq-ttl: 10000 # time to live (milliseconds)
DLX
Test result
DLX created
Routing key
Queue
Queue chat.message.v1.chatGroup.dlq
What republish-to-dlq does
It sends the error information along with the dead-lettered message
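The difference between the two settings can be pictured as follows. This is a plain-Java sketch, not the Rabbit binder's code: `DeadLetterSketch`, `Envelope` and `toDeadLetter` are hypothetical names, and only the two header names, x-exception-message and x-exception-stacktrace, come from these slides.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;

/** Sketch of "forward unchanged" versus "republish with error headers"; illustrative only. */
class DeadLetterSketch {

    /** Hypothetical stand-in for a broker message: headers plus a text body. */
    static final class Envelope {
        final Map<String, Object> headers;
        final String body;

        Envelope(Map<String, Object> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** Builds the message that ends up in the dead-letter queue. */
    static Envelope toDeadLetter(Envelope failed, Throwable cause, boolean republish) {
        if (!republish) {
            return failed;                               // forwarded exactly as it was
        }
        Map<String, Object> headers = new HashMap<>(failed.headers);   // copy, keep the original intact
        headers.put("x-exception-message", String.valueOf(cause.getMessage()));
        StringWriter trace = new StringWriter();
        cause.printStackTrace(new PrintWriter(trace, true));
        headers.put("x-exception-stacktrace", trace.toString());
        return new Envelope(headers, failed.body);
    }

    public static void main(String[] args) {
        Envelope failed = new Envelope(new HashMap<>(), "Hi!!");
        Envelope dead = toDeadLetter(failed, new RuntimeException("Message consumer failed!"), true);
        System.out.println(dead.headers.get("x-exception-message")); // Message consumer failed!
    }
}
```

With the extra headers in place, whoever reads the dead-letter queue can see why the message failed without searching the consumer's logs.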
Receiving dead letters (Rabbit)
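import java.nio.charset.StandardCharsets;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DlqResultReceiverAmqp {

    private static final String ORIGINAL_QUEUE = "chat.message.v1.chatGroup";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    @RabbitListener(queues = DLQ)
    public void receiveDLQResult(Message failedMessage) {
        // Headers travel in the AMQP message properties
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        log.info("amqp spanTraceId={}", headers.get("spanTraceId"));
        log.info("amqp spanId={}", headers.get("spanId"));
        log.info("amqp x-exception-message={}", headers.get("x-exception-message"));
        log.info("amqp x-exception-stacktrace={}", headers.get("x-exception-stacktrace"));
        String messageBody = new String(failedMessage.getBody(), StandardCharsets.UTF_8);
        log.info("amqp messageBody={}", messageBody);
    }
}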
SpEL support
@StreamListener(target = Processor.INPUT, condition = "headers['eventType']=='gameCreate'")
public void createGamePlus(
        @Header(name = "username", required = false) String username,
        Message<ResponseData> eventMessage) {
    ResponseData responseData = eventMessage.getPayload(); // payload of the incoming event
    ResponseData responseDataToWeb = new ResponseData()
            .setCode("0")
            .setMessage(responseData.getGameName() + ", 歡迎 VIP 馬上充值 送好禮!!")
            .setDateTime(LocalDateTime.now());
    Message<ResponseData> responseDataMessage = MessageBuilder.withPayload(responseDataToWeb)
            .setHeader("username", username)
            .build();
}
Paving the way for event-driven design?
Event-driven design is introduced later
Tests can run without RabbitMQ
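import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ExampleTest {

    @Autowired
    private Processor processor;

    @Autowired
    private MessageCollector messageCollector;

    @Test
    @SuppressWarnings("unchecked")
    public void testWiring() {
        Message<String> message = new GenericMessage<>("hello");
        processor.input().send(message);
        // The test binder collects what was sent to the output channel
        Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll();
        assertThat(received.getPayload(), equalTo("hello world"));
    }

    @SpringBootApplication
    @EnableBinding(Processor.class)
    public static class MyProcessor {

        @Autowired
        private Processor channels;

        @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
        public String transform(String in) {
            return in + " world";
        }
    }
}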
https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_testing
Consumer - application.yml
spring:
  autoconfigure:
    exclude: # the default RabbitTemplate auto-configuration fails at startup and is not needed, so exclude it
      - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input: # channelName
          destination: chat.message.v1 # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup # bind to a consumer group
          consumer:
            max-attempts: 1 # defaults to 3; 1 disables retry
          binder: eventMq # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
      rabbit:
        bindings:
          input:
            producer:
              auto-bind-dlq: true
            consumer:
              auto-bind-dlq: true # creates the exchange: DLX
              # republish-to-dlq: true # cannot be combined with requeue-rejected
              requeue-rejected: true # a message that fails is not discarded; it is put back on the queue
requeue-rejected
When consuming a message fails, the message is not discarded; it is put back on the queue
So it loops endlessly
Using AmqpRejectAndDontRequeueException
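import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    // Stand-in for Redis or a similar shared store
    private Map<String, Integer> redis = new HashMap<>();

    @StreamListener(target = Sink.INPUT)
    public void chatReceive(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " 收到: " + chatMessage.getPayload());
        // The key should really be a snowflake ID or a hash rather than the payload itself
        int retries = redis.getOrDefault(chatMessage.getPayload(), 0);
        if (retries == 3) {
            // Rejecting without requeue breaks the loop and sends the message to the DLQ
            throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
        } else {
            retries = retries + 1;
            redis.put(chatMessage.getPayload(), retries);
            throw new RuntimeException("Message consumer failed!");
        }
    }
}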
http://blog.didispace.com/spring-cloud-starter-finchley-7-5/
Without republish-to-dlq
Another approach
Handling it with a condition
import java.time.LocalDateTime;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    @Autowired
    private Processor processor;

    @StreamListener(target = Processor.INPUT, condition = "headers['x-retries'] == null or headers['x-retries'] < 3")
    public void chatReceive(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " 收到: " + chatMessage.getPayload());
        System.out.println(LocalDateTime.now() + " 收到 x-retries: " + chatMessage.getHeaders().get("x-retries"));
        // Assume processing failed, so send the message back to the MQ ourselves
        Integer retries = 0;
        if (chatMessage.getHeaders().get("x-retries") != null) {
            retries = (Integer) chatMessage.getHeaders().get("x-retries") + 1;
        }
        Message<String> message = MessageBuilder.withPayload(chatMessage.getPayload())
                .setHeader("x-retries", retries)
                .build();
        processor.output().send(message);
    }

    // Messages that have used up their retries are handled here instead
    @StreamListener(target = Processor.INPUT, condition = "headers['x-retries'] >= 3")
    public void chatReceiveDl(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " DLQ 收到: " + chatMessage.getPayload());
        System.out.println(LocalDateTime.now() + " DLQ 收到 x-retries: " + chatMessage.getHeaders().get("x-retries"));
    }
}
2018-12-21T12:12:33.842 收到: Hi!!
2018-12-21T12:12:33.842 收到 x-retries: null
2018-12-21T12:12:33.854 收到: Hi!!
2018-12-21T12:12:33.854 收到 x-retries: 0
2018-12-21T12:12:33.855 收到: Hi!!
2018-12-21T12:12:33.855 收到 x-retries: 1
2018-12-21T12:12:33.857 收到: Hi!!
2018-12-21T12:12:33.857 收到 x-retries: 2
2018-12-21T12:12:33.858 DLQ 收到: Hi!!
2018-12-21T12:12:33.858 DLQ 收到 x-retries: 3
Consumer - application.yml
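# placed under spring.cloud.stream
rabbit:
  bindings:
    input:
      producer:
        auto-bind-dlq: true
      consumer:
        auto-bind-dlq: true # creates the exchange: DLX
        republish-to-dlq: true # when false and a dead-letter queue is configured, the message goes to the DLQ unchanged; when true, it is republished to the DLQ with the error information attached
        acknowledge-mode: MANUAL # acknowledge manually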
import java.io.IOException;
import java.time.LocalDateTime;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    @StreamListener(target = Sink.INPUT)
    public void chatReceive(
            Message<String> chatMessage,
            @Header(AmqpHeaders.CHANNEL) Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
        System.out.println(LocalDateTime.now() + " 收到: " + chatMessage.getPayload());
        // deliveryTag: every delivered message gets one, unique within the channel.
        // Second argument of basicAck is "multiple": false acknowledges only this delivery.
        channel.basicAck(deliveryTag, false);
        // To reject instead, use channel.basicReject(deliveryTag, requeue):
        // requeue = true puts the message back on the queue,
        // requeue = false tells RabbitMQ to discard (or dead-letter) it.
    }
}
Event-driven?
Spring Data now has @DomainEvents?
DDD aggregates and @DomainEvents | Baeldung
Spring Cloud Stream can use condition
Domain-Driven Design
Event Sourcing & CQRS
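Solves consistency problems in distributed transactions
State can be traced back to its source
Results cannot be tampered with
Reads and writes are optimized separately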
Axon
FB
facebook - Domain Driven Design Taiwan 1st MeetUp at 2019 facebook - Domain Driven Design(Taiwan)
Josh Long
YouTube
YouTube
Official Spring blog
Josh Long's This Week in Spring
Recent talks
Other sites
Thank you
Sample code