AMQP 1.0 Support

Starting with version 7.0, Spring Integration provides channel adapters for RabbitMQ AMQP 1.0 support. These channel adapters are based on the org.springframework.amqp:spring-rabbitmq-client library.

The Spring AMQP documentation provides more details about RabbitMQ AMQP 1.0 support.

AMQP 1.0 Outbound Channel Adapters

The AmqpClientMessageHandler is an AbstractReplyProducingMessageHandler implementation that can act as a one-way channel adapter or as an outbound gateway, depending on the setRequiresReply() configuration. This channel adapter requires an AsyncAmqpTemplate implementation for the AMQP 1.0 protocol, e.g. RabbitAmqpTemplate from the spring-rabbitmq-client library mentioned above. The message handler is asynchronous by default; therefore, publication errors should be handled via the errorChannel header in the request message or the global default errorChannel in the application context.
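Because publication is asynchronous, a failure surfaces when the publication future completes, not as an exception on the sending thread. The following plain-Java sketch models only that routing decision. Everything in it is a hypothetical stand-in (the ErrorRoutingSketch class, the map-based headers, the lists acting as channels); it is not the Spring Integration implementation.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

final class ErrorRoutingSketch {

    // Hypothetical stand-ins for named error channels: each list records the failures routed to it.
    final Map<String, List<Throwable>> channels = Map.of(
            "errorChannel", new CopyOnWriteArrayList<Throwable>(),
            "myErrors", new CopyOnWriteArrayList<Throwable>());

    // Uses the channel named in the request's errorChannel header, else the global default.
    String resolveErrorChannel(Map<String, Object> headers) {
        Object header = headers.get("errorChannel");
        return header instanceof String ? (String) header : "errorChannel";
    }

    // Attaches an error route to an in-flight publication without blocking the caller.
    CompletableFuture<Void> publish(Map<String, Object> headers, CompletableFuture<Boolean> confirmation) {
        return confirmation.handle((confirmed, failure) -> {
            if (failure != null) {
                channels.get(resolveErrorChannel(headers)).add(failure);
            }
            return null;
        });
    }
}
```

In this model, a request carrying an errorChannel header of "myErrors" has its publication failure recorded there, while a request without the header falls back to the default "errorChannel".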

The exchange to publish to (together with an optional routingKey) is mutually exclusive with the queue to publish to. If neither is provided, the AsyncAmqpTemplate implementation must supply defaults for these destination parts; otherwise, the message is rejected as not delivered.
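The destination rule above can be read as a small validation step. The sketch below is a plain-Java model of that rule under stated assumptions: DestinationSketch, Kind and resolve() are hypothetical names, and treating a routing key without an exchange as "use the template defaults" is an assumption made for illustration.

```java
final class DestinationSketch {

    enum Kind { EXCHANGE, QUEUE, TEMPLATE_DEFAULT }

    // Validates the destination options and reports which style of destination applies.
    static Kind resolve(String exchange, String routingKey, String queue) {
        if (queue != null && (exchange != null || routingKey != null)) {
            throw new IllegalArgumentException(
                    "'queue' is mutually exclusive with 'exchange' and 'routingKey'");
        }
        if (queue != null) {
            return Kind.QUEUE;
        }
        if (exchange != null) {
            return Kind.EXCHANGE;
        }
        // Nothing explicit (or only a routing key): the template defaults must fill the gap.
        return Kind.TEMPLATE_DEFAULT;
    }
}
```

For example, resolve("e1", "k1", null) yields EXCHANGE, resolve(null, null, "q1") yields QUEUE, and supplying both an exchange and a queue fails fast with an IllegalArgumentException.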

By default, the MessageConverter is an org.springframework.amqp.support.converter.SimpleMessageConverter that handles String, Serializable instances, and byte arrays. The default AmqpHeaderMapper is a DefaultAmqpHeaderMapper.outboundMapper(). This header mapper is also used to map AMQP message properties back to headers on the reply.

In gateway mode, a replyPayloadType can be supplied to convert the reply message body. In that case, the MessageConverter has to be a SmartMessageConverter implementation, such as JacksonJsonMessageConverter. Alternatively, and mutually exclusive with replyPayloadType, the returnMessage flag can be set to true to return the whole org.springframework.amqp.core.Message instance as the reply message payload.

The following example demonstrates how to configure an AmqpClientMessageHandler as a simple @ServiceActivator:

Java DSL:

@Bean
IntegrationFlow sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundAdapter(rabbitTemplate)
                    .exchange("e1")
                    // SpEL expression that evaluates to the literal 'k1'
                    .routingKeyExpression("'k1'"));
}

Kotlin DSL:

@Bean
fun sendFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .apply {
                    exchange("e1")
                    routingKeyExpression("'k1'")
                }
        )
    }

Groovy DSL:

@Bean
sendFlow(RabbitAmqpTemplate rabbitTemplate) {
    integrationFlow {
        handle(AmqpClient.outboundAdapter(rabbitTemplate)
                .with {
                    exchange 'e1'
                    // The inner single quotes make this a SpEL string literal
                    routingKeyExpression "'k1'"
                }
        )
    }
}

Java:

@Bean
@ServiceActivator(inputChannel = "amqpClientSendChannel")
AmqpClientMessageHandler amqpClientMessageHandler(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    // Resolve the destination per message from its headers
    messageHandler.setExchangeExpressionString("headers[exchange]");
    messageHandler.setRoutingKeyExpressionString("headers[routingKey]");
    return messageHandler;
}

A gateway variant of the AmqpClientMessageHandler might look like this:

Java DSL:

@Bean
IntegrationFlow requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    return f -> f
            .handle(AmqpClient.outboundGateway(rabbitTemplate)
                    .queueFunction(m -> "requestReply"));
}

Kotlin DSL:

@Bean
fun requestReplyOutboundFlow(rabbitTemplate: RabbitAmqpTemplate) =
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .queueFunction { "requestReply" }
        )
    }

Groovy DSL:

@Bean
requestReplyOutboundFlow(RabbitAmqpTemplate rabbitTemplate) {
    integrationFlow {
        handle(AmqpClient.outboundGateway(rabbitTemplate)
                .with {
                    queueFunction { 'requestReply' }
                }
        )
    }
}

Java:

@Bean
@ServiceActivator(inputChannel = "amqpClientSendAndReceiveChannel")
AmqpClientMessageHandler amqpClientGateway(RabbitAmqpTemplate rabbitTemplate) {
    AmqpClientMessageHandler messageHandler = new AmqpClientMessageHandler(rabbitTemplate);
    // Gateway mode: a reply is expected for every request
    messageHandler.setRequiresReply(true);
    // Converting to a reply type requires a SmartMessageConverter
    messageHandler.setReplyPayloadType(String.class);
    messageHandler.setMessageConverter(new JacksonJsonMessageConverter());
    messageHandler.setQueue("q1");
    return messageHandler;
}

AMQP 1.0 Message-Driven Channel Adapter

The AmqpClientMessageProducer is a MessageProducerSupport implementation that acts as a Message-Driven Channel Adapter, consuming messages from queues over the RabbitMQ AMQP 1.0 protocol. It requires an AmqpConnectionFactory and at least one queue to consume from. Internally, it relies on the RabbitAmqpListenerContainer and IntegrationRabbitAmqpMessageListener to relay consumed AMQP messages (after conversion) to the outputChannel. Some RabbitAmqpListenerContainer configuration options are exposed as setters on the AmqpClientMessageProducer.

By default, the MessageConverter is an org.springframework.amqp.support.converter.SimpleMessageConverter that handles String, Serializable instances, and byte arrays. The default AmqpHeaderMapper is a DefaultAmqpHeaderMapper.inboundMapper(). The messageConverter option can be set to null to skip conversion entirely (including header mapping); the received AMQP message is then used as-is as the payload of the produced Spring message.

The AmqpClientMessageProducer also implements the Pausable contract and delegates to the respective RabbitAmqpListenerContainer API.

When AmqpClientMessageProducer.setBatchSize() is called with a value greater than 1, this channel adapter works in batch mode. In this case, received messages are gathered until the batch size is reached or the batchReceiveTimeout period expires. All the batched AMQP messages are then converted to Spring messages, and the resulting list is produced as the payload of a wrapping message sent to the outputChannel. Batch mode gives some performance gain because all the batched messages are settled at once.
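The "size reached or timeout expired" rule can be sketched in plain Java as follows. This models only the gathering logic: BatchSketch and receiveBatch() are hypothetical names, the BlockingQueue stands in for the stream of incoming AMQP messages, and starting the timer when gathering begins is an assumption, not a statement about how the listener container measures batchReceiveTimeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BatchSketch {

    // Gathers up to batchSize items, returning a partial batch once timeoutMillis has elapsed.
    static <T> List<T> receiveBatch(BlockingQueue<T> source, int batchSize, long timeoutMillis) {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        try {
            while (batch.size() < batchSize) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    break; // timeout expired before the batch filled up
                }
                T next = source.poll(remaining, TimeUnit.NANOSECONDS);
                if (next == null) {
                    break; // nothing arrived within the remaining time
                }
                batch.add(next);
            }
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // preserve the interrupt and return what we have
        }
        return batch;
    }
}
```

With three queued items and a batch size of 2, the first call returns a full batch of two and the second call returns the single remaining item after the timeout passes.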

When the autoSettle flag is set to false, an AcknowledgmentCallback instance is provided in the IntegrationMessageHeaderAccessor.ACKNOWLEDGMENT_CALLBACK message header, so that the application can make the settlement decision for the received message or for the whole batch.
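The manual settlement flow can be illustrated with a plain-Java model. The types below are hypothetical stand-ins: AckCallback and Status only imitate the shape of an acknowledgment callback, and the header key is an illustrative string. In a real flow the callback would be read from the message header named above.

```java
import java.util.List;
import java.util.Map;

final class SettlementSketch {

    enum Status { ACCEPT, REJECT, REQUEUE }

    // Hypothetical stand-in for the callback carried in the message headers.
    interface AckCallback {
        void acknowledge(Status status);
    }

    // Hypothetical header key used by this sketch only.
    static final String ACK_HEADER = "acknowledgmentCallback";

    // Processes a batch payload and settles the whole batch with a single decision.
    static void handleBatch(List<String> payloads, Map<String, Object> headers) {
        AckCallback callback = (AckCallback) headers.get(ACK_HEADER);
        try {
            payloads.forEach(SettlementSketch::process);
            callback.acknowledge(Status.ACCEPT);
        }
        catch (RuntimeException ex) {
            // One bad item sends the whole batch back for redelivery in this model.
            callback.acknowledge(Status.REQUEUE);
        }
    }

    // Illustrative business logic: blank payloads are treated as failures.
    static void process(String payload) {
        if (payload.isBlank()) {
            throw new IllegalArgumentException("empty payload");
        }
    }
}
```

Whether a failed batch should be requeued or rejected is an application decision; the sketch picks requeue only to show that one call settles every message in the batch.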

The following example demonstrates how to configure an AmqpClientMessageProducer as a simple inbound endpoint:

Java DSL:

@Bean
IntegrationFlow receiveFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundChannelAdapter(connectionFactory, "q1"))
            .channel(c -> c.queue("receiveChannel"))
            .get();
}

Kotlin DSL:

@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, "q1")) {
            channel("inputChannel")
        }

Groovy DSL:

@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundChannelAdapter(connectionFactory, 'q1')) {
        channel 'inputChannel'
    }
}

Java:

@Bean
AmqpClientMessageProducer batchAmqpClientMessageProducer(AmqpConnectionFactory connectionFactory,
        QueueChannel inputChannel) {

    AmqpClientMessageProducer amqpClientMessageProducer = new AmqpClientMessageProducer(connectionFactory, "q3");
    amqpClientMessageProducer.setOutputChannel(inputChannel);
    amqpClientMessageProducer.setBatchSize(2);
    return amqpClientMessageProducer;
}

AMQP 1.0 Inbound Gateway

The AmqpClientInboundGateway is a MessagingGatewaySupport implementation for receiving requests and producing replies over the RabbitMQ AMQP 1.0 protocol. It is similar to the AmqpClientMessageProducer mentioned above and shares many RabbitAmqpListenerContainer configuration options. In addition, to produce AMQP 1.0 replies, the AmqpClientInboundGateway internally uses a RabbitAmqpTemplate.

For automatic correlation of replies with their requests, the replyTo property of the request message must be supplied. For example, RabbitAmqpTemplate.sendAndReceive() relies on the RpcClient from the RabbitMQ AMQP 1.0 library, which generates an exclusive, auto-deleted queue. Alternatively, the reply address can be set on the AmqpClientInboundGateway as a replyExchange (and optional replyRoutingKey) or a replyQueue (but not both); these are delegated to the RabbitAmqpTemplate default options. The messageId or correlationId property of the request message can be used to associate replies with requests. The RpcClient in RabbitAmqpTemplate.sendAndReceive() generates one if it is missing. The AmqpClientInboundGateway is able to map such a correlation key back into the reply message.
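The correlation rules can be modelled in a few lines of plain Java. This is a sketch under stated assumptions, not the gateway's code: CorrelationSketch and its Properties record are hypothetical, and preferring correlationId over messageId when both are present is an assumption made for the example.

```java
import java.util.UUID;

final class CorrelationSketch {

    // Hypothetical subset of AMQP message properties relevant to request/reply.
    record Properties(String messageId, String correlationId, String replyTo) { }

    // Client side: make sure the request carries a key the reply can be matched on.
    static Properties prepareRequest(Properties request) {
        if (request.messageId() != null || request.correlationId() != null) {
            return request;
        }
        return new Properties(UUID.randomUUID().toString(), null, request.replyTo());
    }

    // Server side: pick the reply address, falling back to a configured default.
    static String replyAddress(Properties request, String defaultReplyQueue) {
        String address = request.replyTo() != null ? request.replyTo() : defaultReplyQueue;
        if (address == null) {
            throw new IllegalStateException("No replyTo property and no default reply address");
        }
        return address;
    }

    // Server side: copy the request's correlation key into the reply properties.
    static Properties replyFor(Properties request) {
        String key = request.correlationId() != null ? request.correlationId() : request.messageId();
        return new Properties(null, key, null);
    }
}
```

A client that receives the reply can then match its correlationId against the key it sent, whether that key was supplied explicitly or generated in prepareRequest().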

The following example demonstrates how to configure an AmqpClientInboundGateway as a simple inbound gateway:

Java DSL:

@Bean
IntegrationFlow amqpClientInboundGatewayFlow(AmqpConnectionFactory connectionFactory) {
    return IntegrationFlow.from(AmqpClient.inboundGateway(connectionFactory, "requestReply"))
            .channel(c -> c.queue("inputChannel"))
            .get();
}

Kotlin DSL:

@Bean
fun receiveFlow(connectionFactory: AmqpConnectionFactory) =
        integrationFlow(AmqpClient.inboundGateway(connectionFactory, "requestReply")) {
            channel { queue("inputChannel") }
        }

Groovy DSL:

@Bean
receiveFlow(AmqpConnectionFactory connectionFactory) {
    integrationFlow(AmqpClient.inboundGateway(connectionFactory, 'requestReply')) {
        channel { queue 'inputChannel' }
    }
}

Java:

@Bean
AmqpClientInboundGateway amqpClientInboundGateway(AmqpConnectionFactory connectionFactory) {
    AmqpClientInboundGateway amqpClientInboundGateway = new AmqpClientInboundGateway(connectionFactory, "requestReply");
    amqpClientInboundGateway.setRequestChannelName("inputChannel");
    return amqpClientInboundGateway;
}