Class FluxAggregatorMessageHandler
java.lang.Object
org.springframework.integration.context.IntegrationObjectSupport
org.springframework.integration.handler.MessageHandlerSupport
org.springframework.integration.handler.AbstractMessageHandler
org.springframework.integration.handler.AbstractMessageProducingHandler
org.springframework.integration.aggregator.FluxAggregatorMessageHandler
- All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>,Aware,BeanFactoryAware,BeanNameAware,DisposableBean,InitializingBean,ApplicationContextAware,Lifecycle,Ordered,ExpressionCapable,Orderable,MessageProducer,HeaderPropagationAware,IntegrationPattern,NamedComponent,IntegrationManagement,ManageableLifecycle,TrackableComponent,MessageHandler,reactor.core.CoreSubscriber<Message<?>>
public class FluxAggregatorMessageHandler extends AbstractMessageProducingHandler implements ManageableLifecycle
The
AbstractMessageProducingHandler implementation for aggregation logic based
on Reactor's Flux.groupBy(java.util.function.Function<? super T, ? extends K>) and Flux.window(int) operators.
The incoming messages are emitted into a FluxSink provided by the
Flux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>) initialized in the constructor.
The resulting windows for groups are wrapped into Messages for downstream
consumption.
If the AbstractMessageProducingHandler.getOutputChannel() is not a ReactiveStreamsSubscribableChannel
instance, a subscription for the whole aggregating Flux is performed in the
start() method.
- Since:
- 5.2
- Author:
- Artem Bilan
-
Nested Class Summary
Nested classes/interfaces inherited from interface org.springframework.integration.support.management.IntegrationManagement
IntegrationManagement.ManagementOverrides -
Field Summary
Fields inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
messagingTemplateFields inherited from class org.springframework.integration.context.IntegrationObjectSupport
EXPRESSION_PARSER, loggerFields inherited from interface org.springframework.integration.support.management.IntegrationManagement
METER_PREFIX, RECEIVE_COUNTER_NAME, SEND_TIMER_NAMEFields inherited from interface org.springframework.core.Ordered
HIGHEST_PRECEDENCE, LOWEST_PRECEDENCE -
Constructor Summary
Constructors Constructor Description FluxAggregatorMessageHandler()Create an instance with aFlux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)and applyFlux.groupBy(java.util.function.Function<? super T, ? extends K>)andFlux.window(int)transformation into it. -
Method Summary
Modifier and Type Method Description StringgetComponentType()Subclasses may implement this method to provide component type information.IntegrationPatternTypegetIntegrationPatternType()Return a pattern type this component implements.protected voidhandleMessageInternal(Message<?> message)booleanisRunning()voidsetBoundaryTrigger(Predicate<Message<?>> boundaryTrigger)Configure aPredicatefor messages to determine a window boundary in theFlux.windowUntil(java.util.function.Predicate<T>)operator.voidsetCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)Configure a transformationFunctionto apply for aFluxwindow to emit.voidsetCorrelationStrategy(CorrelationStrategy correlationStrategy)Configure aCorrelationStrategyto determine a group key from the incoming messages.voidsetWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)Configure aFunctionto apply a transformation into the groupingFluxfor any arbitraryFlux.window(int)options not covered by the simple options.voidsetWindowSize(int windowSize)Specify a size for windows to close.voidsetWindowSizeFunction(Function<Message<?>,Integer> windowSizeFunction)Specify aFunctionto determine a size for windows to close against the first message in group.voidsetWindowTimespan(Duration windowTimespan)Configure aDurationfor closing windows periodically.protected booleanshouldCopyRequestHeaders()Subclasses may override this.voidstart()voidstop()Methods inherited from class org.springframework.integration.handler.AbstractMessageProducingHandler
addNotPropagatedHeaders, createOutputMessage, getNotPropagatedHeaders, getOutputChannel, isAsync, messageBuilderForReply, onInit, produceOutput, resolveErrorChannel, sendErrorMessage, sendOutput, sendOutputs, setAsync, setNotPropagatedHeaders, setOutputChannel, setOutputChannelName, setSendTimeout, shouldSplitOutput, updateNotPropagatedHeadersMethods inherited from class org.springframework.integration.handler.AbstractMessageHandler
handleMessage, onComplete, onError, onNext, onSubscribeMethods inherited from class org.springframework.integration.handler.MessageHandlerSupport
buildSendTimer, destroy, getManagedName, getManagedType, getMetricsCaptor, getOrder, getOverrides, isLoggingEnabled, registerMetricsCaptor, sendTimer, setLoggingEnabled, setManagedName, setManagedType, setOrder, setShouldTrack, shouldTrackMethods inherited from class org.springframework.integration.context.IntegrationObjectSupport
afterPropertiesSet, extractTypeIfPossible, generateId, getApplicationContext, getApplicationContextId, getBeanDescription, getBeanFactory, getBeanName, getChannelResolver, getComponentName, getConversionService, getExpression, getIntegrationProperties, getIntegrationProperty, getMessageBuilderFactory, getTaskScheduler, isInitialized, setApplicationContext, setBeanFactory, setBeanName, setChannelResolver, setComponentName, setConversionService, setMessageBuilderFactory, setPrimaryExpression, setTaskScheduler, toStringMethods inherited from class java.lang.Object
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, waitMethods inherited from interface org.springframework.integration.support.management.IntegrationManagement
getThisAsMethods inherited from interface org.springframework.integration.support.context.NamedComponent
getBeanName, getComponentName
-
Constructor Details
-
FluxAggregatorMessageHandler
public FluxAggregatorMessageHandler()Create an instance with aFlux.create(java.util.function.Consumer<? super reactor.core.publisher.FluxSink<T>>)and applyFlux.groupBy(java.util.function.Function<? super T, ? extends K>)andFlux.window(int)transformation into it.
-
-
Method Details
-
setCorrelationStrategy
Configure aCorrelationStrategyto determine a group key from the incoming messages. By default aHeaderAttributeCorrelationStrategyis used against aIntegrationMessageHeaderAccessor.CORRELATION_IDheader value.- Parameters:
correlationStrategy- theCorrelationStrategyto use.
-
setCombineFunction
public void setCombineFunction(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Mono<Message<?>>> combineFunction)Configure a transformationFunctionto apply for aFluxwindow to emit. Requires aMonoresult with aMessageas value as a combination result of the incomingFluxfor window. By default aFluxfor window is fully wrapped into a message with headers copied from the first message in window. Such aFluxin the payload has to be subscribed and consumed downstream.- Parameters:
combineFunction- theFunctionto use for result windows transformation.
-
setBoundaryTrigger
Configure aPredicatefor messages to determine a window boundary in theFlux.windowUntil(java.util.function.Predicate<T>)operator. Has a precedence over any other window configuration options.- Parameters:
boundaryTrigger- thePredicateto use for window boundary.- See Also:
Flux.windowUntil(Predicate)
-
setWindowSize
public void setWindowSize(int windowSize)Specify a size for windows to close. Can be combined with thesetWindowTimespan(Duration).- Parameters:
windowSize- the size for window to use.- See Also:
Flux.window(int),Flux.windowTimeout(int, Duration)
-
setWindowSizeFunction
Specify aFunctionto determine a size for windows to close against the first message in group. Tne result of the function can be combined with thesetWindowTimespan(Duration). By default anIntegrationMessageHeaderAccessor.SEQUENCE_SIZEheader is consulted.- Parameters:
windowSizeFunction- theFunctionto use to determine a window size against a first message in the group.- See Also:
Flux.window(int),Flux.windowTimeout(int, Duration)
-
setWindowTimespan
Configure aDurationfor closing windows periodically. Can be combined with thesetWindowSize(int)orsetWindowSizeFunction(Function).- Parameters:
windowTimespan- theDurationto use for windows to close periodically.- See Also:
Flux.window(Duration),Flux.windowTimeout(int, Duration)
-
setWindowConfigurer
public void setWindowConfigurer(Function<reactor.core.publisher.Flux<Message<?>>,reactor.core.publisher.Flux<reactor.core.publisher.Flux<Message<?>>>> windowConfigurer)Configure aFunctionto apply a transformation into the groupingFluxfor any arbitraryFlux.window(int)options not covered by the simple options. Has a precedence over any other window configuration options.- Parameters:
windowConfigurer- theFunctionto apply any custom window transformation.
-
getComponentType
Description copied from class:IntegrationObjectSupportSubclasses may implement this method to provide component type information.- Specified by:
getComponentTypein interfaceNamedComponent- Overrides:
getComponentTypein classMessageHandlerSupport
-
getIntegrationPatternType
Description copied from interface:IntegrationPatternReturn a pattern type this component implements.- Specified by:
getIntegrationPatternTypein interfaceIntegrationPattern- Overrides:
getIntegrationPatternTypein classMessageHandlerSupport- Returns:
- the
IntegrationPatternTypethis component implements.
-
start
public void start()- Specified by:
startin interfaceLifecycle- Specified by:
startin interfaceManageableLifecycle
-
stop
public void stop()- Specified by:
stopin interfaceLifecycle- Specified by:
stopin interfaceManageableLifecycle
-
isRunning
public boolean isRunning()- Specified by:
isRunningin interfaceLifecycle- Specified by:
isRunningin interfaceManageableLifecycle
-
handleMessageInternal
- Specified by:
handleMessageInternalin classAbstractMessageHandler
-
shouldCopyRequestHeaders
protected boolean shouldCopyRequestHeaders()Description copied from class:AbstractMessageProducingHandlerSubclasses may override this. True by default.- Overrides:
shouldCopyRequestHeadersin classAbstractMessageProducingHandler- Returns:
- true if the request headers should be copied.
-