Class DelayHandler
- All Implemented Interfaces:
EventListener, org.reactivestreams.Subscriber<Message<?>>, Aware, BeanClassLoaderAware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent>, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, MessageProducer, DelayHandlerManagement, HeaderPropagationAware, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>
A MessageHandler that is capable of delaying the continuation of a Message flow
based on the result of evaluating delayExpression against an inbound Message,
or on a default delay value configured on this handler. Note that the continuation of the
flow is delegated to a TaskScheduler, and therefore the calling thread does
not block. The advantage of this approach is that many delays can be managed
concurrently, even very long delays, without producing a buildup of blocked Threads.
One thing to keep in mind, however, is that any active transactional context will not propagate from the original sender to the eventual recipient. This is a side effect of passing the Message to the output channel after the delay with a different Thread in control.
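The hand-off described above can be illustrated with plain JDK classes. The following is a minimal sketch, not the handler's actual implementation: a ScheduledExecutorService stands in for the TaskScheduler, and the hypothetical CURRENT_TX thread-local stands in for thread-bound transactional state. It shows that state bound to the sender's thread is not visible on the thread that performs the release.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class DelayThreadSketch {

    // Hypothetical stand-in for transactional state bound to the current thread.
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Returns the transaction marker visible to the thread that releases the message. */
    public static String txSeenOnRelease(long delayMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CURRENT_TX.set("tx-1"); // the sender "opens a transaction" on its own thread
        try {
            Callable<String> release = CURRENT_TX::get; // runs later on the scheduler thread
            // schedule(...) returns at once; the sender thread is not parked for the delay.
            ScheduledFuture<String> pending =
                    scheduler.schedule(release, delayMillis, TimeUnit.MILLISECONDS);
            // Waiting on the future is only so this sketch can report the result.
            return pending.get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        catch (ExecutionException | TimeoutException ex) {
            throw new IllegalStateException(ex);
        }
        finally {
            CURRENT_TX.remove();
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("tx seen on release thread: " + txSeenOnRelease(100));
    }
}
```

The release thread sees no marker because thread-locals are per thread, which is the same reason a transaction started by the sender does not reach the eventual recipient.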
When this handler's delayExpression property is configured, the evaluation
result takes precedence over the handler's defaultDelay value. The
evaluation result may be a long, a String that can be parsed as a long, or
a Date. If it is a long, it is interpreted as the length of time to delay in
milliseconds counting from the current time (e.g. a value of 5000 indicates that the
Message can be released as soon as five seconds from the current time). If the value is
a Date, the Message is delayed at least until that Date occurs (i.e. the delay in that case
is equivalent to headerDate.getTime() - new Date().getTime()).
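The conversion rules above can be sketched as a small pure function. This is an illustration under stated assumptions, not the handler's code: the class and method names are hypothetical, the current time is passed in so the result is deterministic, and rejecting unsupported types with an IllegalArgumentException is a choice made for this sketch only.

```java
import java.util.Date;

public class DelayResolutionSketch {

    /**
     * Converts an expression result into a delay in milliseconds.
     * A null result falls back to the default delay.
     */
    public static long resolveDelay(Object result, long defaultDelay, long nowMillis) {
        if (result == null) {
            return defaultDelay;
        }
        if (result instanceof Date) {
            // Delay until the given point in time.
            return ((Date) result).getTime() - nowMillis;
        }
        if (result instanceof Number) {
            // Already a length of time in milliseconds.
            return ((Number) result).longValue();
        }
        if (result instanceof String) {
            // A String must be parseable as a long.
            return Long.parseLong(((String) result).trim());
        }
        // Sketch-only behavior for anything else.
        throw new IllegalArgumentException("Unsupported delay value: " + result);
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        System.out.println(resolveDelay(5000L, 0L, now));
        System.out.println(resolveDelay("5000", 0L, now));
        System.out.println(resolveDelay(new Date(now + 2000L), 0L, now));
    }
}
```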
Delayed messages are stored in the MessageGroupStore as a dedicated group.
If an external persistent store is provided, those delayed messages will be rescheduled
after application startup.
The messageGroupId is a required option and must be unique for each delayer
configuration to avoid work-stealing from the store and unexpected releases.
Different instances of the same delayer can point to the same message group in the store.
The messageGroupId cannot rely on a bean name, which might be generated.
After an application restart the bean may get a different generated name, and its delayed
messages might never be rescheduled because their group is no longer managed
by the application.
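A configuration along the following lines is one way to give the delayer an explicit, stable group id. It is a sketch only: it assumes Spring Integration is on the classpath and enabled elsewhere (for example with @EnableIntegration), and the group id, channel names, and header name are hypothetical. Only constructors and setters listed on this page are used.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.handler.DelayHandler;
import org.springframework.integration.store.MessageGroupStore;

@Configuration
public class DelayerConfigSketch {

    @Bean
    @ServiceActivator(inputChannel = "delayInput") // hypothetical channel name
    public DelayHandler orderDelayer(MessageGroupStore messageStore) {
        // Explicit group id, so it stays the same across restarts
        // instead of depending on a generated bean name.
        DelayHandler handler = new DelayHandler("orders.delayer");
        handler.setMessageStore(messageStore);                 // persistent store, if one is provided
        handler.setDefaultDelay(3000L);                        // used when the expression yields null
        handler.setDelayExpressionString("headers['delay']");  // hypothetical header name
        handler.setOutputChannelName("delayOutput");           // hypothetical channel name
        return handler;
    }
}
```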
- Since:
- 1.0.3
- Author:
- Mark Fisher, Artem Bilan, Gary Russell, Christian Tzolov, Youbin Wu
-
Nested Class Summary
Nested classes/interfaces inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
AbstractReplyProducingMessageHandler.RequestHandler
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides
-
Field Summary
Fields
static final int DEFAULT_MAX_ATTEMPTS
static final long DEFAULT_RETRY_DELAY
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplate
Fields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, logger
Fields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAME
Fields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE
-
Constructor Summary
Constructors
DelayHandler() - Construct an instance with default options.
DelayHandler(String messageGroupId) - Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for MessageGroup to store delayed Messages in the MessageGroupStore.
DelayHandler(String messageGroupId, TaskScheduler taskScheduler) - Create a DelayHandler with the given 'messageGroupId' and TaskScheduler.
-
Method Summary
Methods
protected void doInit()
String getComponentType() - Subclasses may implement this method to provide component type information.
int getDelayedMessageCount()
IntegrationPatternType getIntegrationPatternType() - Return a pattern type this component implements.
protected Object handleRequestMessage(Message<?> requestMessage) - Check if 'requestMessage' wasn't delayed before (releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long) and DelayHandler.DelayedMessageWrapper).
void onApplicationEvent(ContextRefreshedEvent event) - Handle ContextRefreshedEvent to invoke reschedulePersistedMessages() as late as possible after application context startup.
protected void rescheduleAt(Message<?> message, Date startTime)
void reschedulePersistedMessages() - Used for reading persisted Messages in the 'messageStore' to reschedule them e.g. upon application restart.
void setDefaultDelay(long defaultDelay) - Set the default delay in milliseconds.
void setDelayedAdviceChain(List<Advice> delayedAdviceChain) - Specify the List<Advice> to advise DelayHandler.ReleaseMessageHandler proxy.
void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel) - Set a message channel to which an ErrorMessage will be sent if sending the released message fails.
void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName) - Set a message channel name to which an ErrorMessage will be sent if sending the released message fails.
void setDelayExpression(Expression delayExpression) - Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until.
void setDelayExpressionString(String delayExpression) - Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until.
void setIgnoreExpressionFailures(boolean ignoreExpressionFailures) - Specify whether Exceptions thrown by delayExpression evaluation should be ignored (only logged).
void setMaxAttempts(int maxAttempts) - Set the maximum number of release attempts for when message release fails.
void setMessageGroupId(String messageGroupId) - Set a group id to manage delayed messages by this handler.
void setMessageStore(MessageGroupStore messageStore) - Specify the MessageGroupStore that should be used to store Messages while awaiting the delay.
void setRetryDelay(long retryDelay) - Set an additional delay to apply when retrying after a release failure.
protected boolean shouldCopyRequestHeaders() - Subclasses may override this.
Methods inherited from class org.springframework.integration.handler.AbstractReplyProducingMessageHandler
doInvokeAdvisedRequestHandler, getBeanClassLoader, getRequiresReply, handleMessageInternal, hasAdviceChain, onInit, setAdviceChain, setBeanClassLoader, setRequiresReply
Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, setupMessageProcessor, shouldSplitOutput, updateNotPropagatedHeaders
Methods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribe, setObservationConvention
Methods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getObservationRegistry, getOrder, getOverrides, isLoggingEnabled, isObserved, registerMetricsCaptor, registerObservationRegistry, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrack
Methods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentDescription, getComponentName, getComponentSource, getConversionService, getExpression, getIntegrationProperties, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentDescription, setComponentName, setComponentSource, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toString
Methods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
Methods inherited from interface org.springframework.context.ApplicationListener
supportsAsyncExecution
Methods inherited from interface reactor.core.CoreSubscriber
currentContext
Methods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAs
Methods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Field Details
-
DEFAULT_MAX_ATTEMPTS
public static final int DEFAULT_MAX_ATTEMPTS
-
DEFAULT_RETRY_DELAY
public static final long DEFAULT_RETRY_DELAY
-
-
Constructor Details
-
DelayHandler
public DelayHandler()
Construct an instance with default options. The messageGroupId must then be provided via the setter.
- Since:
- 6.2
-
DelayHandler
public DelayHandler(String messageGroupId)
Create a DelayHandler with the given 'messageGroupId' that is used as 'key' for MessageGroup to store delayed Messages in the MessageGroupStore. The sending of Messages after the delay will be handled by the default ThreadPoolTaskScheduler registered in the ApplicationContext.
- Parameters:
messageGroupId - The message group identifier.
-
DelayHandler
public DelayHandler(String messageGroupId, TaskScheduler taskScheduler)
Create a DelayHandler with the given 'messageGroupId'. The sending of Messages after the delay will be handled by the provided TaskScheduler.
- Parameters:
messageGroupId - The message group identifier.
taskScheduler - A task scheduler.
-
-
Method Details
-
setMessageGroupId
public void setMessageGroupId(String messageGroupId)
Set a group id to manage delayed messages by this handler. Required.
- Parameters:
messageGroupId - the group id for delayed messages.
- Since:
- 6.2
-
setDefaultDelay
public void setDefaultDelay(long defaultDelay)
Set the default delay in milliseconds. If no delayExpression property has been provided, the default delay will be applied to all Messages. If a delay should only be applied to Messages for which the delayExpression yields a result, then set this value to 0.
- Parameters:
defaultDelay - The default delay in milliseconds.
-
setDelayExpression
public void setDelayExpression(Expression delayExpression)
Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
- Parameters:
delayExpression - The delay expression.
-
setDelayExpressionString
public void setDelayExpressionString(String delayExpression)
Specify the Expression that should be checked for a delay period (in milliseconds) or a Date to delay until. If this property is set, the result of the expression evaluation (if not null) will take precedence over this handler's default delay.
- Parameters:
delayExpression - The delay expression.
- Since:
- 5.0
-
setIgnoreExpressionFailures
public void setIgnoreExpressionFailures(boolean ignoreExpressionFailures)
Specify whether Exceptions thrown by delayExpression evaluation should be ignored (only logged). In this case the delayer will fall back to the defaultDelay. If this property is specified as false, any delayExpression evaluation Exception will be thrown to the caller without falling back to the defaultDelay. Default is true.
- Parameters:
ignoreExpressionFailures - true if expression evaluation failures should be ignored.
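The two behaviors of this flag can be sketched as follows. This is a hypothetical illustration, not the handler's code: a Supplier stands in for the delayExpression evaluation, and logging is reduced to a line on System.err.

```java
import java.util.function.Supplier;

public class ExpressionFailureSketch {

    /**
     * Evaluates the (stand-in) delay expression and applies the fallback rules:
     * a null result or an ignored failure yields the default delay.
     */
    public static long delayFor(Supplier<Long> delayExpression, long defaultDelay,
            boolean ignoreExpressionFailures) {
        try {
            Long result = delayExpression.get();
            return (result != null) ? result : defaultDelay;
        }
        catch (RuntimeException ex) {
            if (ignoreExpressionFailures) {
                // Only logged; the default delay is used instead.
                System.err.println("Delay expression failed, using default delay: " + ex.getMessage());
                return defaultDelay;
            }
            // Propagated to the caller without falling back.
            throw ex;
        }
    }

    public static void main(String[] args) {
        Supplier<Long> failing = () -> {
            throw new IllegalStateException("no such header");
        };
        System.out.println(delayFor(failing, 250L, true));
    }
}
```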
-
setMessageStore
public void setMessageStore(MessageGroupStore messageStore)
Specify the MessageGroupStore that should be used to store Messages while awaiting the delay.
- Parameters:
messageStore - The message store.
-
setDelayedAdviceChain
public void setDelayedAdviceChain(List<Advice> delayedAdviceChain)
Specify the List<Advice> to advise the DelayHandler.ReleaseMessageHandler proxy. Usually used to add transactions to delayed messages retrieved from a transactional message store.
- Parameters:
delayedAdviceChain - The advice chain.
-
setDelayedMessageErrorChannel
public void setDelayedMessageErrorChannel(MessageChannel delayedMessageErrorChannel)
Set a message channel to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
- Parameters:
delayedMessageErrorChannel - the channel.
- Since:
- 5.0.8
-
setDelayedMessageErrorChannelName
public void setDelayedMessageErrorChannelName(String delayedMessageErrorChannelName)
Set a message channel name to which an ErrorMessage will be sent if sending the released message fails. If the error flow returns normally, the release is complete. If the error flow throws an exception, the release will be re-attempted. If there is a transaction advice on the release task, the error flow is called within the transaction.
- Parameters:
delayedMessageErrorChannelName - the channel name.
- Since:
- 5.0.8
-
setMaxAttempts
public void setMaxAttempts(int maxAttempts)
Set the maximum number of release attempts for when message release fails. Default 5.
- Parameters:
maxAttempts - the max attempts.
- Since:
- 5.0.8
-
setRetryDelay
public void setRetryDelay(long retryDelay)
Set an additional delay to apply when retrying after a release failure. Default 1000L.
- Parameters:
retryDelay - the retry delay.
- Since:
- 5.0.8
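How maxAttempts and retryDelay interact can be sketched with a plain ScheduledExecutorService. This is an assumption-laden illustration rather than the handler's implementation: the names are hypothetical, a BooleanSupplier stands in for sending the released message, and completing with -1 once the attempts are exhausted is a choice made for this sketch, since this page does not state what happens to the message at that point.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

public class ReleaseRetrySketch {

    /** Completes with the attempt number that succeeded, or -1 once maxAttempts is exhausted. */
    public static CompletableFuture<Integer> release(ScheduledExecutorService scheduler,
            BooleanSupplier send, int maxAttempts, long retryDelayMillis) {
        CompletableFuture<Integer> outcome = new CompletableFuture<>();
        tryRelease(scheduler, send, 1, maxAttempts, retryDelayMillis, outcome);
        return outcome;
    }

    private static void tryRelease(ScheduledExecutorService scheduler, BooleanSupplier send,
            int attemptNo, int maxAttempts, long retryDelayMillis,
            CompletableFuture<Integer> outcome) {
        boolean sent;
        try {
            sent = send.getAsBoolean();
        }
        catch (RuntimeException ex) {
            sent = false; // a failed send counts as a failed attempt
        }
        if (sent) {
            outcome.complete(attemptNo);
        }
        else if (attemptNo >= maxAttempts) {
            outcome.complete(-1);
        }
        else {
            // Re-attempt after the retry delay, again without blocking a thread.
            scheduler.schedule(
                    () -> tryRelease(scheduler, send, attemptNo + 1, maxAttempts,
                            retryDelayMillis, outcome),
                    retryDelayMillis, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        int[] calls = {0};
        // The stand-in send fails twice and then succeeds.
        int attempts = release(scheduler, () -> ++calls[0] >= 3, 5, 50L).join();
        System.out.println("released on attempt " + attempts);
        scheduler.shutdown();
    }
}
```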
-
getComponentType
public String getComponentType()
Description copied from class: IntegrationObjectSupport
Subclasses may implement this method to provide component type information.
- Specified by:
getComponentType in interface NamedComponent
- Overrides:
getComponentType in class MessageHandlerSupport
-
getIntegrationPatternType
public IntegrationPatternType getIntegrationPatternType()
Description copied from interface: IntegrationPattern
Return a pattern type this component implements.
- Specified by:
getIntegrationPatternType in interface IntegrationPattern
- Overrides:
getIntegrationPatternType in class AbstractReplyProducingMessageHandler
- Returns:
- the IntegrationPatternType this component implements.
-
doInit
protected void doInit()
- Overrides:
doInit in class AbstractReplyProducingMessageHandler
-
shouldCopyRequestHeaders
protected boolean shouldCopyRequestHeaders()
Description copied from class: AbstractMessageProducingHandler
Subclasses may override this. True by default.
- Overrides:
shouldCopyRequestHeaders in class AbstractMessageProducingHandler
- Returns:
- true if the request headers should be copied.
-
handleRequestMessage
protected Object handleRequestMessage(Message<?> requestMessage)
Check if 'requestMessage' wasn't delayed before (releaseMessageAfterDelay(org.springframework.messaging.Message<?>, long) and DelayHandler.DelayedMessageWrapper). Then determine 'delay' for 'requestMessage' (determineDelayForMessage(org.springframework.messaging.Message<?>)) and, if delay > 0, schedule the 'releaseMessage' task after 'delay'.
- Specified by:
handleRequestMessage in class AbstractReplyProducingMessageHandler
- Parameters:
requestMessage - the Message which may be delayed.
- Returns:
- null if 'requestMessage' is delayed, otherwise the 'payload' from 'requestMessage'.
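The decision flow described for handleRequestMessage can be sketched as below. This is a simplified, hypothetical model: the nested Delayed class stands in for DelayHandler.DelayedMessageWrapper, a list stands in for the scheduled release tasks, and unwrapping the original payload for an already delayed message is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

public class HandleRequestSketch {

    /** Hypothetical marker for a payload that has already waited out its delay. */
    public static final class Delayed {
        final Object original;

        public Delayed(Object original) {
            this.original = original;
        }
    }

    /** Payloads for which a release task would have been scheduled. */
    public final List<Object> scheduled = new ArrayList<>();

    /** Returns null when the payload is held back, otherwise the payload to pass on. */
    public Object handle(Object payload, long delay) {
        if (payload instanceof Delayed) {
            // Already delayed once: continue the flow instead of delaying again.
            return ((Delayed) payload).original;
        }
        if (delay > 0) {
            // Hold the message; a scheduled task would release it after 'delay'.
            this.scheduled.add(payload);
            return null;
        }
        // No delay applies: pass the payload straight through.
        return payload;
    }

    public static void main(String[] args) {
        HandleRequestSketch handler = new HandleRequestSketch();
        System.out.println(handler.handle("order-1", 5000L));
        System.out.println(handler.handle("order-2", 0L));
        System.out.println(handler.handle(new Delayed("order-1"), 5000L));
    }
}
```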
-
rescheduleAt
protected void rescheduleAt(Message<?> message, Date startTime)
-
getDelayedMessageCount
public int getDelayedMessageCount()
- Specified by:
getDelayedMessageCount in interface DelayHandlerManagement
-
reschedulePersistedMessages
public void reschedulePersistedMessages()
Used for reading persisted Messages in the 'messageStore' to reschedule them e.g. upon application restart. The logic iterates over messageGroup.getMessages() and schedules a task for the 'delay' logic of each Message. This behavior is dictated by the avoidance of invocation thread overload.
- Specified by:
reschedulePersistedMessages in interface DelayHandlerManagement
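The rescheduling pass can be pictured as one iteration over the stored group that works out how long each message still has to wait. The sketch below is an assumption, not the handler's logic: the Stored class is hypothetical, and computing the remaining time as request time plus delay minus the current time, floored at zero, is one plausible reading of the description above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class RescheduleSketch {

    /** Hypothetical record of a persisted delayed message. */
    public static final class Stored {
        final long requestTimeMillis;
        final long delayMillis;

        public Stored(long requestTimeMillis, long delayMillis) {
            this.requestTimeMillis = requestTimeMillis;
            this.delayMillis = delayMillis;
        }
    }

    /** Returns, per stored message, how long it still has to wait from 'nowMillis'. */
    public static List<Long> remainingDelays(List<Stored> group, long nowMillis) {
        List<Long> remaining = new ArrayList<>();
        for (Stored stored : group) {
            long left = stored.requestTimeMillis + stored.delayMillis - nowMillis;
            // A message whose time has already passed is due immediately.
            remaining.add(Math.max(0L, left));
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Stored> group = Arrays.asList(new Stored(1000L, 5000L), new Stored(1000L, 500L));
        System.out.println(remainingDelays(group, 3000L));
    }
}
```

In a real handler each computed value would be handed to the scheduler rather than collected in a list, so the thread that triggers the pass is not tied up by the releases themselves.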
-
onApplicationEvent
public void onApplicationEvent(ContextRefreshedEvent event)
Handle ContextRefreshedEvent to invoke reschedulePersistedMessages() as late as possible after application context startup. It also checks the initialized flag to ignore other ContextRefreshedEvents that may be published in 'parent-child' contexts, e.g. in Spring MVC applications.
- Specified by:
onApplicationEvent in interface ApplicationListener<ContextRefreshedEvent>
- Parameters:
event - the ContextRefreshedEvent which occurs after the Application context is completely initialized.
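The guard against repeated refresh events can be sketched with an AtomicBoolean. This is a hypothetical illustration of the idea only: the class and method names are invented, a counter stands in for the call to reschedulePersistedMessages(), and the actual handler may apply further checks.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class RefreshGuardSketch {

    private final AtomicBoolean initialized = new AtomicBoolean();

    private int rescheduleCount;

    /** Called for every refresh event; only the first one triggers rescheduling. */
    public void onContextRefreshed() {
        if (this.initialized.compareAndSet(false, true)) {
            // Stand-in for reschedulePersistedMessages().
            this.rescheduleCount++;
        }
    }

    public int getRescheduleCount() {
        return this.rescheduleCount;
    }

    public static void main(String[] args) {
        RefreshGuardSketch guard = new RefreshGuardSketch();
        guard.onContextRefreshed(); // e.g. the event from the parent context
        guard.onContextRefreshed(); // e.g. the event from a child context
        System.out.println(guard.getRescheduleCount());
    }
}
```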
-