STOMP Support
Spring Integration version 4.2 introduced STOMP (Simple Text Orientated Messaging Protocol) client support.
It is based on the architecture, infrastructure, and API from the Spring Framework’s messaging module, stomp package.
Spring Integration uses many of Spring STOMP components (such as StompSession and StompClientSupport).
For more information, see the Spring Framework STOMP Support chapter in the Spring Framework reference manual.
You need to include this dependency into your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stomp</artifactId>
    <version>6.0.4</version>
</dependency>compile "org.springframework.integration:spring-integration-stomp:6.0.4"
For server side components you need to add a org.springframework:spring-websocket and/or io.projectreactor.netty:reactor-netty dependencies.
Overview
To configure STOMP, you should start with the STOMP client object. The Spring Framework provides the following implementations:
- 
WebSocketStompClient: Built on the Spring WebSocket API with support for standard JSR-356 WebSocket, Jetty 9, and SockJS for HTTP-based WebSocket emulation with SockJS Client.
- 
ReactorNettyTcpStompClient: Built onReactorNettyTcpClientfrom thereactor-nettyproject.
You can provide any other StompClientSupport implementation.
See the Javadoc of those classes.
The StompClientSupport class is designed as a factory to produce a StompSession for the provided StompSessionHandler and all the remaining work is done through the callbacks to that StompSessionHandler and StompSession abstraction.
With the Spring Integration adapter abstraction, we need to provide some managed shared object to represent our application as a STOMP client with its unique session.
For this purpose, Spring Integration provides the StompSessionManager abstraction to manage the single StompSession between any provided StompSessionHandler.
This allows the use of inbound or outbound channel adapters (or both) for the particular STOMP Broker.
See StompSessionManager (and its implementations) JavaDocs for more information.
STOMP Inbound Channel Adapter
The StompInboundChannelAdapter is a one-stop MessageProducer component that subscribes your Spring Integration application to the provided STOMP destinations and receives messages from them (converted from the STOMP frames by using the provided MessageConverter on the connected StompSession).
You can change the destinations (and therefore STOMP subscriptions) at runtime by using appropriate @ManagedOperation annotations on the StompInboundChannelAdapter.
For more configuration options, see STOMP Namespace Support and the StompInboundChannelAdapter Javadoc.
STOMP Outbound Channel Adapter
The StompMessageHandler is the MessageHandler for the <int-stomp:outbound-channel-adapter> and is used to send the outgoing Message<?> instances to the STOMP destination (pre-configured or determined at runtime with a SpEL expression) through the StompSession (which is provided by the shared StompSessionManager).
For more configuration options see STOMP Namespace Support and the StompMessageHandler Javadoc.
STOMP Headers Mapping
The STOMP protocol provides headers as part of its frame. The entire structure of the STOMP frame has the following format:
....
COMMAND
header1:value1
header2:value2
Body^@
....Spring Framework provides StompHeaders to represent these headers.
See the Javadoc for more details.
STOMP frames are converted to and from Message<?> instances and these headers are mapped to and from MessageHeaders instances.
Spring Integration provides a default HeaderMapper implementation for the STOMP adapters.
The implementation is StompHeaderMapper.
It provides fromHeaders() and toHeaders() operations for the inbound and outbound adapters, respectively.
As with many other Spring Integration modules, the IntegrationStompHeaders class has been introduced to map standard STOMP headers to MessageHeaders, with stomp_ as the header name prefix.
In addition, all MessageHeaders instances with that prefix are mapped to the StompHeaders when sending to a destination.
For more information, see the Javadoc for those classes and the mapped-headers attribute description in the STOMP Namespace Support.
STOMP Integration Events
Many STOMP operations are asynchronous, including error handling.
For example, STOMP has a RECEIPT server frame that it returns when a client frame has requested one by adding the RECEIPT header.
To provide access to these asynchronous events, Spring Integration emits StompIntegrationEvent instances, which you can obtain by implementing an ApplicationListener or by using an <int-event:inbound-channel-adapter> (see Receiving Spring Application Events).
Specifically, a StompExceptionEvent is emitted from the AbstractStompSessionManager when a stompSessionListenableFuture receives onFailure() due to failure to connect to STOMP broker.
Another example is the StompMessageHandler.
It processes ERROR STOMP frames, which are server responses to improper (unaccepted) messages sent by this StompMessageHandler.
The StompMessageHandler emits StompReceiptEvent as a part of StompSession.Receiptable callbacks in the asynchronous answers for the messages sent to the StompSession.
The StompReceiptEvent can be positive or negative, depending on whether or not the RECEIPT frame was received from the server within the receiptTimeLimit period, which you can configure on the StompClientSupport instance.
It defaults to 15 * 1000 (in milliseconds, so 15 seconds).
| The StompSession.Receiptablecallbacks are added only if theRECEIPTSTOMP header of the message to send is notnull.
You can enable automaticRECEIPTheader generation on theStompSessionthrough itsautoReceiptoption and on theStompSessionManagerrespectively. | 
See STOMP Adapters Java Configuration for more information how to configure Spring Integration to accept those ApplicationEvent instances.
STOMP Adapters Java Configuration
The following example shows a comprehensive Java configuration for STOMP adapters:
@Configuration
@EnableIntegration
public class StompConfiguration {
    @Bean
    public ReactorNettyTcpStompClient stompClient() {
        ReactorNettyTcpStompClient stompClient = new ReactorNettyTcpStompClient("127.0.0.1", 61613);
        stompClient.setMessageConverter(new PassThruMessageConverter());
        ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
        taskScheduler.afterPropertiesSet();
        stompClient.setTaskScheduler(taskScheduler);
        stompClient.setReceiptTimeLimit(5000);
        return stompClient;
    }
    @Bean
    public StompSessionManager stompSessionManager() {
        ReactorNettyTcpStompSessionManager stompSessionManager = new ReactorNettyTcpStompSessionManager(stompClient());
        stompSessionManager.setAutoReceipt(true);
        return stompSessionManager;
    }
    @Bean
    public PollableChannel stompInputChannel() {
        return new QueueChannel();
    }
    @Bean
    public StompInboundChannelAdapter stompInboundChannelAdapter() {
        StompInboundChannelAdapter adapter =
        		new StompInboundChannelAdapter(stompSessionManager(), "/topic/myTopic");
        adapter.setOutputChannel(stompInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "stompOutputChannel")
    public MessageHandler stompMessageHandler() {
        StompMessageHandler handler = new StompMessageHandler(stompSessionManager());
        handler.setDestination("/topic/myTopic");
        return handler;
    }
    @Bean
    public PollableChannel stompEvents() {
        return new QueueChannel();
    }
    @Bean
    public ApplicationListener<ApplicationEvent> stompEventListener() {
        ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer();
        producer.setEventTypes(StompIntegrationEvent.class);
        producer.setOutputChannel(stompEvents());
        return producer;
    }
}
STOMP Namespace Support
The Spring Integration STOMP namespace implements the inbound and outbound channel adapter components. To include it in your configuration, provide the following namespace declaration in your application context configuration file:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xmlns:int="http://www.springframework.org/schema/integration"
  xmlns:int-stomp="http://www.springframework.org/schema/integration/stomp"
  xsi:schemaLocation="
    http://www.springframework.org/schema/beans
    https://www.springframework.org/schema/beans/spring-beans.xsd
    http://www.springframework.org/schema/integration
    https://www.springframework.org/schema/integration/spring-integration.xsd
    http://www.springframework.org/schema/integration/stomp
    https://www.springframework.org/schema/integration/stomp/spring-integration-stomp.xsd">
    ...
</beans>Understanding the <int-stomp:outbound-channel-adapter> Element
The following listing shows the available attributes for the STOMP outbound channel adapter:
<int-stomp:outbound-channel-adapter
                           id=""                      (1)
                           channel=""                 (2)
                           stomp-session-manager=""   (3)
                           header-mapper=""           (4)
                           mapped-headers=""          (5)
                           destination=""             (6)
                           destination-expression=""  (7)
                           auto-startup=""            (8)
                           phase=""/>                 (9)| 1 | The component bean name.
The MessageHandleris registered with a bean alias ofidplus.handler.
If you do not set thechannelattribute, aDirectChannelis created and registered in the application context with the value of thisidattribute as the bean name.
In this case, the endpoint is registered with a bean nameidplus.adapter. | 
| 2 | Identifies the channel attached to this adapter if idis present.
Seeid.
Optional. | 
| 3 | Reference to a StompSessionManagerbean, which encapsulates the low-level connection andStompSessionhandling operations.
Required. | 
| 4 | Reference to a bean that implements HeaderMapper<StompHeaders>, which maps Spring IntegrationMessageHeadersto and from
STOMP frame headers.
It is mutually exclusive withmapped-headers.
It defaults toStompHeaderMapper. | 
| 5 | Comma-separated list of names of STOMP Headers to be mapped to the STOMP frame headers.
It can be provided only if the header-mapperreference is not set.
The values in this list can also be simple patterns to be matched against the header names (such asmyheader*or*myheader).
A special token (STOMP_OUTBOUND_HEADERS) represents all the standard STOMP headers (content-length, receipt, heart-beat, and so on).
They are included by default.
If you want to add your own headers and want the standard headers to also be mapped, you must include this token or provide your ownHeaderMapperimplementation by usingheader-mapper. | 
| 6 | Name of the destination to which STOMP Messages are sent.
It is mutually exclusive with the destination-expression. | 
| 7 | A SpEL expression to be evaluated at runtime against each Spring Integration Messageas the root object.
It is mutually exclusive with thedestination. | 
| 8 | Boolean value indicating whether this endpoint should start automatically.
It defaults to true. | 
| 9 | The lifecycle phase within which this endpoint should start and stop.
The lower the value, the earlier this endpoint starts and the later it stops.
The default is Integer.MIN_VALUE.
Values can be negative.
SeeSmartLifeCycle. | 
Understanding the <int-stomp:inbound-channel-adapter> Element
The following listing shows the available attributes for the STOMP inbound channel adapter:
<int-stomp:inbound-channel-adapter
                           id=""                     (1)
                           channel=""                (2)
                           error-channel=""          (3)
                           stomp-session-manager=""  (4)
                           header-mapper=""          (5)
                           mapped-headers=""         (6)
                           destinations=""           (7)
                           send-timeout=""           (8)
                           payload-type=""           (9)
                           auto-startup=""           (10)
                           phase=""/>                (11)| 1 | The component bean name.
If you do not set the channelattribute, aDirectChannelis created and registered in the application context with the value of thisidattribute as the bean name.
In this case, the endpoint is registered with the bean nameidplus.adapter. | 
| 2 | Identifies the channel attached to this adapter. | 
| 3 | The MessageChannelbean reference to whichErrorMessageinstances should be sent. | 
| 4 | See the same option on the <int-stomp:outbound-channel-adapter>. | 
| 5 | Comma-separated list of names of STOMP Headers to be mapped from the STOMP frame headers.
You can only provide this if the header-mapperreference is not set.
The values in this list can also be simple patterns to be matched against the header names (for example,myheader*or*myheader).
A special token (STOMP_INBOUND_HEADERS) represents all the standard STOMP headers (content-length, receipt, heart-beat, and so on).
They are included by default.
If you want to add your own headers and want the standard headers to also be mapped, you must also include this token or provide your ownHeaderMapperimplementation usingheader-mapper. | 
| 6 | See the same option on the <int-stomp:outbound-channel-adapter>. | 
| 7 | Comma-separated list of STOMP destination names to subscribe.
The list of destinations (and therefore subscriptions) can be modified at runtime through the addDestination()andremoveDestination()@ManagedOperationannotations. | 
| 8 | Maximum amount of time (in milliseconds) to wait when sending a message to the channel if the channel can block.
For example, a QueueChannelcan block until space is available if its maximum capacity has been reached. | 
| 9 | Fully qualified name of the Java type for the target payloadto convert from the incoming STOMP frame.
It defaults toString.class. | 
| 10 | See the same option on the <int-stomp:outbound-channel-adapter>. | 
| 11 | See the same option on the <int-stomp:outbound-channel-adapter>. |