Adding Behavior to Endpoints
Prior to Spring Integration 2.2, you could add behavior to an entire Integration flow by adding an AOP Advice to a poller’s <advice-chain/> element.
However, suppose you want to retry, say, just a REST Web Service call, and not any downstream endpoints.
For example, consider the following flow:
inbound-adapter->poller->http-gateway1->http-gateway2->jdbc-outbound-adapter
If you configure some retry logic into an advice chain on the poller and the call to http-gateway2 fails because of a network glitch, the retry causes both http-gateway1 and http-gateway2 to be called a second time.
Similarly, after a transient failure in the jdbc-outbound-adapter, both HTTP gateways are called a second time before again calling the jdbc-outbound-adapter.
Spring Integration 2.2 adds the ability to add behavior to individual endpoints.
This is achieved by the addition of the <request-handler-advice-chain/> element to many endpoints.
The following example shows how to use the <request-handler-advice-chain/> element within an outbound-gateway:
<int-http:outbound-gateway id="withAdvice"
    url-expression="'http://localhost/test1'"
    request-channel="requests"
    reply-channel="nextChannel">
    <int-http:request-handler-advice-chain>
        <ref bean="myRetryAdvice" />
    </int-http:request-handler-advice-chain>
</int-http:outbound-gateway>
In this case, myRetryAdvice is applied only locally to this gateway and does not apply to further actions taken downstream after the reply is sent to nextChannel.
The scope of the advice is limited to the endpoint itself.
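The difference in scope can be illustrated with a minimal, framework-free sketch. The class and method names below (RetryScopeSketch, gateway1, gateway2, retry) are hypothetical stand-ins for the endpoints in the flow above and are not Spring Integration types; the sketch only counts how often each "gateway" is invoked when a retry wraps the whole flow versus a single endpoint.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustrative stand-ins for the flow above; these are NOT Spring Integration types.
class RetryScopeSketch {

    static final AtomicInteger gateway1Calls = new AtomicInteger();
    static final AtomicInteger gateway2Calls = new AtomicInteger();

    // Runs the action up to maxAttempts times and rethrows the last failure.
    static <T> T retry(int maxAttempts, Supplier<T> action) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.get();
            }
            catch (RuntimeException ex) {
                last = ex;
            }
        }
        throw last;
    }

    static String gateway1(String in) {
        gateway1Calls.incrementAndGet();
        return in + "->g1";
    }

    // Fails on its first call only, simulating a transient network glitch.
    static String gateway2(String in) {
        if (gateway2Calls.incrementAndGet() == 1) {
            throw new IllegalStateException("network glitch");
        }
        return in + "->g2";
    }

    static void reset() {
        gateway1Calls.set(0);
        gateway2Calls.set(0);
    }

    // Retry wrapped around the whole flow, comparable to advice on the poller.
    static String flowLevelRetry(String in) {
        return retry(3, () -> gateway2(gateway1(in)));
    }

    // Retry wrapped around gateway2 only, comparable to a request handler advice chain.
    static String endpointLevelRetry(String in) {
        String afterFirst = gateway1(in);
        return retry(3, () -> gateway2(afterFirst));
    }

    public static void main(String[] args) {
        reset();
        flowLevelRetry("msg");
        System.out.println("flow-level retry: gateway1 calls = " + gateway1Calls.get());
        reset();
        endpointLevelRetry("msg");
        System.out.println("endpoint-level retry: gateway1 calls = " + gateway1Calls.get());
    }
}
```

In this sketch, the flow-level variant invokes gateway1 twice for a single glitch in gateway2, while the endpoint-level variant invokes it once.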
| At this time, you cannot advise an entire  However, a  | 
Provided Advice Classes
In addition to providing the general mechanism to apply AOP advice classes, Spring Integration provides these out-of-the-box advice implementations:
- RequestHandlerRetryAdvice (described in Retry Advice)
- RequestHandlerCircuitBreakerAdvice (described in Circuit Breaker Advice)
- ExpressionEvaluatingRequestHandlerAdvice (described in Expression Evaluating Advice)
- RateLimiterRequestHandlerAdvice (described in Rate Limiter Advice)
- CacheRequestHandlerAdvice (described in Caching Advice)
- ReactiveRequestHandlerAdvice (described in Reactive Advice)
- ContextHolderRequestHandlerAdvice (described in Context Holder Advice)
Retry Advice
The retry advice (o.s.i.handler.advice.RequestHandlerRetryAdvice) leverages the rich retry mechanisms provided by the Spring Retry project.
The core component of spring-retry is the RetryTemplate, which allows configuration of sophisticated retry scenarios, including RetryPolicy and BackoffPolicy strategies (with a number of implementations) as well as a RecoveryCallback strategy to determine the action to take when retries are exhausted.
- Stateless Retry
Stateless retry is the case where the retry activity is handled entirely within the advice. The thread pauses (if configured to do so) and retries the action.
- Stateful Retry
Stateful retry is the case where the retry state is managed within the advice but where an exception is thrown and the caller resubmits the request. An example of stateful retry is when we want the message originator (for example, JMS) to be responsible for resubmitting, rather than performing it on the current thread. Stateful retry needs some mechanism to detect a retried submission.
For more information on spring-retry, see the project’s Javadoc and the reference documentation for Spring Batch, where spring-retry originated.
| The default back off behavior is to not back off. Retries are attempted immediately. Using a back off policy that causes threads to pause between attempts may cause performance issues, including excessive memory use and thread starvation. In high-volume environments, back off policies should be used with caution. | 
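The contrast between the two styles can be sketched without any framework code. The following is a hand-rolled model under stated assumptions, not the Spring Retry implementation: RetryStylesSketch, stateless, and Stateful are hypothetical names, the state key is supplied by a plain Function, and recovery is modeled as a fallback function.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: a simplified model of the two styles, NOT the Spring Retry implementation.
class RetryStylesSketch {

    // Stateless: every attempt happens inside this one call, on the calling thread.
    static <T, R> R stateless(int maxAttempts, T request, Function<T, R> handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(request);
            }
            catch (RuntimeException ex) {
                last = ex;
            }
        }
        throw last;
    }

    // Stateful: one attempt per call. The failure count is remembered under a key
    // derived from the request, and each failure is rethrown so the caller resubmits.
    static class Stateful<T, R> {

        private final Map<Object, Integer> failures = new ConcurrentHashMap<>();
        private final int maxAttempts;
        private final Function<T, Object> keyOf;
        private final Function<T, R> recoverer;

        Stateful(int maxAttempts, Function<T, Object> keyOf, Function<T, R> recoverer) {
            this.maxAttempts = maxAttempts;
            this.keyOf = keyOf;
            this.recoverer = recoverer;
        }

        R handle(T request, Function<T, R> handler) {
            Object key = keyOf.apply(request);
            if (failures.getOrDefault(key, 0) >= maxAttempts) {
                failures.remove(key);
                return recoverer.apply(request); // retries exhausted: recover instead
            }
            try {
                R result = handler.apply(request);
                failures.remove(key);
                return result;
            }
            catch (RuntimeException ex) {
                failures.merge(key, 1, Integer::sum);
                throw ex; // the originator (for example, a broker) redelivers
            }
        }
    }
}
```

With maxAttempts set to 3, the stateful variant throws on the first three deliveries of the same key and invokes the recoverer on the fourth, which is why it needs a reliable way to recognize a resubmitted request.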
Configuring the Retry Advice
The examples in this section use the following <service-activator> that always throws an exception:
public class FailingService {
    public void service(String message) {
        throw new RuntimeException("error");
    }
}
- Simple Stateless Retry
The default RetryTemplate has a SimpleRetryPolicy which tries three times. There is no BackOffPolicy, so the three attempts are made back-to-back-to-back with no delay between attempts. There is no RecoveryCallback, so the result is to throw the exception to the caller after the final failed retry occurs. In a Spring Integration environment, this final exception might be handled by using an error-channel on the inbound endpoint. The following example uses RetryTemplate and shows its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice"/>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
- Simple Stateless Retry with Recovery
The following example adds a RecoveryCallback to the preceding example and uses an ErrorMessageSendingRecoverer to send an ErrorMessage to a channel:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
DEBUG [task-scheduler-2]Retry: count=0
DEBUG [task-scheduler-2]Checking for rethrow: count=1
DEBUG [task-scheduler-2]Retry: count=1
DEBUG [task-scheduler-2]Checking for rethrow: count=2
DEBUG [task-scheduler-2]Retry: count=2
DEBUG [task-scheduler-2]Checking for rethrow: count=3
DEBUG [task-scheduler-2]Retry failed last attempt: count=3
DEBUG [task-scheduler-2]Sending ErrorMessage :failedMessage:[Payload=...]
- Stateless Retry with Customized Policies, and Recovery
For more sophistication, we can provide the advice with a customized RetryTemplate. This example continues to use the SimpleRetryPolicy but increases the attempts to four. It also adds an ExponentialBackOffPolicy where the first retry waits one second, the second waits five seconds and the third waits 25 (for four attempts in all). The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
            <property name="retryTemplate" ref="retryTemplate" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="4" />
        </bean>
    </property>
    <property name="backOffPolicy">
        <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
            <property name="initialInterval" value="1000" />
            <property name="multiplier" value="5.0" />
            <property name="maxInterval" value="60000" />
        </bean>
    </property>
</bean>
27.058 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
27.071 DEBUG [task-scheduler-1]Retry: count=0
27.080 DEBUG [task-scheduler-1]Sleeping for 1000
28.081 DEBUG [task-scheduler-1]Checking for rethrow: count=1
28.081 DEBUG [task-scheduler-1]Retry: count=1
28.081 DEBUG [task-scheduler-1]Sleeping for 5000
33.082 DEBUG [task-scheduler-1]Checking for rethrow: count=2
33.082 DEBUG [task-scheduler-1]Retry: count=2
33.083 DEBUG [task-scheduler-1]Sleeping for 25000
58.083 DEBUG [task-scheduler-1]Checking for rethrow: count=3
58.083 DEBUG [task-scheduler-1]Retry: count=3
58.084 DEBUG [task-scheduler-1]Checking for rethrow: count=4
58.084 DEBUG [task-scheduler-1]Retry failed last attempt: count=4
58.086 DEBUG [task-scheduler-1]Sending ErrorMessage :failedMessage:[Payload=...]
- Namespace Support for Stateless Retry
Starting with version 4.0, the preceding configuration can be greatly simplified, thanks to the namespace support for the retry advice, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <ref bean="retrier" />
    </int:request-handler-advice-chain>
</int:service-activator>
<int:handler-retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
    <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
</int:handler-retry-advice>
In the preceding example, the advice is defined as a top-level bean so that it can be used in multiple request-handler-advice-chain instances. You can also define the advice directly within the chain, as the following example shows:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <int:retry-advice id="retrier" max-attempts="4" recovery-channel="myErrorChannel">
            <int:exponential-back-off initial="1000" multiplier="5.0" maximum="60000" />
        </int:retry-advice>
    </int:request-handler-advice-chain>
</int:service-activator>
A <handler-retry-advice> can have a <fixed-back-off> or <exponential-back-off> child element or have no child element. A <handler-retry-advice> with no child element uses no back off. If there is no recovery-channel, the exception is thrown when retries are exhausted. The namespace can only be used with stateless retry.
For more complex environments (custom policies etc.), use normal <bean> definitions.
- Simple Stateful Retry with Recovery
To make retry stateful, we need to provide the advice with a RetryStateGenerator implementation. This class is used to identify a message as being a resubmission so that the RetryTemplate can determine the current state of retry for this message. The framework provides a SpelExpressionRetryStateGenerator, which determines the message identifier by using a SpEL expression. This example again uses the default policies (three attempts with no back off). As with stateless retry, these policies can be customized. The following listing shows the example and its DEBUG output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerRetryAdvice">
            <property name="retryStateGenerator">
                <bean class="o.s.i.handler.advice.SpelExpressionRetryStateGenerator">
                    <constructor-arg value="headers['jms_messageId']" />
                </bean>
            </property>
            <property name="recoveryCallback">
                <bean class="o.s.i.handler.advice.ErrorMessageSendingRecoverer">
                    <constructor-arg ref="myErrorChannel" />
                </bean>
            </property>
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
24.351 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
24.368 DEBUG [Container#0-1]Retry: count=0
24.387 DEBUG [Container#0-1]Checking for rethrow: count=1
24.387 DEBUG [Container#0-1]Rethrow in retry for policy: count=1
24.387 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
24.391 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
25.412 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
25.412 DEBUG [Container#0-1]Retry: count=1
25.413 DEBUG [Container#0-1]Checking for rethrow: count=2
25.413 DEBUG [Container#0-1]Rethrow in retry for policy: count=2
25.413 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
25.414 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
26.418 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
26.418 DEBUG [Container#0-1]Retry: count=2
26.419 DEBUG [Container#0-1]Checking for rethrow: count=3
26.419 DEBUG [Container#0-1]Rethrow in retry for policy: count=3
26.419 WARN [Container#0-1]failure occurred in gateway sendAndReceive
org.springframework.integration.MessagingException: Failed to invoke handler
...
Caused by: java.lang.RuntimeException: foo
...
26.420 DEBUG [Container#0-1]Initiating transaction rollback on application exception
...
27.425 DEBUG [Container#0-1]preSend on channel 'input', message: [Payload=...]
27.426 DEBUG [Container#0-1]Retry failed last attempt: count=3
27.426 DEBUG [Container#0-1]Sending ErrorMessage :failedMessage:[Payload=...]
If you compare the preceding example with the stateless examples, you can see that, with stateful retry, the exception is thrown to the caller on each failure.
- Exception Classification for Retry
Spring Retry has a great deal of flexibility for determining which exceptions can invoke retry. The default configuration retries for all exceptions and the exception classifier looks at the top-level exception. If you configure it to, say, retry only on MyException and your application throws a SomeOtherException where the cause is a MyException, retry does not occur.
Since Spring Retry 1.0.3, the BinaryExceptionClassifier has a property called traverseCauses (the default is false). When true, it traverses exception causes until it finds a match or runs out of causes to traverse.
To use this classifier for retry, use a SimpleRetryPolicy created with the constructor that takes the max attempts, the Map of Exception objects, and the traverseCauses boolean. Then you can inject this policy into the RetryTemplate.
| traverseCauses is required in this case because user exceptions may be wrapped in a MessagingException. | 
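The effect of cause traversal can be shown with a small, framework-free sketch. This is an illustration of the matching behavior described above, not the Spring Retry classifier itself: CauseTraversalSketch and isRetryable are hypothetical names, MyException and SomeOtherException are the placeholder exception types from the text, and the sketch assumes that an exception with no matching entry is not retried.

```java
import java.util.Map;

// Illustrative only: mimics the matching behavior described above; NOT the Spring Retry classifier.
class CauseTraversalSketch {

    // Placeholder exception types named in the text.
    static class MyException extends RuntimeException {
        MyException(String message) {
            super(message);
        }
    }

    static class SomeOtherException extends RuntimeException {
        SomeOtherException(Throwable cause) {
            super(cause);
        }
    }

    // Returns the configured decision for the first exception in the chain that matches
    // an entry. Without traverseCauses, only the top-level exception is examined.
    // Assumption of this sketch: no match means "do not retry".
    static boolean isRetryable(Throwable thrown,
            Map<Class<? extends Throwable>, Boolean> retryable, boolean traverseCauses) {

        Throwable current = thrown;
        while (current != null) {
            for (Map.Entry<Class<? extends Throwable>, Boolean> entry : retryable.entrySet()) {
                if (entry.getKey().isInstance(current)) {
                    return entry.getValue();
                }
            }
            if (!traverseCauses) {
                break;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Map<Class<? extends Throwable>, Boolean> retryable = Map.of(MyException.class, true);
        Throwable wrapped = new SomeOtherException(new MyException("transient"));
        System.out.println("top-level only: " + isRetryable(wrapped, retryable, false));
        System.out.println("traverse causes: " + isRetryable(wrapped, retryable, true));
    }
}
```

In this sketch, a MyException wrapped in a SomeOtherException is retried only when traverseCauses is true, which mirrors the wrapping in a MessagingException mentioned above.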
Circuit Breaker Advice
The general idea of the circuit breaker pattern is that, if a service is not currently available, do not waste time (and resources) trying to use it.
The o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice implements this pattern.
When the circuit breaker is in the closed state, the endpoint attempts to invoke the service.
The circuit breaker goes to the open state if a certain number of consecutive attempts fail.
When it is in the open state, new requests “fail fast” and no attempt is made to invoke the service until some time has expired.
When that time has expired, the circuit breaker is set to the half-open state. When in this state, if even a single attempt fails, the breaker immediately goes to the open state. If the attempt succeeds, the breaker goes to the closed state, in which case it does not go to the open state again until the configured number of consecutive failures again occur. Any successful attempt resets the state to zero failures for the purpose of determining when the breaker might go to the open state again.
Typically, this advice might be used for external services, where it might take some time to fail (such as a timeout attempting to make a network connection).
The RequestHandlerCircuitBreakerAdvice has two properties: threshold and halfOpenAfter.
The threshold property represents the number of consecutive failures that need to occur before the breaker goes open.
It defaults to 5.
The halfOpenAfter property represents the time after the last failure that the breaker waits before attempting another request.
The default is 1000 milliseconds.
The following example configures a circuit breaker and shows its DEBUG and ERROR output:
<int:service-activator input-channel="input" ref="failer" method="service">
    <int:request-handler-advice-chain>
        <bean class="o.s.i.handler.advice.RequestHandlerCircuitBreakerAdvice">
            <property name="threshold" value="2" />
            <property name="halfOpenAfter" value="12000" />
        </bean>
    </int:request-handler-advice-chain>
</int:service-activator>
05.617 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=...]
05.638 ERROR [task-scheduler-1]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
10.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
10.600 ERROR [task-scheduler-2]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
15.598 DEBUG [task-scheduler-3]preSend on channel 'input', message: [Payload=...]
15.599 ERROR [task-scheduler-3]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
20.598 DEBUG [task-scheduler-2]preSend on channel 'input', message: [Payload=...]
20.598 ERROR [task-scheduler-2]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
...
25.598 DEBUG [task-scheduler-5]preSend on channel 'input', message: [Payload=...]
25.601 ERROR [task-scheduler-5]org.springframework.messaging.MessageHandlingException: java.lang.RuntimeException: foo
...
30.598 DEBUG [task-scheduler-1]preSend on channel 'input', message: [Payload=foo...]
30.599 ERROR [task-scheduler-1]org.springframework.messaging.MessagingException: Circuit Breaker is Open for ServiceActivator
In the preceding example, the threshold is set to 2 and halfOpenAfter is set to 12 seconds.
A new request arrives every 5 seconds.
The first two attempts invoked the service.
The third and fourth failed with an exception indicating that the circuit breaker is open.
The fifth request was attempted because the request was 15 seconds after the last failure.
The sixth attempt fails immediately because the failed fifth attempt returned the breaker to the open state.
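The timeline above can be replayed with a minimal, single-threaded sketch of the pattern. This is an assumption-laden illustration, not the RequestHandlerCircuitBreakerAdvice source: CircuitBreakerSketch, CircuitOpenException, and simulate are hypothetical names, and the clock is injected so that the six requests (one every 5 seconds, threshold 2, halfOpenAfter 12 seconds) can be simulated without sleeping.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Illustrative, single-threaded sketch of the pattern; NOT the framework's implementation.
class CircuitBreakerSketch {

    static class CircuitOpenException extends RuntimeException {
        CircuitOpenException() {
            super("Circuit Breaker is Open");
        }
    }

    private final int threshold;
    private final long halfOpenAfterMillis;
    private final LongSupplier clock;
    private int consecutiveFailures;
    private long lastFailureMillis;

    CircuitBreakerSketch(int threshold, long halfOpenAfterMillis, LongSupplier clock) {
        this.threshold = threshold;
        this.halfOpenAfterMillis = halfOpenAfterMillis;
        this.clock = clock;
    }

    <T> T invoke(Supplier<T> service) {
        boolean tripped = consecutiveFailures >= threshold;
        // Open: fail fast until halfOpenAfter has elapsed since the last failure.
        if (tripped && clock.getAsLong() - lastFailureMillis < halfOpenAfterMillis) {
            throw new CircuitOpenException();
        }
        try {
            T result = service.get();  // closed or half-open: try the service
            consecutiveFailures = 0;   // any success resets the failure count
            return result;
        }
        catch (RuntimeException ex) {
            consecutiveFailures++;
            lastFailureMillis = clock.getAsLong();
            throw ex;
        }
    }

    // Replays the scenario from the text: an always-failing service, a request every
    // 5 seconds, threshold = 2, halfOpenAfter = 12 seconds.
    static List<String> simulate() {
        long[] now = {0L};
        CircuitBreakerSketch breaker = new CircuitBreakerSketch(2, 12_000L, () -> now[0]);
        List<String> outcomes = new ArrayList<>();
        for (long t = 5_000L; t <= 30_000L; t += 5_000L) {
            now[0] = t;
            try {
                breaker.invoke(() -> {
                    throw new RuntimeException("foo");
                });
                outcomes.add("succeeded");
            }
            catch (CircuitOpenException ex) {
                outcomes.add("open");
            }
            catch (RuntimeException ex) {
                outcomes.add("invoked");
            }
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // prints [invoked, invoked, open, open, invoked, open]
        System.out.println(simulate());
    }
}
```

The simulated outcomes follow the same sequence as the log: two real invocations, two fast failures, one half-open attempt, and another fast failure.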
Expression Evaluating Advice
Another supplied advice class is the o.s.i.handler.advice.ExpressionEvaluatingRequestHandlerAdvice.
This advice is more general than the retry and circuit breaker advices.
It provides a mechanism to evaluate an expression on the original inbound message sent to the endpoint.
Separate expressions are available to be evaluated, after either success or failure.
Optionally, a message containing the evaluation result, together with the input message, can be sent to a message channel.
A typical use case for this advice might be with an <ftp:outbound-channel-adapter/>, perhaps to move the file to one directory if the transfer was successful or to another directory if it fails.
The advice has properties to set an expression when successful, an expression for failures, and corresponding channels for each.
For the successful case, the message sent to the successChannel is an AdviceMessage, with the payload being the result of the expression evaluation.
An additional property, called inputMessage, contains the original message sent to the handler.
A message sent to the failureChannel (when the handler throws an exception) is an ErrorMessage with a payload of MessageHandlingExpressionEvaluatingAdviceException.
Like all MessagingException instances, this payload has failedMessage and cause properties, as well as an additional property called evaluationResult, which contains the result of the expression evaluation.
| Starting with version 5.1.3, if channels are configured but expressions are not provided, the default expression evaluates to the payload of the message. | 
When an exception is thrown in the scope of the advice, by default, that exception is thrown to the caller after any failureExpression is evaluated.
If you wish to suppress throwing the exception, set the trapException property to true.
The following example shows how to configure the advice with the Java DSL:
@SpringBootApplication
public class EerhaApplication {
    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(EerhaApplication.class, args);
        MessageChannel in = context.getBean("advised.input", MessageChannel.class);
        in.send(new GenericMessage<>("good"));
        in.send(new GenericMessage<>("bad"));
        context.close();
    }
    @Bean
    public IntegrationFlow advised() {
        return f -> f.<String>handle((payload, headers) -> {
            if (payload.equals("good")) {
                return null;
            }
            else {
                throw new RuntimeException("some failure");
            }
        }, c -> c.advice(expressionAdvice()));
    }
    @Bean
    public Advice expressionAdvice() {
        ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
        advice.setSuccessChannelName("success.input");
        advice.setOnSuccessExpressionString("payload + ' was successful'");
        advice.setFailureChannelName("failure.input");
        advice.setOnFailureExpressionString(
                "payload + ' was bad, with reason: ' + #exception.cause.message");
        advice.setTrapException(true);
        return advice;
    }
    @Bean
    public IntegrationFlow success() {
        return f -> f.handle(System.out::println);
    }
    @Bean
    public IntegrationFlow failure() {
        return f -> f.handle(System.out::println);
    }
}
Rate Limiter Advice
The Rate Limiter advice (RateLimiterRequestHandlerAdvice) ensures that an endpoint is not overloaded with requests.
When the rate limit is breached, the request goes into a blocked state.
A typical use case for this advice might be an external service provider that does not allow more than n requests per minute.
The RateLimiterRequestHandlerAdvice implementation is fully based on the Resilience4j project and requires either a RateLimiter or a RateLimiterConfig to be injected.
It can also be configured with defaults, a custom name, or both.
The following example configures a rate limiter advice with one request per second:
@Bean
public RateLimiterRequestHandlerAdvice rateLimiterRequestHandlerAdvice() {
    return new RateLimiterRequestHandlerAdvice(RateLimiterConfig.custom()
            .limitRefreshPeriod(Duration.ofSeconds(1))
            .limitForPeriod(1)
            .build());
}
@ServiceActivator(inputChannel = "requestChannel", outputChannel = "resultChannel",
		adviceChain = "rateLimiterRequestHandlerAdvice")
public String handleRequest(String payload) {
    ...
}
Caching Advice
Starting with version 5.2, the CacheRequestHandlerAdvice has been introduced.
It is based on the caching abstraction in Spring Framework and aligned with the concepts and functionality provided by the @Caching annotation family.
The logic internally is based on the CacheAspectSupport extension, where proxying for caching operations is done around the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage method with the request Message<?> as the argument.
This advice can be configured with a SpEL expression or a Function to evaluate a cache key.
The request Message<?> is available as the root object for the SpEL evaluation context, or as the Function input argument.
By default, the payload of the request message is used for the cache key.
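The role of the cache key can be illustrated with a framework-free sketch. The names here (CachingHandlerSketch, keyFunction, handler) are hypothetical, the request "message" is modeled as a plain generic type, and a ConcurrentHashMap stands in for the cache; this is not how CacheRequestHandlerAdvice is implemented, only a model of "evaluate a key from the request, invoke the handler on a cache miss".

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative only: a map-backed stand-in for cacheable handling, NOT the framework's advice.
class CachingHandlerSketch<M, R> {

    private final Map<Object, R> cache = new ConcurrentHashMap<>();
    private final Function<M, Object> keyFunction;
    private final Function<M, R> handler;

    CachingHandlerSketch(Function<M, Object> keyFunction, Function<M, R> handler) {
        this.keyFunction = keyFunction;
        this.handler = handler;
    }

    // Evaluates the key from the request and invokes the handler only on a cache miss.
    R handle(M message) {
        Object key = keyFunction.apply(message);
        return cache.computeIfAbsent(key, k -> handler.apply(message));
    }
}
```

With a key function that extracts the payload, two requests carrying the same payload but different headers share one cache entry, so the wrapped handler runs only once for them.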
The CacheRequestHandlerAdvice must be configured with cacheNames, when a default cache operation is a CacheableOperation, or with a set of arbitrary CacheOperation instances.
Every CacheOperation can be configured separately, or shared options, such as a CacheManager, CacheResolver, and CacheErrorHandler, can be reused from the CacheRequestHandlerAdvice configuration.
This configuration functionality is similar to Spring Framework’s @CacheConfig and @Caching annotation combination.
If a CacheManager is not provided, a single bean is resolved by default from the BeanFactory in the CacheAspectSupport.
The following example configures two advices with different sets of caching operations:
@Bean
public CacheRequestHandlerAdvice cacheAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice(TEST_CACHE);
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    return cacheRequestHandlerAdvice;
}
@Transformer(inputChannel = "transformerChannel", outputChannel = "nullChannel", adviceChain = "cacheAdvice")
public Object transform(Message<?> message) {
    ...
}
@Bean
public CacheRequestHandlerAdvice cachePutAndEvictAdvice() {
    CacheRequestHandlerAdvice cacheRequestHandlerAdvice = new CacheRequestHandlerAdvice();
    cacheRequestHandlerAdvice.setKeyExpressionString("payload");
    CachePutOperation.Builder cachePutBuilder = new CachePutOperation.Builder();
    cachePutBuilder.setCacheName(TEST_PUT_CACHE);
    CacheEvictOperation.Builder cacheEvictBuilder = new CacheEvictOperation.Builder();
    cacheEvictBuilder.setCacheName(TEST_CACHE);
    cacheRequestHandlerAdvice.setCacheOperations(cachePutBuilder.build(), cacheEvictBuilder.build());
    return cacheRequestHandlerAdvice;
}
@ServiceActivator(inputChannel = "serviceChannel", outputChannel = "nullChannel",
    adviceChain = "cachePutAndEvictAdvice")
public Message<?> service(Message<?> message) {
    ...
}
Reactive Advice
Starting with version 5.3, a ReactiveRequestHandlerAdvice can be used for request message handlers producing Mono replies.
A BiFunction<Message<?>, Mono<?>, Publisher<?>> has to be provided for this advice and it is called from the Mono.transform() operator on a reply produced by the intercepted handleRequestMessage() method implementation.
Typically, such a Mono customization is necessary when we would like to control network fluctuations via timeout(), retry() and similar support operators.
For example, when we make an HTTP request over the WebFlux client, we could use the following configuration to wait no more than 5 seconds for a response:
.handle(WebFlux.outboundGateway("https://somehost/"),
                       e -> e.customizeMonoReply((message, mono) -> mono.timeout(Duration.ofSeconds(5))));
The message argument is the request message for the message handler and can be used to determine request-scope attributes.
The mono argument is the result of this message handler’s handleRequestMessage() method implementation.
A nested Mono.transform() can also be called from this function to apply, for example, a Reactive Circuit Breaker.
Context Holder Advice
Starting with version 6.1, the ContextHolderRequestHandlerAdvice has been introduced.
This advice takes some value from the request message and stores it in the context holder.
The value is cleared from the context when execution finishes on the target MessageHandler.
The best way to think about this advice is similar to the programming flow where we store some value into a ThreadLocal, get access to it from the target call and then clean up the ThreadLocal after execution.
The ContextHolderRequestHandlerAdvice requires these constructor arguments: a Function<Message<?>, Object> as a value provider, Consumer<Object> as a context set callback and Runnable as a context clean up hook.
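The set, call, and clean-up sequence described above can be sketched in plain Java. The following is a simplified model under stated assumptions, not the ContextHolderRequestHandlerAdvice source: ContextHolderSketch, HOLDER, and invokeWithContext are hypothetical names, the message is modeled as a generic type, and the handler is a plain Function.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Illustrative only: models the set / call / clean-up sequence; NOT the framework's advice.
class ContextHolderSketch {

    // Hypothetical context holder, comparable to a ThreadLocal-backed key holder.
    static final ThreadLocal<Object> HOLDER = new ThreadLocal<>();

    static <M, R> R invokeWithContext(M message,
            Function<M, Object> valueProvider,  // extracts the value from the request
            Consumer<Object> contextSetHook,    // stores the value in the context holder
            Runnable contextCleanupHook,        // clears the context holder
            Function<M, R> handler) {           // the target call that reads the context

        Object value = valueProvider.apply(message);
        contextSetHook.accept(value);
        try {
            return handler.apply(message);
        }
        finally {
            // Runs whether the handler returns normally or throws.
            contextCleanupHook.run();
        }
    }
}
```

The finally block is the important design point of the sketch: the context is cleared even when the target call fails, so a stale value cannot leak to the next request handled on the same thread.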
The following sample shows how a ContextHolderRequestHandlerAdvice can be used in combination with a o.s.i.file.remote.session.DelegatingSessionFactory:
@Bean
DelegatingSessionFactory<?> dsf(SessionFactory<?> one, SessionFactory<?> two) {
    return new DelegatingSessionFactory<>(Map.of("one", one, "two", two), null);
}
@Bean
ContextHolderRequestHandlerAdvice contextHolderRequestHandlerAdvice(DelegatingSessionFactory<String> dsf) {
    return new ContextHolderRequestHandlerAdvice(message -> message.getHeaders().get("FACTORY_KEY"),
                                      dsf::setThreadKey, dsf::clearThreadKey);
}
@ServiceActivator(inputChannel = "in", adviceChain = "contextHolderRequestHandlerAdvice")
FtpOutboundGateway ftpOutboundGateway(DelegatingSessionFactory<?> sessionFactory) {
	return new FtpOutboundGateway(sessionFactory, "ls", "payload");
}
It is then enough to send a message to the in channel with a FACTORY_KEY header set to either one or two.
The ContextHolderRequestHandlerAdvice sets the value from that header into the DelegatingSessionFactory via its setThreadKey() method.
Then, when the FtpOutboundGateway executes an ls command, the proper delegate SessionFactory is chosen from the DelegatingSessionFactory according to the value in its ThreadLocal.
When the result is produced from the FtpOutboundGateway, the ThreadLocal value in the DelegatingSessionFactory is cleared by the clearThreadKey() call from the ContextHolderRequestHandlerAdvice.
See Delegating Session Factory for more information.
Custom Advice Classes
In addition to the provided advice classes described earlier, you can implement your own advice classes.
While you can provide any implementation of org.aopalliance.aop.Advice (usually org.aopalliance.intercept.MethodInterceptor), we generally recommend that you subclass o.s.i.handler.advice.AbstractRequestHandlerAdvice.
This has the benefit of avoiding the writing of low-level aspect-oriented programming code as well as providing a starting point that is specifically tailored for use in this environment.
Subclasses need to implement the doInvoke() method, the definition of which follows:
/**
 * Subclasses implement this method to apply behavior to the {@link MessageHandler}.
 * callback.execute() invokes the handler method and returns its result, or null.
 * @param callback Subclasses invoke the execute() method on this interface to invoke the handler method.
 * @param target The target handler.
 * @param message The message that will be sent to the handler.
 * @return the result after invoking the {@link MessageHandler}.
 * @throws Exception
 */
protected abstract Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception;
The callback parameter is a convenience that spares subclasses from dealing with AOP directly.
Invoking the callback.execute() method invokes the message handler.
The target parameter is provided for those subclasses that need to maintain state for a specific handler, perhaps by maintaining that state in a Map keyed by the target.
This feature allows the same advice to be applied to multiple handlers.
The RequestHandlerCircuitBreakerAdvice uses the target in this way to keep circuit breaker state for each handler.
The message parameter is the message sent to the handler.
While the advice cannot modify the message before invoking the handler, it can modify the payload (if it has mutable properties).
Typically, an advice would use the message for logging or to send a copy of the message somewhere before or after invoking the handler.
The return value would normally be the value returned by callback.execute().
However, the advice does have the ability to modify the return value.
Note that only AbstractReplyProducingMessageHandler instances return values.
The following example shows a custom advice class that extends AbstractRequestHandlerAdvice:
public class MyAdvice extends AbstractRequestHandlerAdvice {
    @Override
    protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) throws Exception {
        // add code before the invocation
        Object result = callback.execute();
        // add code after the invocation
        return result;
    }
}
| In addition to the  For more information, see the ReflectiveMethodInvocation Javadoc. | 
Other Advice Chain Elements
While the abstract class mentioned above is a convenience, you can add any Advice, including a transaction advice, to the chain.
Handling Message Advice
As discussed in the introduction to this section, advice objects in a request handler advice chain are applied to just the current endpoint, not the downstream flow (if any).
For MessageHandler objects that produce a reply (such as those that extend AbstractReplyProducingMessageHandler), the advice is applied to an internal method: handleRequestMessage() (called from MessageHandler.handleMessage()).
For other message handlers, the advice is applied to MessageHandler.handleMessage().
There are some circumstances where, even if a message handler is an AbstractReplyProducingMessageHandler, the advice must be applied to the handleMessage method.
For example, the idempotent receiver might return null, which would cause an exception if the handler’s replyRequired property is set to true.
Another example is the BoundRabbitChannelAdvice — see Strict Message Ordering.
Starting with version 4.3.1, a new HandleMessageAdvice interface and its base implementation (AbstractHandleMessageAdvice) have been introduced.
Advice objects that implement HandleMessageAdvice are always applied to the handleMessage() method, regardless of the handler type.
It is important to understand that HandleMessageAdvice implementations (such as the idempotent receiver), when applied to handlers that return responses, are dissociated from the adviceChain and properly applied to the MessageHandler.handleMessage() method.
| Because of this disassociation, the advice chain order is not honored. | 
Consider the following configuration:
<some-reply-producing-endpoint ... >
    <int:request-handler-advice-chain>
        <tx:advice ... />
        <ref bean="myHandleMessageAdvice" />
    </int:request-handler-advice-chain>
</some-reply-producing-endpoint>
In the preceding example, the <tx:advice> is applied to AbstractReplyProducingMessageHandler.handleRequestMessage().
However, myHandleMessageAdvice is applied to MessageHandler.handleMessage().
Therefore, it is invoked before the <tx:advice>.
To retain the order, you should follow the standard Spring AOP configuration approach and use an endpoint id together with the .handler suffix to obtain the target MessageHandler bean.
Note that, in that case, the entire downstream flow is within the transaction scope.
In the case of a MessageHandler that does not return a response, the advice chain order is retained.
Starting with version 5.3, the HandleMessageAdviceAdapter is provided to apply any MethodInterceptor for the MessageHandler.handleMessage() method and, therefore, the whole sub-flow.
For example, a RetryOperationsInterceptor could be applied to the whole sub-flow starting from some endpoint; this is not possible, by default, because the consumer endpoint applies advices only to the AbstractReplyProducingMessageHandler.RequestHandler.handleRequestMessage().
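The difference in scope between advising handleRequestMessage() and advising handleMessage() can be sketched with a small framework-free model. Everything in it is a hypothetical stand-in (the AdviceScopeSketch class, the around() helper, and the log strings), and it assumes that the reply is delivered downstream synchronously on the calling thread, as with a direct channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Framework-free model of a reply-producing handler; all names are hypothetical.
final class AdviceScopeSketch {

    // Around-advice stand-in: records where its scope starts and ends.
    static <T> Supplier<T> around(Supplier<T> next, List<String> log) {
        return () -> {
            log.add("advice start");
            try {
                return next.get();
            }
            finally {
                log.add("advice end");
            }
        };
    }

    static List<String> run(boolean adviseHandleMessage) {
        List<String> log = new ArrayList<>();

        // Stand-in for handleRequestMessage(): produces the reply.
        Supplier<String> handleRequestMessage = () -> {
            log.add("handleRequestMessage");
            return "reply";
        };
        Supplier<String> requestHandler =
                adviseHandleMessage ? handleRequestMessage : around(handleRequestMessage, log);

        // Stand-in for handleMessage(): invokes the request handler,
        // then sends the reply to the downstream flow.
        Supplier<String> handleMessage = () -> {
            String reply = requestHandler.get();
            log.add("downstream receives " + reply);
            return reply;
        };
        Supplier<String> entryPoint =
                adviseHandleMessage ? around(handleMessage, log) : handleMessage;

        entryPoint.get();
        return log;
    }
}
```

In this model, run(false) ends the advice scope before the downstream step runs, while run(true) keeps the downstream step inside the advice scope, which corresponds to the sub-flow behavior described for HandleMessageAdvice.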
Transaction Support
Starting with version 5.0, a new TransactionHandleMessageAdvice has been introduced to make the whole downstream flow transactional, thanks to the HandleMessageAdvice implementation.
When a regular TransactionInterceptor is used in the <request-handler-advice-chain> element (for example, through configuring <tx:advice>), a started transaction is applied only for an internal AbstractReplyProducingMessageHandler.handleRequestMessage() and is not propagated to the downstream flow.
To simplify XML configuration, along with the <request-handler-advice-chain>, a <transactional> element has been added to all <outbound-gateway>, <service-activator>, and related components.
The following example shows <transactional> in use:
<int-jdbc:outbound-gateway query="select * from things where id=:headers[id]">
        <int-jdbc:transactional/>
</int-jdbc:outbound-gateway>
<bean id="transactionManager" class="org.mockito.Mockito" factory-method="mock">
    <constructor-arg value="org.springframework.transaction.TransactionManager"/>
</bean>
If you are familiar with the JPA integration components, such a configuration is not new, but now we can start a transaction from any point in our flow, not only from the <poller> or a message-driven channel adapter such as JMS.
Java configuration can be simplified by using the TransactionInterceptorBuilder, and the resulting bean name can be used in the adviceChain attribute of the messaging annotations, as the following example shows:
@Bean
public ConcurrentMetadataStore store() {
    return new SimpleMetadataStore(hazelcastInstance()
                       .getMap("idempotentReceiverMetadataStore"));
}
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
    return new IdempotentReceiverInterceptor(
            new MetadataStoreSelector(
                    message -> message.getPayload().toString(),
                    message -> message.getPayload().toString().toUpperCase(), store()));
}
@Bean
public TransactionInterceptor transactionInterceptor() {
    return new TransactionInterceptorBuilder(true)
                .transactionManager(this.transactionManager)
                .isolation(Isolation.READ_COMMITTED)
                .propagation(Propagation.REQUIRES_NEW)
                .build();
}
@Bean
@org.springframework.integration.annotation.Transformer(inputChannel = "input",
         outputChannel = "output",
         adviceChain = { "idempotentReceiverInterceptor",
                 "transactionInterceptor" })
public Transformer transformer() {
    return message -> message;
}
Note the true parameter on the TransactionInterceptorBuilder constructor.
It causes the creation of a TransactionHandleMessageAdvice, not a regular TransactionInterceptor.
The Java DSL supports this advice through the .transactional() options on the endpoint configuration, as the following example shows:
@Bean
public IntegrationFlow updatingGatewayFlow() {
    return f -> f
        .handle(Jpa.updatingGateway(this.entityManagerFactory),
                e -> e.transactional(true))
        .channel(c -> c.queue("persistResults"));
}
Advising Filters
There is an additional consideration when advising filters.
By default, any discard actions (when the filter returns false) are performed within the scope of the advice chain.
This could include all the flow downstream of the discard channel.
So, for example, if an element downstream of the discard channel throws an exception and there is a retry advice, the process is retried.
Also, if throwExceptionOnRejection is set to true, the exception is thrown within the scope of the advice.
Setting discard-within-advice to false modifies this behavior and the discard (or exception) occurs after the advice chain is called.
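The effect of this setting when a retry advice is present can be modeled without the framework. In the following sketch, the DiscardWithinAdviceSketch class, the retryOnce() helper, and the result strings are all hypothetical; the selector rejects every message and the discard flow is assumed to fail on its first delivery only.

```java
import java.util.function.Supplier;

// Framework-free model of an advised filter; all names are hypothetical.
final class DiscardWithinAdviceSketch {

    // Retry stand-in: a single retry after a failure.
    static <T> Supplier<T> retryOnce(Supplier<T> next) {
        return () -> {
            try {
                return next.get();
            }
            catch (RuntimeException e) {
                return next.get();
            }
        };
    }

    static String run(boolean discardWithinAdvice) {
        int[] selectorCalls = {0};
        int[] discardCalls = {0};

        // Discard-flow stand-in that fails on its first delivery only.
        Runnable discardFlow = () -> {
            if (discardCalls[0]++ == 0) {
                throw new IllegalStateException("discard flow failed");
            }
        };

        // The advised part of the filter; the selector rejects every message here.
        Supplier<Boolean> advisedFilter = retryOnce(() -> {
            selectorCalls[0]++;
            boolean accepted = false;
            if (!accepted && discardWithinAdvice) {
                discardFlow.run(); // discard happens inside the advice scope
            }
            return accepted;
        });

        try {
            boolean accepted = advisedFilter.get();
            if (!accepted && !discardWithinAdvice) {
                discardFlow.run(); // discard happens after the advice chain returns
            }
            return "selectorCalls=" + selectorCalls[0] + ", discarded";
        }
        catch (IllegalStateException e) {
            return "selectorCalls=" + selectorCalls[0] + ", failed";
        }
    }
}
```

With the discard inside the advice, the failure in the discard flow triggers the retry, so the selector runs twice. With the discard outside the advice, the selector runs once and the failure propagates to the caller without a retry.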
Advising Endpoints Using Annotations
When configuring certain endpoints by using annotations (@Filter, @ServiceActivator, @Splitter, and @Transformer), you can supply a bean name for the advice chain in the adviceChain attribute.
In addition, the @Filter annotation also has the discardWithinAdvice attribute, which can be used to configure the discard behavior, as discussed in Advising Filters.
The following example causes the discard to be performed after the advice:
@MessageEndpoint
public class MyAdvisedFilter {
    @Filter(inputChannel="input", outputChannel="output",
            adviceChain="adviceChain", discardWithinAdvice="false")
    public boolean filter(String s) {
        return s.contains("good");
    }
}
Ordering Advices within Advice Chain
Advice classes are “around” advices and are applied in a nested fashion. The first advice is the outermost, while the last advice is the innermost (that is, closest to the handler being advised). It is important to put the advice classes in the correct order to achieve the functionality you desire.
For example, suppose you want to add a retry advice and a transaction advice.
You may want to place the retry advice first, followed by the transaction advice.
Consequently, each retry is performed in a new transaction.
On the other hand, if you want all the attempts and any recovery operations (in the retry RecoveryCallback) to be scoped within the transaction, you could put the transaction advice first.
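The two orderings can be compared in a minimal, framework-free sketch of nested "around" advices. The AdviceOrderingSketch class and its helpers are hypothetical stand-ins for a retry advice, a transaction advice, and a handler that fails twice before succeeding; they are not Spring Integration or Spring Retry APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

// Framework-free model of an advice chain; all names are hypothetical.
final class AdviceOrderingSketch {

    // Wraps the handler so that the first advice in the list is the outermost.
    static Supplier<String> advise(Supplier<String> handler,
            List<UnaryOperator<Supplier<String>>> chain) {
        Supplier<String> advised = handler;
        for (int i = chain.size() - 1; i >= 0; i--) {
            advised = chain.get(i).apply(advised);
        }
        return advised;
    }

    // Retry stand-in: calls the next element up to maxAttempts times.
    static UnaryOperator<Supplier<String>> retry(int maxAttempts) {
        return next -> () -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return next.get();
                }
                catch (RuntimeException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    // Transaction stand-in: records begin, commit, and rollback events.
    static UnaryOperator<Supplier<String>> transaction(List<String> log) {
        return next -> () -> {
            log.add("begin");
            try {
                String result = next.get();
                log.add("commit");
                return result;
            }
            catch (RuntimeException e) {
                log.add("rollback");
                throw e;
            }
        };
    }

    // Handler stand-in that fails the given number of times before succeeding.
    static Supplier<String> failingHandler(int failures) {
        int[] calls = {0};
        return () -> {
            if (calls[0]++ < failures) {
                throw new IllegalStateException("transient failure");
            }
            return "ok";
        };
    }

    // Runs a handler that fails twice, with the retry either first or second in the chain.
    static List<String> run(boolean retryFirst) {
        List<String> log = new ArrayList<>();
        List<UnaryOperator<Supplier<String>>> chain = retryFirst
                ? List.of(retry(3), transaction(log))
                : List.of(transaction(log), retry(3));
        advise(failingHandler(2), chain).get();
        return log;
    }
}
```

With the retry first, the log contains three begin events (two rolled back, one committed), one per attempt. With the transaction first, the log contains a single begin and a single commit that span all three attempts.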
Advised Handler Properties
Sometimes, it is useful to access handler properties from within the advice.
For example, most handlers implement NamedComponent to let you access the component name.
The target object can be accessed through the target argument (when subclassing AbstractRequestHandlerAdvice) or invocation.getThis() (when implementing org.aopalliance.intercept.MethodInterceptor).
When the entire handler is advised (such as when the handler does not produce replies or the advice implements HandleMessageAdvice), you can cast the target object to an interface, such as NamedComponent, as shown in the following example:
String componentName = ((NamedComponent) target).getComponentName();
When you implement MethodInterceptor directly, you could cast the target object as follows:
String componentName = ((NamedComponent) invocation.getThis()).getComponentName();
When only the handleRequestMessage() method is advised (in a reply-producing handler), you need to access the full handler, which is an AbstractReplyProducingMessageHandler.
The following example shows how to do so:
AbstractReplyProducingMessageHandler handler =
    ((AbstractReplyProducingMessageHandler.RequestHandler) target).getAdvisedHandler();
String componentName = handler.getComponentName();
Idempotent Receiver Enterprise Integration Pattern
Starting with version 4.1, Spring Integration provides an implementation of the Idempotent Receiver Enterprise Integration Pattern.
It is a functional pattern and the whole idempotency logic should be implemented in the application.
However, to simplify the decision-making, the IdempotentReceiverInterceptor component is provided.
This is an AOP Advice that is applied to the MessageHandler.handleMessage() method and that can filter a request message or mark it as a duplicate, according to its configuration.
Previously, you could have implemented this pattern by using a custom MessageSelector in a <filter/> (see Filter), for example.
However, since this pattern really defines the behavior of an endpoint rather than being an endpoint itself, the idempotent receiver implementation does not provide an endpoint component.
Rather, it is applied to endpoints declared in the application.
The logic of the IdempotentReceiverInterceptor is based on the provided MessageSelector and, if the message is not accepted by that selector, it is enriched with the duplicateMessage header set to true.
The target MessageHandler (or downstream flow) can consult this header to implement the correct idempotency logic.
If the IdempotentReceiverInterceptor is configured with a discardChannel or throwExceptionOnRejection = true, the duplicate message is not sent to the target MessageHandler.handleMessage().
Rather, it is discarded.
If you want to discard (do nothing with) the duplicate message, the discardChannel should be configured with a NullChannel, such as the default nullChannel bean.
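The decision described above (pass through, mark as duplicate, or discard) can be summarized in a simplified, framework-free sketch. The IdempotentReceiverSketch class is hypothetical: it uses a plain Set as the selector, a Map for the headers, and a boolean flag in place of a configured discardChannel, and it does not reproduce the actual IdempotentReceiverInterceptor implementation.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified model of the interceptor's decision; all names are hypothetical.
final class IdempotentReceiverSketch {

    private final Set<String> seenKeys = new HashSet<>();

    // Stands in for "a discardChannel is configured".
    private final boolean discardDuplicates;

    IdempotentReceiverSketch(boolean discardDuplicates) {
        this.discardDuplicates = discardDuplicates;
    }

    // Returns the headers handed to the target handler,
    // or null when the duplicate is discarded and the handler is not invoked.
    Map<String, Object> intercept(String key, Map<String, Object> headers) {
        // Selector stand-in: accept only the first occurrence of a key.
        boolean accepted = this.seenKeys.add(key);
        if (accepted) {
            return headers;
        }
        if (this.discardDuplicates) {
            return null;
        }
        // No discard configured: enrich the message so the handler can decide.
        Map<String, Object> enriched = new HashMap<>(headers);
        enriched.put("duplicateMessage", true);
        return enriched;
    }
}
```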
To maintain state between messages and provide the ability to compare messages for the idempotency, we provide the MetadataStoreSelector.
It accepts a MessageProcessor implementation (which creates a lookup key based on the Message) and an optional ConcurrentMetadataStore (Metadata Store).
See the MetadataStoreSelector Javadoc for more information.
You can also customize the value for ConcurrentMetadataStore by using an additional MessageProcessor.
By default, MetadataStoreSelector uses the timestamp message header.
Normally, the selector selects a message for acceptance if there is no existing value for the key.
In some cases, it is useful to compare the current and new values for a key, to determine whether the message should be accepted.
Starting with version 5.3, the compareValues property is provided.
It references a BiPredicate<String, String> whose first parameter is the old value and whose second parameter is the new value; return true to accept the message and replace the old value with the new value in the MetadataStore.
This can be useful to reduce the number of keys; for example, when processing lines in a file, you can store the file name in the key and the current line number in the value.
Then, after a restart, you can skip lines that have already been processed.
See Idempotent Downstream Processing a Split File for an example.
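The file-and-line-number idea can be sketched as follows. This is a simplified model under stated assumptions, not the MetadataStoreSelector implementation: the CompareValuesSketch class is hypothetical, a HashMap stands in for the metadata store, and the predicate shown in the usage note assumes that values are decimal line numbers.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiPredicate;

// Simplified model of key/value acceptance with an optional comparison; names are hypothetical.
final class CompareValuesSketch {

    // Stand-in for the metadata store.
    private final Map<String, String> store = new HashMap<>();

    // May be null, in which case only the first value for a key is accepted.
    private final BiPredicate<String, String> compareValues;

    CompareValuesSketch(BiPredicate<String, String> compareValues) {
        this.compareValues = compareValues;
    }

    synchronized boolean accept(String key, String newValue) {
        String oldValue = this.store.get(key);
        if (oldValue == null) {
            // No existing value for the key: accept and remember the value.
            this.store.put(key, newValue);
            return true;
        }
        if (this.compareValues != null && this.compareValues.test(oldValue, newValue)) {
            // The predicate accepted the message: replace the old value.
            this.store.put(key, newValue);
            return true;
        }
        return false;
    }
}
```

For example, with the predicate (oldValue, newValue) -> Integer.parseInt(oldValue) < Integer.parseInt(newValue), the key holds the file name and the value holds the last processed line number, so lines at or below the stored number are rejected while later lines are accepted and advance the stored value.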
For convenience, the MetadataStoreSelector options are configurable directly on the <idempotent-receiver> component.
The following listing shows all the possible attributes:
<idempotent-receiver
        id=""  (1)
        endpoint=""  (2)
        selector=""  (3)
        discard-channel=""  (4)
        metadata-store=""  (5)
        key-strategy=""  (6)
        key-expression=""  (7)
        value-strategy=""  (8)
        value-expression=""  (9)
        compare-values="" (10)
        throw-exception-on-rejection="" />  (11)
| 1 | The ID of the IdempotentReceiverInterceptor bean.
Optional. | 
| 2 | Consumer endpoint name(s) or pattern(s) to which this interceptor is applied.
Separate names (patterns) with commas (,), such as endpoint="aaa, bbb*, ccc, *ddd, eee*fff".
Endpoint bean names matching these patterns are then used to retrieve the target endpoint’s MessageHandler bean (using its .handler suffix), and the IdempotentReceiverInterceptor is applied to those beans.
Required. | 
| 3 | A MessageSelector bean reference.
Mutually exclusive with metadata-store and key-strategy (key-expression).
When selector is not provided, one of key-strategy or key-expression is required. | 
| 4 | Identifies the channel to which to send a message when the IdempotentReceiverInterceptor does not accept it.
When omitted, duplicate messages are forwarded to the handler with a duplicateMessage header.
Optional. | 
| 5 | A ConcurrentMetadataStore reference.
Used by the underlying MetadataStoreSelector.
Mutually exclusive with selector.
Optional.
The default MetadataStoreSelector uses an internal SimpleMetadataStore that does not maintain state across application executions. | 
| 6 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector.
Evaluates an idempotentKey from the request message.
Mutually exclusive with selector and key-expression.
When a selector is not provided, one of key-strategy or key-expression is required. | 
| 7 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor.
Used by the underlying MetadataStoreSelector.
Evaluates an idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and key-strategy.
When a selector is not provided, one of key-strategy or key-expression is required. | 
| 8 | A MessageProcessor reference.
Used by the underlying MetadataStoreSelector.
Evaluates a value for the idempotentKey from the request message.
Mutually exclusive with selector and value-expression.
By default, the MetadataStoreSelector uses the timestamp message header as the metadata value. | 
| 9 | A SpEL expression to populate an ExpressionEvaluatingMessageProcessor.
Used by the underlying MetadataStoreSelector.
Evaluates a value for the idempotentKey by using the request message as the evaluation context root object.
Mutually exclusive with selector and value-strategy.
By default, the MetadataStoreSelector uses the timestamp message header as the metadata value. | 
| 10 | A reference to a BiPredicate<String, String> bean which allows you to optionally select a message by comparing the old and new values for the key; null by default. | 
| 11 | Whether to throw an exception if the IdempotentReceiverInterceptor rejects the message.
Defaults to false.
It is applied regardless of whether or not a discard-channel is provided. | 
For Java configuration, Spring Integration provides the method-level @IdempotentReceiver annotation.
It is used to mark a method that has a messaging annotation (@ServiceActivator, @Router, and others) to specify which IdempotentReceiverInterceptor objects are applied to this endpoint.
The following example shows how to use the @IdempotentReceiver annotation:
@Bean
public IdempotentReceiverInterceptor idempotentReceiverInterceptor() {
   return new IdempotentReceiverInterceptor(new MetadataStoreSelector(m ->
                                                    m.getHeaders().get(INVOICE_NBR_HEADER)));
}
@Bean
@ServiceActivator(inputChannel = "input", outputChannel = "output")
@IdempotentReceiver("idempotentReceiverInterceptor")
public MessageHandler myService() {
    ....
}
When you use the Java DSL, you can add the interceptor to the endpoint’s advice chain, as the following example shows:
@Bean
public IntegrationFlow flow() {
    ...
        .handle("someBean", "someMethod",
            e -> e.advice(idempotentReceiverInterceptor()))
    ...
}
| The IdempotentReceiverInterceptor is designed only for the MessageHandler.handleMessage(Message<?>) method.
Starting with version 4.3.1, it implements HandleMessageAdvice, with the AbstractHandleMessageAdvice as a base class, for better dissociation.
See Handling Message Advice for more information. |