Spring Cloud Stream

Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

About me

朱尚禮 Sam

twitter:@samzhu0318

blog:SAM的程式筆記

GitHub

2016 Java Community Conference Taiwan Microservice & Cloud Native Speaker

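2018 iT 邦幫忙 Ironman Contest (鐵人賽): Building a complete microservice architecture with Spring Boot and Spring Cloud from scratch in 30 days

Spring enthusiast

First Spring Boot post

June 17, 2014: 使用Spring-Boot快速建構Web應用 (Quickly building a web application with Spring Boot)

First Spring Cloud post

November 12, 2015: Microservice with SpringCloud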

On The Spring Blog

First, a word about Spring

Spring Design Philosophy

Provide choice at every level. Spring lets you defer design decisions as late as possible. For example, you can switch persistence providers through configuration without changing your code. The same is true for many other infrastructure concerns and integration with third-party APIs.

https://docs.spring.io/spring/docs/current/spring-framework-reference/overview.html#overview-philosophy

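Real-world experience

In the era when NoSQL burst onto the scene

I had to develop data access for MongoDB and Elasticsearch at the same time

on top of the existing RDBMS

It is all just CRUD, yet every store demands a different SDK and a different API

Despair

Then Spring Data arrived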

JPA

                    
                        @Entity
                        public class Order{
                            @Id private String orderId;
                            private String billingAddress;
                            private String shippingAddress;
                        }
                    
                    
                        // The ID type parameter matches the String orderId on the entity
                        public interface OrderRepository
                            extends PagingAndSortingRepository<Order, String> {}
                    
                

MongoDB

                    
                        @Document
                        public class Order {
                            @Id private String orderId;
                            private String billingAddress;
                            private String shippingAddress;
                        }
                    
                    
                        // The ID type parameter matches the String orderId on the document
                        public interface OrderRepository
                            extends PagingAndSortingRepository<Order, String> {}
                    
                

Elasticsearch

                    
                        @Document(indexName = "ec", type = "order")
                        public class Order {
                            @Id private String orderId;
                            private String billingAddress;
                            private String shippingAddress;
                        }
                    
                    
                        // The ID type parameter matches the String orderId on the document
                        public interface OrderRepository
                            extends PagingAndSortingRepository<Order, String> {}
                    
                

From then on, I fetch data through a single API

                    
                       orderRepository.findById(orderId)
                    
                

Message Queue

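A message queue is a form of inter-process communication, or of communication between threads within the same process. The queue handles a series of inputs, usually originating from users.

Which message queues are there?

Apache Kafka, RabbitMQ, Apache ActiveMQ, ZeroMQ, Apache RocketMQ (Alibaba), Redis, MQTT

Think that's a lot?

There are cloud services too

Amazon Kinesis, Google PubSub, Solace PubSub+, Azure Event Hubs

Different design philosophies

Kafka suits high-volume log collection

RabbitMQ is the middle-of-the-road choice: easy to set up, and usable for chat systems

RocketMQ guarantees delivery order, so it can be used for transaction systems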

RabbitMQ

Topics

[Figure: RabbitMQ topic exchange]

Any message sent to a topic exchange is forwarded to every queue whose binding matches the topic in the message's routing key

https://www.rabbitmq.com/getstarted.html
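
The routing rule above can be sketched in plain Java. This is only an illustrative model, not RabbitMQ client code: the class `TopicExchangeSketch` and the queue names are made up for the example. It assumes the usual topic-exchange wildcards, where `*` stands for exactly one dot-separated word and `#` for zero or more words.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified model of a topic exchange; not RabbitMQ client code. */
final class TopicExchangeSketch {
    // Each entry is {queueName, bindingPattern}.
    private final List<String[]> bindings = new ArrayList<>();

    void bind(String queue, String pattern) {
        bindings.add(new String[] {queue, pattern});
    }

    /** Returns the queues whose binding pattern matches the routing key. */
    List<String> route(String routingKey) {
        List<String> queues = new ArrayList<>();
        for (String[] binding : bindings) {
            if (matches(binding[1], routingKey) && !queues.contains(binding[0])) {
                queues.add(binding[0]);
            }
        }
        return queues;
    }

    static boolean matches(String pattern, String routingKey) {
        return matches(pattern.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may consume zero or more words of the routing key.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            return false;
        }
        // '*' consumes exactly one word; anything else must match literally.
        return (p[i].equals("*") || p[i].equals(k[j])) && matches(p, i + 1, k, j + 1);
    }

    public static void main(String[] args) {
        TopicExchangeSketch exchange = new TopicExchangeSketch();
        exchange.bind("chatQueue", "chat.*.v1");
        exchange.bind("auditQueue", "#");
        exchange.bind("gameQueue", "game.#");
        System.out.println(exchange.route("chat.message.v1")); // prints [chatQueue, auditQueue]
    }
}
```

Every queue with a matching binding receives its own copy of the message, which is the publish-subscribe behavior the rest of the deck relies on.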

Kafka

Kafka

If all the consumer instances have the same consumer group, then the records will effectively be load balanced over the consumer instances.

https://kafka.apache.org/documentation/#intro_consumers

RocketMQ

RocketMQ

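Consumer Group: represents an application that consumes messages. A consumer group contains multiple consumer instances, which may be several machines, several processes, or several consumer objects within one process. The consumers in a group share the messages evenly; in broadcast mode, every instance in the group consumes the full set of messages.

RocketMQ Developer Guide (RocketMQ开发指南)

Suppose you need to use all three at once

But the three are not compatible with one another, and each client library works differently

So you have to learn the API of all three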

MQ          Protocol      Library
Kafka       Custom RPC    kafka-clients-2.1.0.jar
RabbitMQ    AMQP          amqp-client-5.5.1.jar
RocketMQ    JMS, MQTT     rocketmq-client-4.3.2.jar

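No more headaches

Enter Spring Cloud Stream

Spring Cloud Stream

Its function and purpose mirror Spring Data's: Spring provides the higher-level abstraction, each vendor supplies the implementation, and our development cost goes down.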

Spring Cloud Stream

The core building blocks of Spring Cloud Stream are:

  • Destination Binders: Components responsible to provide integration with the external messaging systems.
  • Destination Bindings: Bridge between the external messaging systems and application provided Producers and Consumers of messages (created by the Destination Binders).
  • Message: The canonical data structure used by producers and consumers to communicate with Destination Binders (and thus other applications via external messaging systems).

https://cloud.spring.io/spring-cloud-stream/
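
To make the three building blocks more concrete, here is a minimal plain-Java sketch. It is not the Spring Cloud Stream SPI: the interface `Binder`, the class `InMemoryBinder`, and the method names are hypothetical stand-ins, and the real Binder interface has a different signature. The point is only that application code talks to a binding, while the binder hides the messaging system behind it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

final class BinderSketchDemo {

    /** Hypothetical stand-in for a destination binder. */
    interface Binder {
        /** Output binding: the application calls the returned function to publish. */
        Consumer<String> bindProducer(String destination);

        /** Input binding: the handler is invoked for every message on the destination. */
        void bindConsumer(String destination, Consumer<String> handler);
    }

    /** In-memory "messaging system" so the sketch runs without a broker. */
    static final class InMemoryBinder implements Binder {
        private final Map<String, List<Consumer<String>>> subscribers = new HashMap<>();

        @Override
        public Consumer<String> bindProducer(String destination) {
            return payload -> {
                for (Consumer<String> handler
                        : subscribers.getOrDefault(destination, new ArrayList<>())) {
                    handler.accept(payload);
                }
            };
        }

        @Override
        public void bindConsumer(String destination, Consumer<String> handler) {
            subscribers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
        }
    }

    /** Application code: depends only on Binder, never on a broker client library. */
    static List<String> run(Binder binder) {
        List<String> received = new ArrayList<>();
        binder.bindConsumer("chat.message.v1", received::add);
        Consumer<String> output = binder.bindProducer("chat.message.v1");
        output.accept("Hi!!");
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run(new InMemoryBinder())); // prints [Hi!!]
    }
}
```

Swapping the messaging system then means passing a different `Binder` implementation to `run`, which is the same role the `type: rabbit` setting plays in the configuration shown later.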

SCSt

Binder Implementations

Spring Cloud Stream supports a variety of binder implementations and the following table includes the link to the GitHub projects.

RabbitMQ

Apache Kafka

Amazon Kinesis

Google PubSub (partner maintained)

Solace PubSub+ (partner maintained)

Azure Event Hubs (partner maintained)

2018-11-16 「小马哥技术周报」 issue 8: the Alibaba implementation of a Spring Cloud Stream binder

RocketMQ

bilibili video

Spring Cloud Stream

sample code

dependency


    
        
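    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
        </dependency>
    </dependencies>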
        
    
                

Producer

Producer - application.yml


spring:
  autoconfigure:
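    exclude:    # the default RabbitTemplate is not needed, and leaving it enabled causes a startup error
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        output:    # channelName
          destination: chat.message.v1    # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          producer:
            required-groups: chatGroup    # comma-separated list of groups to which the producer must ensure delivery, even if they start after it is created (for example, by pre-creating durable queues in RabbitMQ)
          binder: eventMq    # binder used by this binding
      binders:
        eventMq:
          type: rabbit    # which MQ system to use: rabbit, kafka, and so on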
          environment:
            spring:
              rabbitmq:
                host: 192.168.56.101
                port: 5672
                

Source.java


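package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;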

/**
 * Bindable interface with one output channel.
 *
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 * @author Dave Syer
 * @author Marius Bogoevici
 */
public interface Source {

	String OUTPUT = "output";

	@Output(Source.OUTPUT)
	MessageChannel output();

}
                

Github

Producer - DemoApplication.java


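import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;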

@EnableBinding(Source.class)
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

}
                

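Sending a message


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.event.EventListener;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@Component
public class ApplicationLoader {
    @Autowired
    private Source source;

    // Send one message as soon as the application is ready
    @EventListener(ApplicationReadyEvent.class)
    public void init() {
        Message<String> message = MessageBuilder.withPayload("Hi!!")
                    .build();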
        source.output().send(message);
    }
}
                

Consumer

Consumer - application.yml


spring:
  autoconfigure:
    exclude:
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:    # channelName
          destination: chat.message.v1
          content-type: application/json
          binder: eventMq
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 192.168.56.101
                port: 5672
                

The channelName is different

Sink.java


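package org.springframework.cloud.stream.messaging;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;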

/**
 * Bindable interface with one input channel.
 *
 * @see org.springframework.cloud.stream.annotation.EnableBinding
 * @author Dave Syer
 * @author Marius Bogoevici
 */
public interface Sink {

	String INPUT = "input";

	@Input(Sink.INPUT)
	SubscribableChannel input();

}
                

Github

Receiving a message


import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    @StreamListener(target = Sink.INPUT)
    public void chatReceive(Message<String> chatMessage) {
        System.out.println("Received: " + chatMessage.getPayload());
    }
}
                

Let's test it

Test result

Done!

So excited it's blurry

What Spring did behind the scenes

Exchanges

RabbitMQ Exchanges

Bindings

RabbitMQ Bindings

Queues

RabbitMQ Queues
RabbitMQ Queues

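The queue chat.message.v1.anonymous.SD0R-ebmRUqPjI-BZsX9Vg is removed after the service stops

But

The most amazing thing in life is this BUT!

A book by Giddens Ko (九把刀)

博客來: 人生最厲害就是這個 BUT!

Multiple consumers

Multiple consumers
Confused Nick Young meme

The duplicate consumption problem

The cause

Two queues
The exchange routes to two queues
Architecture diagram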

Spring Cloud Stream's solution

Consumer Groups

While the publish-subscribe model makes it easy to connect applications through shared topics, the ability to scale up by creating multiple instances of a given application is equally important. When doing so, different instances of an application are placed in a competing consumer relationship, where only one of the instances is expected to handle a given message.

Spring Cloud Stream models this behavior through the concept of a consumer group. (Spring Cloud Stream consumer groups are similar to and inspired by Kafka consumer groups.) Each consumer binding can use the spring.cloud.stream.bindings.<channelName>.group property to specify a group name. For the consumers shown in the following figure, this property would be set as spring.cloud.stream.bindings.<channelName>.group=hdfsWrite or spring.cloud.stream.bindings.<channelName>.group=average.

Spring's solution

https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#consumer-groups
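
The competing-consumer behavior can be modeled in a few lines of plain Java. This is a sketch under simplifying assumptions, not binder code: `ConsumerGroupSketch` is a made-up class, delivery inside a group is plain round-robin, and a subscriber without a group is given its own anonymous group, which mirrors the anonymous queues seen earlier.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical model of one destination with consumer groups. */
final class ConsumerGroupSketch {
    private final Map<String, List<Consumer<String>>> groups = new LinkedHashMap<>();
    private final Map<String, Integer> delivered = new LinkedHashMap<>(); // messages sent per group
    private int anonymousCount = 0;

    /** A null group means an anonymous subscription with its own private queue. */
    void subscribe(String group, Consumer<String> instance) {
        String key = (group != null) ? group : "anonymous." + (anonymousCount++);
        groups.computeIfAbsent(key, k -> new ArrayList<>()).add(instance);
    }

    /** Every group sees the message once; inside a group only one instance handles it. */
    void publish(String payload) {
        for (Map.Entry<String, List<Consumer<String>>> entry : groups.entrySet()) {
            List<Consumer<String>> members = entry.getValue();
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).accept(payload);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch destination = new ConsumerGroupSketch();
        List<String> instanceA = new ArrayList<>();
        List<String> instanceB = new ArrayList<>();
        destination.subscribe("chatGroup", instanceA::add);
        destination.subscribe("chatGroup", instanceB::add);
        for (int i = 0; i < 4; i++) {
            destination.publish("m" + i);
        }
        System.out.println(instanceA + " " + instanceB); // prints [m0, m2] [m1, m3]
    }
}
```

Subscribing both instances with a null group instead would hand every message to both of them, which is exactly the duplicate consumption problem described above.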

Consumer - application.yml


spring:
  autoconfigure:
    exclude:
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:
          destination: chat.message.v1
          content-type: application/json
          group: chatGroup    # bind to a consumer group
          binder: eventMq
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                

Bind the group

Done!

I was dumbstruck

Exchange

Exchange

Queue

Exchange
Architecture diagram

Other features

Distributed tracing

Add


        
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-sleuth</artifactId>
        </dependency>
        
                

Reading spanTraceId


    @StreamListener(target = MqChannel.gameChannel)
    public void receiveGameCreateEvent(
            Message<GameCreateEvent> eventMessage) {
        log.info("Message spanTraceId={}", eventMessage.getHeaders().get("spanTraceId"));
        log.info("Message spanId={}", eventMessage.getHeaders().get("spanId"));
    }
                

Result of reading spanTraceId


2018-12-17 11:31:59.718  INFO [gameservice-v1,2fff644a3967a533,868d7e1a8ca99ff0,false] 2052 --- [.v1.gameGroup-1]
                    c.e.demo.consumer.GameEventReceiver      : Message spanTraceId=2fff644a3967a533
2018-12-17 11:31:59.719  INFO [gameservice-v1,2fff644a3967a533,868d7e1a8ca99ff0,false] 2052 --- [.v1.gameGroup-1]
                    c.e.demo.consumer.GameEventReceiver      : Message spanId=63a6cfa6e68c0253
                

endpoint

Add


<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

                

Enable monitoring: application.yml


management:
  health:
    binders:
      enabled: true
  endpoints:
    web:
      exposure:
        include: 'channels,bindings,bindings-name'
                

Fetching channel information


curl -X GET http://localhost:7203/actuator/channels
                


{
    "inputs": {
        "gameChannel": {
            "destination": "game.create.message.v1",
            "group": "gameGroup",
            "binder": "eventMq",
            "consumer": {
                "instanceCount": 1,
                "instanceIndex": 0,
                "maxAttempts": 1
            }
        }
    },
    "outputs": {
        "resultChannel": {
            "destination": "result.message",
            "binder": "eventMq",
            "producer": {
                "requiredGroups": [
                    "webGroup",
                    "resultGroup"
                ]
            }
        }
    }
}
                

Retry

Consumer - application.yml


spring:
  autoconfigure:
    exclude:    # the default RabbitTemplate is not needed, and leaving it enabled causes a startup error
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:    # channelName
          destination: chat.message.v1    # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup    # bind to a consumer group
          consumer:
            max-attempts: 3    # defaults to 3; 1 disables retry
          binder: eventMq    # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
                

Note the max-attempts setting
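
As a rough illustration of what the setting means, the sketch below counts attempts the same way: the value is the total number of attempts including the first, so 1 means no retry. `MaxAttemptsSketch` and `deliver` are hypothetical names, and the sketch omits the back-off delay that a real retry would normally apply between attempts.

```java
import java.util.function.Consumer;

/** Hypothetical sketch of max-attempts semantics, without any back-off. */
final class MaxAttemptsSketch {

    /** Returns the attempt that succeeded; rethrows the last failure once attempts run out. */
    static int deliver(String payload, Consumer<String> handler, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(payload);
                return attempt;
            } catch (RuntimeException e) {
                lastFailure = e; // remember the failure and try again if attempts remain
            }
        }
        throw lastFailure;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        int used = deliver("Hi!!", payload -> {
            if (++calls[0] < 3) {
                throw new RuntimeException("Message consumer failed!");
            }
        }, 3);
        System.out.println("succeeded on attempt " + used); // prints "succeeded on attempt 3"
    }
}
```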

Retry result

Dead letter exchanges (DLXs)

Consumer - application.yml


spring:
  autoconfigure:
    exclude:    # the default RabbitTemplate is not needed, and leaving it enabled causes a startup error
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:    # channelName
          destination: chat.message.v1    # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup    # bind to a consumer group
          consumer:
            max-attempts: 3    # defaults to 3; 1 disables retry
          binder: eventMq    # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
      rabbit:
        bindings:
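          input:    # must match the channelName to take effect
            producer:
              auto-bind-dlq: true     # configure both producer and consumer, otherwise a binding error occurs
            consumer:
              auto-bind-dlq: true    # creates the exchange: DLX
              republish-to-dlq: true    # false: with a DLQ configured, the message goes to the DLQ unchanged; true: the message is republished to the DLQ with error information attached
#              dlq-ttl: 10000     # time to live (milliseconds)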
                

DLX

DLX architecture diagram

Test result

DLX created

Routing key

Queue

Queue chat.message.v1.chatGroup.dlq

What republish-to-dlq does

It sends the error information along with the message
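
The difference between the two modes can be sketched in plain Java. This is an illustrative model rather than binder code: `RepublishToDlqSketch`, `Msg`, and `deadLetter` are hypothetical names. Only the two header names, x-exception-message and x-exception-stacktrace, come from the dead-letter receiver shown in this deck.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of dead-lettering with and without republish-to-dlq. */
final class RepublishToDlqSketch {

    static final class Msg {
        final String body;
        final Map<String, Object> headers;

        Msg(String body, Map<String, Object> headers) {
            this.body = body;
            this.headers = headers;
        }
    }

    final Deque<Msg> dlq = new ArrayDeque<>();

    void deadLetter(Msg failed, Exception cause, boolean republishToDlq) {
        if (!republishToDlq) {
            dlq.add(failed); // forwarded unchanged
            return;
        }
        // Republish a copy that carries the failure details in its headers.
        Map<String, Object> headers = new HashMap<>(failed.headers);
        StringWriter trace = new StringWriter();
        cause.printStackTrace(new PrintWriter(trace, true));
        headers.put("x-exception-message", cause.getMessage());
        headers.put("x-exception-stacktrace", trace.toString());
        dlq.add(new Msg(failed.body, headers));
    }

    public static void main(String[] args) {
        RepublishToDlqSketch binder = new RepublishToDlqSketch();
        Msg failed = new Msg("Hi!!", new HashMap<>());
        binder.deadLetter(failed, new RuntimeException("Message consumer failed!"), true);
        // prints "Message consumer failed!"
        System.out.println(binder.dlq.peek().headers.get("x-exception-message"));
    }
}
```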

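Receiving dead letters (Rabbit)


import java.nio.charset.StandardCharsets;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DlqResultReceiverAmqp {
    private static final String ORIGINAL_QUEUE = "chat.message.v1.chatGroup";
    private static final String DLQ = ORIGINAL_QUEUE + ".dlq";

    @RabbitListener(queues = DLQ)
    public void receiveDLQResult(Message failedMessage) {
        // The headers live on the AMQP message properties
        Map<String, Object> headers = failedMessage.getMessageProperties().getHeaders();
        log.info("amqp spanTraceId={}", headers.get("spanTraceId"));
        log.info("amqp spanId={}", headers.get("spanId"));
        log.info("amqp x-exception-message={}", headers.get("x-exception-message"));
        log.info("amqp x-exception-stacktrace={}", headers.get("x-exception-stacktrace"));
        String messageBody = new String(failedMessage.getBody(), StandardCharsets.UTF_8);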
        log.info("amqp messageBody={}", messageBody);
    }
}
                

Condition

SpEL is supported


    @StreamListener(target = Processor.INPUT, condition = "headers['eventType']=='gameCreate'")
    public void createGamePlus(
            @Header(name = "username", required = false) String username,
            Message<ResponseData> eventMessage) {
        // The incoming payload carries the game name
        ResponseData responseData = eventMessage.getPayload();
        ResponseData responseDataToWeb = new ResponseData()
                .setCode("0")
                .setMessage(responseData.getGameName() + ", welcome VIP! Top up now for a free gift!!")
                .setDateTime(LocalDateTime.now());
        Message<ResponseData> responseDataMessage = MessageBuilder.withPayload(responseDataToWeb)
                .setHeader("username", username)
                .build();
    }
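
The idea behind condition can be sketched without Spring: each listener registers a predicate over the message headers, and the channel invokes every listener whose predicate accepts the message. The class `ConditionDispatchSketch` and the "gameDelete" event type are hypothetical, and a Java `Predicate` stands in for the SpEL expression.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical model of header-based dispatch; a Predicate replaces the SpEL condition. */
final class ConditionDispatchSketch {

    private static final class Route {
        final Predicate<Map<String, Object>> condition;
        final Consumer<String> handler;

        Route(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }
    }

    private final List<Route> routes = new ArrayList<>();

    void listen(Predicate<Map<String, Object>> condition, Consumer<String> handler) {
        routes.add(new Route(condition, handler));
    }

    /** Invokes every handler whose condition matches and returns how many ran. */
    int dispatch(Map<String, Object> headers, String payload) {
        int invoked = 0;
        for (Route route : routes) {
            if (route.condition.test(headers)) {
                route.handler.accept(payload);
                invoked++;
            }
        }
        return invoked;
    }

    public static void main(String[] args) {
        ConditionDispatchSketch channel = new ConditionDispatchSketch();
        List<String> created = new ArrayList<>();
        channel.listen(h -> "gameCreate".equals(h.get("eventType")), created::add);
        channel.listen(h -> "gameDelete".equals(h.get("eventType")), p -> { });

        Map<String, Object> headers = new HashMap<>();
        headers.put("eventType", "gameCreate");
        channel.dispatch(headers, "new game");
        System.out.println(created); // prints [new game]
    }
}
```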
                

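Paving the way for event-driven architecture?

The future

Spring Cloud Stream for event-driven architecture (针对事件驱动架构的Spring Cloud Stream), Tencent Cloud community (云+社区)

Event-driven architecture is introduced later

Testing

Tests can run without depending on RabbitMQ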


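import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.cloud.stream.test.binder.MessageCollector;
import org.springframework.integration.annotation.Transformer;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.test.context.junit4.SpringRunner;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment= SpringBootTest.WebEnvironment.RANDOM_PORT)
public class ExampleTest {

  @Autowired
  private Processor processor;

  @Autowired
  private MessageCollector messageCollector;

  @Test
  @SuppressWarnings("unchecked")
  public void testWiring() {
    Message<String> message = new GenericMessage<>("hello");
    processor.input().send(message);
    Message<String> received = (Message<String>) messageCollector.forChannel(processor.output()).poll();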
    assertThat(received.getPayload(), equalTo("hello world"));
  }


  @SpringBootApplication
  @EnableBinding(Processor.class)
  public static class MyProcessor {

    @Autowired
    private Processor channels;

    @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT)
    public String transform(String in) {
      return in + " world";
    }
  }
}
                

https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_testing

What if I don't want an immediate retry and would rather requeue the message?

Consumer - application.yml


spring:
  autoconfigure:
    exclude:    # the default RabbitTemplate is not needed, and leaving it enabled causes a startup error
    - org.springframework.boot.autoconfigure.amqp.RabbitAutoConfiguration
  cloud:
    stream:
      bindings:
        input:    # channelName
          destination: chat.message.v1    # destination; think of it as the topic in a publish-subscribe model
          content-type: application/json
          group: chatGroup    # bind to a consumer group
          consumer:
            max-attempts: 1    # defaults to 3; 1 disables retry
          binder: eventMq    # binder used by this binding
      binders:
        eventMq:
          type: rabbit
          environment:
            spring:
              rabbitmq:
                host: 127.0.0.1
                port: 5672
      rabbit:
        bindings:
          input:
            producer:
              auto-bind-dlq: true
            consumer:
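              auto-bind-dlq: true    # creates the exchange: DLX
#              republish-to-dlq: true    # cannot be combined with requeue-rejected
              requeue-rejected: true    # a message that fails is not discarded; it is put back on the queue
                

requeue-rejected

When consumption fails, the message is not discarded; it is put back on the queue

An endless loop

Using AmqpRejectAndDontRequeueException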


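import java.time.LocalDateTime;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {
    // Stand-in for Redis or a similar shared store
    private final Map<String, Integer> redis = new ConcurrentHashMap<>();

    @StreamListener(target = Sink.INPUT)
    public void chatReceive(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " Received: " + chatMessage.getPayload());
        // A real key should be a Snowflake ID or a hash rather than the raw payload
        int retries = redis.getOrDefault(chatMessage.getPayload(), 0);
        if (retries >= 3) {
            // Rejecting without requeue breaks the loop and sends the message to the DLQ
            throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
        } else {
            redis.put(chatMessage.getPayload(), retries + 1);
            throw new RuntimeException("Message consumer failed!");
        }
    }
}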
                

http://blog.didispace.com/spring-cloud-starter-finchley-7-5/

Without republish-to-dlq

Another approach

Combining it with condition


@Component
public class ChatReceiver {
    @Autowired
    private Processor processor;

    @StreamListener(target = Processor.INPUT, condition = "headers['x-retries'] == null or headers['x-retries'] < 3")
    public void chatReceive(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " Received: " + chatMessage.getPayload());
        System.out.println(LocalDateTime.now() + " Received x-retries: " + chatMessage.getHeaders().get("x-retries"));
        // Assume processing failed: put the message back on the MQ ourselves
        Integer retries = 0;
        if (chatMessage.getHeaders().get("x-retries") != null) {
            retries = (Integer) chatMessage.getHeaders().get("x-retries") + 1;
        }
        Message<String> message = MessageBuilder.withPayload(chatMessage.getPayload())
                .setHeader("x-retries", retries)
                .build();
        processor.output().send(message);
    }

    @StreamListener(target = Processor.INPUT, condition = "headers['x-retries'] >= 3")
    public void chatReceiveDl(Message<String> chatMessage) {
        System.out.println(LocalDateTime.now() + " DLQ received: " + chatMessage.getPayload());
        System.out.println(LocalDateTime.now() + " DLQ received x-retries: " + chatMessage.getHeaders().get("x-retries"));
    }
}
                

            2018-12-21T12:12:33.842 Received: Hi!!
            2018-12-21T12:12:33.842 Received x-retries: null
            2018-12-21T12:12:33.854 Received: Hi!!
            2018-12-21T12:12:33.854 Received x-retries: 0
            2018-12-21T12:12:33.855 Received: Hi!!
            2018-12-21T12:12:33.855 Received x-retries: 1
            2018-12-21T12:12:33.857 Received: Hi!!
            2018-12-21T12:12:33.857 Received x-retries: 2
            2018-12-21T12:12:33.858 DLQ received: Hi!!
            2018-12-21T12:12:33.858 DLQ received x-retries: 3
                

Manual ack

Consumer - application.yml


      rabbit:
        bindings:
          input:
            producer:
              auto-bind-dlq: true
            consumer:
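              auto-bind-dlq: true    # creates the exchange: DLX
              republish-to-dlq: true    # false: with a DLQ configured, the message goes to the DLQ unchanged; true: the message is republished to the DLQ with error information attached
              acknowledge-mode: MANUAL  # acknowledge manually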
                

import java.io.IOException;
import java.time.LocalDateTime;

import com.rabbitmq.client.Channel;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class ChatReceiver {

    @StreamListener(target = Sink.INPUT)
    public void chatReceive(
            Message<String> chatMessage,
            @Header(AmqpHeaders.CHANNEL) Channel channel,
            @Header(AmqpHeaders.DELIVERY_TAG) Long deliveryTag) throws IOException {
        System.out.println(LocalDateTime.now() + " Received: " + chatMessage.getPayload());
        /*
         * basicAck(deliveryTag, multiple)
         * deliveryTag: every delivered message gets a deliveryTag that is unique within the channel.
         * multiple: false acknowledges only this message; true also acknowledges all earlier unacknowledged ones.
         * To reject instead, call channel.basicReject(deliveryTag, requeue): requeue=true puts the message
         * back on the queue, requeue=false tells RabbitMQ to drop it (or dead-letter it when a DLX is configured).
         */
        channel.basicAck(deliveryTag, false);
    }
}
                

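Addendum

Event-driven?

The future

Spring Cloud Stream for event-driven architecture (针对事件驱动架构的Spring Cloud Stream), Tencent Cloud community (云+社区)

Spring Data now has @DomainEvents?

DDD aggregates and @DomainEvents | Baeldung

Spring Cloud Stream can use condition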

Domain-Driven Design

DDD

Implementing Domain-Driven Design

Event Sourcing & CQRS

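Solves consistency problems in distributed transactions

Every state can be traced back to its source

Results cannot be tampered with

Reads and writes are optimized separately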

Axon

Bootiful CQRS and Event Sourcing with Axon Framework

FB

FB

facebook - Domain Driven Design Taiwan 1st MeetUp at 2019 facebook - Domain Driven Design(Taiwan)

A few takeaways

Josh Long

@starbuxman

YouTube

YouTube

SpringDeveloper

Spring I/O

Spring Framework Guru

Official Spring blog

https://spring.io/blog

Josh Long's This Week in Spring

Recent talk

Cloud Native Spring (云原生Spring), Tencent Video

Other sites

Spring Cloud中国社区

程序猿DD

Thank you

Sample code

https://github.com/samzhu/spring_cloud_stream_demo