Aggregator
Basically a mirror-image of the splitter, the aggregator is a type of message handler that receives multiple messages and combines them into a single message. In fact, an aggregator is often a downstream consumer in a pipeline that includes a splitter.
Technically, the aggregator is more complex than a splitter, because it is stateful.
It must hold the messages to be aggregated and determine when the complete group of messages is ready to be aggregated.
In order to do so, it requires a MessageStore.
Functionality
The Aggregator combines a group of related messages, by correlating and storing them, until the group is deemed to be complete. At that point, the aggregator creates a single message by processing the whole group and sends the aggregated message as output.
Implementing an aggregator requires providing the logic to perform the aggregation (that is, the creation of a single message from many). Two related concepts are correlation and release.
Correlation determines how messages are grouped for aggregation.
In Spring Integration, correlation is done by default, based on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header.
Messages with the same IntegrationMessageHeaderAccessor.CORRELATION_ID are grouped together.
However, you can customize the correlation strategy to allow other ways of specifying how the messages should be grouped together.
To do so, you can implement a CorrelationStrategy (covered later in this chapter).
To determine the point at which a group of messages is ready to be processed, a ReleaseStrategy is consulted.
The default release strategy for the aggregator releases a group when all messages included in a sequence are present, based on the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header.
You can override this default strategy by providing a reference to a custom ReleaseStrategy implementation.
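The interplay of correlation, storage, and release described above can be sketched in plain Java. This is only an illustrative model, not the framework's implementation: SketchMessage and SketchAggregator are hypothetical names, the map stands in for the message store, and the model is single-threaded and removes a group as soon as it is released.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a message: a payload plus the two header values
// that the default correlation and release strategies rely on.
final class SketchMessage {
    final String correlationId;
    final int sequenceSize;
    final String payload;

    SketchMessage(String correlationId, int sequenceSize, String payload) {
        this.correlationId = correlationId;
        this.sequenceSize = sequenceSize;
        this.payload = payload;
    }
}

// Minimal, single-threaded model of correlate -> store -> release -> aggregate.
final class SketchAggregator {
    // Plays the role of the message store: held messages per correlation key.
    private final Map<String, List<SketchMessage>> groups = new HashMap<>();

    // Returns the aggregated payloads once the group is complete, otherwise empty.
    Optional<List<String>> handle(SketchMessage message) {
        // Correlation: messages with the same correlation ID share a group.
        List<SketchMessage> group =
                groups.computeIfAbsent(message.correlationId, key -> new ArrayList<>());
        group.add(message);

        // Release: hold the group until it contains as many messages as the
        // sequence size announces.
        if (group.size() < message.sequenceSize) {
            return Optional.empty();
        }

        // Aggregation: combine the held payloads into one result and drop the group.
        groups.remove(message.correlationId);
        List<String> payloads = new ArrayList<>();
        for (SketchMessage held : group) {
            payloads.add(held.payload);
        }
        return Optional.of(payloads);
    }
}
```

In this sketch, the first of two messages for a correlation ID yields an empty result, and the second yields a list holding both payloads.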
Programming Model
The Aggregation API consists of a number of classes:
- 
The interface MessageGroupProcessor, and its subclasses: MethodInvokingAggregatingMessageGroupProcessor and ExpressionEvaluatingMessageGroupProcessor
- 
The ReleaseStrategy interface and its default implementation: SimpleSequenceSizeReleaseStrategy
- 
The CorrelationStrategy interface and its default implementation: HeaderAttributeCorrelationStrategy
AggregatingMessageHandler
The AggregatingMessageHandler (a subclass of AbstractCorrelatingMessageHandler) is a MessageHandler implementation, encapsulating the common functionality of an aggregator (and other correlating use cases), which are as follows:
- 
Correlating messages into a group to be aggregated 
- 
Maintaining those messages in a MessageStore until the group can be released
- 
Deciding when the group can be released 
- 
Aggregating the released group into a single message 
- 
Recognizing and responding to an expired group 
The responsibility for deciding how the messages should be grouped together is delegated to a CorrelationStrategy instance.
The responsibility for deciding whether the message group can be released is delegated to a ReleaseStrategy instance.
The following listing shows a brief highlight of the base AbstractAggregatingMessageGroupProcessor (the responsibility for implementing the aggregatePayloads method is left to the developer):
public abstract class AbstractAggregatingMessageGroupProcessor
              implements MessageGroupProcessor {
    protected Map<String, Object> aggregateHeaders(MessageGroup group) {
        // default implementation exists
    }
    protected abstract Object aggregatePayloads(MessageGroup group, Map<String, Object> defaultHeaders);
}
See DefaultAggregatingMessageGroupProcessor, ExpressionEvaluatingMessageGroupProcessor and MethodInvokingMessageGroupProcessor as out-of-the-box implementations of the AbstractAggregatingMessageGroupProcessor.
Starting with version 5.2, a Function<MessageGroup, Map<String, Object>> strategy is available for the AbstractAggregatingMessageGroupProcessor to merge and compute (aggregate) headers for an output message.
The DefaultAggregateHeadersFunction implementation is available with logic that returns all headers that have no conflicts among the group; an absent header on one or more messages within the group is not considered a conflict.
Conflicting headers are omitted.
Along with the newly introduced DelegatingMessageGroupProcessor, this function is used for any arbitrary (non-AbstractAggregatingMessageGroupProcessor) MessageGroupProcessor implementation.
Essentially, the framework injects a provided function into an AbstractAggregatingMessageGroupProcessor instance and wraps all other implementations into a DelegatingMessageGroupProcessor.
The difference in logic between the AbstractAggregatingMessageGroupProcessor and the DelegatingMessageGroupProcessor is that the latter doesn’t compute headers in advance, before calling the delegate strategy, and doesn’t invoke the function if the delegate returns a Message or AbstractIntegrationMessageBuilder.
In that case, the framework assumes that the target implementation has taken care of producing a proper set of headers populated into the returned result.
The Function<MessageGroup, Map<String, Object>> strategy is available as the headers-function reference attribute for XML configuration, as the AggregatorSpec.headersFunction() option for the Java DSL and as AggregatorFactoryBean.setHeadersFunction() for plain Java configuration.
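The header-merging rule described above (keep headers that agree, ignore absences, omit conflicts) can be sketched in plain Java. This is a hedged illustration of the rule only, not the source of DefaultAggregateHeadersFunction; HeaderMergeSketch and mergeHeaders are hypothetical names, and each message is represented simply by its header map.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

final class HeaderMergeSketch {

    // Keeps every header whose values agree across the group.
    // A header that is absent from some messages is not treated as a conflict.
    static Map<String, Object> mergeHeaders(List<Map<String, Object>> headersPerMessage) {
        Map<String, Object> merged = new HashMap<>();
        Set<String> conflicts = new HashSet<>();
        for (Map<String, Object> headers : headersPerMessage) {
            for (Map.Entry<String, Object> entry : headers.entrySet()) {
                String name = entry.getKey();
                if (conflicts.contains(name)) {
                    continue; // already known to conflict, stays omitted
                }
                if (!merged.containsKey(name)) {
                    merged.put(name, entry.getValue()); // first occurrence
                } else if (!Objects.equals(merged.get(name), entry.getValue())) {
                    merged.remove(name); // differing values: omit the header
                    conflicts.add(name);
                }
            }
        }
        return merged;
    }
}
```

For example, merging {a=1, b=2} with {a=1, b=3, c=4} keeps a and c and omits b.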
The CorrelationStrategy is owned by the AbstractCorrelatingMessageHandler and has a default value based on the IntegrationMessageHeaderAccessor.CORRELATION_ID message header, as the following example shows:
public AbstractCorrelatingMessageHandler(MessageGroupProcessor processor, MessageGroupStore store,
        CorrelationStrategy correlationStrategy, ReleaseStrategy releaseStrategy) {
    ...
    this.correlationStrategy = correlationStrategy == null ?
        new HeaderAttributeCorrelationStrategy(IntegrationMessageHeaderAccessor.CORRELATION_ID) : correlationStrategy;
    this.releaseStrategy = releaseStrategy == null ? new SimpleSequenceSizeReleaseStrategy() : releaseStrategy;
    ...
}
As for the actual processing of the message group, the default implementation is the DefaultAggregatingMessageGroupProcessor.
It creates a single Message whose payload is a List of the payloads received for a given group.
This works well for simple scatter-gather implementations with a splitter, a publish-subscribe channel, or a recipient list router upstream.
| When using a publish-subscribe channel or a recipient list router in this type of scenario, be sure to enable the apply-sequence flag.
Doing so adds the necessary headers: CORRELATION_ID, SEQUENCE_NUMBER, and SEQUENCE_SIZE.
That behavior is enabled by default for splitters in Spring Integration, but it is not enabled for publish-subscribe channels or for recipient list routers because those components may be used in a variety of contexts in which these headers are not necessary. | 
When implementing a specific aggregator strategy for an application, you can extend AbstractAggregatingMessageGroupProcessor and implement the aggregatePayloads method.
However, there are better solutions, less coupled to the API, for implementing the aggregation logic, which can be configured either through XML or through annotations.
In general, any POJO can implement the aggregation algorithm if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well).
This method is invoked for aggregating messages as follows:
- 
If the argument is a java.util.Collection<T> and the parameter type T is assignable to Message, the whole list of messages accumulated for aggregation is sent to the aggregator.
- 
If the argument is a non-parameterized java.util.Collection or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages.
- 
If the return type is not assignable to Message, it is treated as the payload for a Message that is automatically created by the framework.
| In the interest of code simplicity and promoting best practices such as low coupling, testability, and others, the preferred way of implementing the aggregation logic is through a POJO and using the XML or annotation support for configuring it in the application. | 
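As a minimal sketch of the payload-based rule above, the following POJO has no framework types in its signature. OrderLineAggregator and joinLines are hypothetical names; in an application, the method would be referenced through the XML or annotation support.

```java
import java.util.List;

// Hypothetical aggregation POJO: nothing here depends on the messaging API.
class OrderLineAggregator {

    // The element type (String) is not assignable to Message, so the method
    // would receive the payloads of the accumulated messages. The returned
    // String is not a Message either, so it would become the payload of the
    // message that the framework creates.
    public String joinLines(List<String> lines) {
        return String.join(",", lines);
    }
}
```

Because the class is a plain object, it can be unit tested by passing a list of strings directly.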
Starting with version 5.3, after processing a message group, an AbstractCorrelatingMessageHandler performs a MessageBuilder.popSequenceDetails() message headers modification to properly support splitter-aggregator scenarios with several nested levels.
It is done only if the message group release result is not a collection of messages.
When the result is a collection of messages, the target MessageGroupProcessor is responsible for the MessageBuilder.popSequenceDetails() call while building those messages.
If the MessageGroupProcessor returns a Message, MessageBuilder.popSequenceDetails() is performed on the output message only if its sequenceDetails match those of the first message in the group.
(Previously this has been done only if a plain payload or an AbstractIntegrationMessageBuilder has been returned from the MessageGroupProcessor.)
This functionality can be controlled by a new popSequence boolean property, so the MessageBuilder.popSequenceDetails() can be disabled in some scenarios when correlation details have not been populated by the standard splitter.
This property, essentially, undoes what has been done by the nearest upstream applySequence = true in the AbstractMessageSplitter.
See Splitter for more information.
| The SimpleMessageGroup.getMessages() method returns an unmodifiableCollection.
Therefore, if your aggregating POJO method has a Collection<Message> parameter, the argument passed in is exactly that Collection instance and, when you use a SimpleMessageStore for the aggregator, that original Collection<Message> is cleared after releasing the group.
Consequently, the Collection<Message> variable in the POJO is cleared too, if it is passed out of the aggregator.
If you wish to simply release that collection as-is for further processing, you must build a new Collection (for example, new ArrayList<Message>(messages)).
Starting with version 4.3, the framework no longer copies the messages to a new collection, to avoid undesired extra object creation. | 
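The effect described in the preceding note can be reproduced with the JDK collections alone. The sketch below is an illustration under stated assumptions: a plain ArrayList plays the role of the store's collection, and DefensiveCopySketch and snapshot are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

class DefensiveCopySketch {

    // Builds an independent copy that survives later changes to the original.
    static <T> List<T> snapshot(Collection<T> released) {
        return new ArrayList<>(released);
    }

    public static void main(String[] args) {
        // Stand-in for the collection owned by the message store.
        List<String> backing = new ArrayList<>(Arrays.asList("m1", "m2"));
        // Read-only view of the same elements, like the one handed to the POJO.
        Collection<String> view = Collections.unmodifiableCollection(backing);

        List<String> copy = snapshot(view);
        backing.clear(); // simulates the store clearing the group after release

        System.out.println(view.size()); // prints 0: the view follows the backing list
        System.out.println(copy.size()); // prints 2: the copy is unaffected
    }
}
```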
If the processMessageGroup method of the MessageGroupProcessor returns a collection, it must be a collection of Message<?> objects.
In this case, the messages are individually released.
Prior to version 4.2, it was not possible to provide a MessageGroupProcessor by using XML configuration.
Only POJO methods could be used for aggregation.
Now, if the framework detects that the referenced (or inner) bean implements MessageGroupProcessor, it is used as the aggregator’s output processor.
If you wish to release a collection of objects from a custom MessageGroupProcessor as the payload of a message, your class should extend AbstractAggregatingMessageGroupProcessor and implement aggregatePayloads().
Also, since version 4.2, a SimpleMessageGroupProcessor is provided.
It returns the collection of messages from the group, which, as indicated earlier, causes the released messages to be sent individually.
This lets the aggregator work as a message barrier, where arriving messages are held until the release strategy fires and the group is released as a sequence of individual messages.
ReleaseStrategy
The ReleaseStrategy interface is defined as follows:
public interface ReleaseStrategy {
  boolean canRelease(MessageGroup group);
}
In general, any POJO can implement the completion decision logic if it provides a method that accepts a single java.util.List as an argument (parameterized lists are supported as well) and returns a boolean value.
This method is invoked after the arrival of each new message, to decide whether the group is complete or not, as follows:
- 
If the argument is a java.util.List<T> and the parameter type T is assignable to Message, the whole list of messages accumulated in the group is sent to the method.
- 
If the argument is a non-parameterized java.util.List or the parameter type is not assignable to Message, the method receives the payloads of the accumulated messages.
- 
The method must return true if the message group is ready for aggregation or false otherwise.
The following example shows how to use the @ReleaseStrategy annotation for a List of type Message:
public class MyReleaseStrategy {
    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<Message<?>> messages) {...}
}
The following example shows how to use the @ReleaseStrategy annotation for a List of type String:
public class MyReleaseStrategy {
    @ReleaseStrategy
    public boolean canMessagesBeReleased(List<String> payloads) {...}
}
Based on the signatures in the preceding two examples, the POJO-based release strategy is passed a Collection of not-yet-released messages (if you need access to the whole Message) or a Collection of payload objects (if the type parameter is anything other than Message).
This satisfies the majority of use cases.
However if, for some reason, you need to access the full MessageGroup, you should provide an implementation of the ReleaseStrategy interface.
| When handling potentially large groups, you should understand how these methods are invoked, because the release strategy may be invoked multiple times before the group is released.
The most efficient is an implementation of ReleaseStrategy. For these reasons, for large groups, we recommend that you implement ReleaseStrategy. | 
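For small groups, a payload-based POJO is often enough. The following is a minimal sketch with hypothetical names (BatchTotalReleaseStrategy, canRelease, threshold): it releases the group once the accumulated amounts reach a threshold. In an application, the method would additionally carry the @ReleaseStrategy annotation or be named through release-strategy-method; the annotation is left out here so that the class compiles without the framework on the classpath.

```java
import java.util.List;

// Hypothetical payload-based release POJO.
class BatchTotalReleaseStrategy {

    private final long threshold;

    BatchTotalReleaseStrategy(long threshold) {
        this.threshold = threshold;
    }

    // Called with the payloads accumulated so far; returns true when the
    // group is ready for aggregation and false otherwise.
    public boolean canRelease(List<Long> amounts) {
        long total = 0L;
        for (long amount : amounts) {
            total += amount;
        }
        return total >= threshold;
    }
}
```

Note that this method is evaluated again after every arriving message, which is the cost the preceding note warns about for large groups.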
When the group is released for aggregation, all its not-yet-released messages are processed and removed from the group.
If the group is also complete (that is, if all messages from a sequence have arrived or if there is no sequence defined), then the group is marked as complete.
Any new messages for this group are sent to the discard channel (if defined).
Setting expire-groups-upon-completion to true (the default is false) removes the entire group, and any new messages (with the same correlation ID as the removed group) form a new group.
You can release partial sequences by using a MessageGroupStoreReaper together with send-partial-result-on-expiry being set to true.
| To facilitate discarding of late-arriving messages, the aggregator must maintain state about the group after it has been released.
This can eventually cause out-of-memory conditions.
To avoid such situations, you should consider configuring a MessageGroupStoreReaper to remove the group metadata.
The expiry parameters should be set to expire groups once a point has been reached after which late messages are not expected to arrive.
For information about configuring a reaper, see Managing State in an Aggregator: MessageGroupStore. | 
Spring Integration provides an implementation for ReleaseStrategy: SimpleSequenceSizeReleaseStrategy.
This implementation consults the SEQUENCE_NUMBER and SEQUENCE_SIZE headers of each arriving message to decide when a message group is complete and ready to be aggregated.
As shown earlier, it is also the default strategy.
| Before version 5.0, the default release strategy was SequenceSizeReleaseStrategy, which does not perform well with large groups.
With that strategy, duplicate sequence numbers are detected and rejected.
This operation can be expensive. | 
If you are aggregating large groups, you don’t need to release partial groups, and you don’t need to detect/reject duplicate sequences, consider using the SimpleSequenceSizeReleaseStrategy instead - it is much more efficient for these use cases, and is the default since version 5.0 when partial group release is not specified.
Aggregating Large Groups
The 4.3 release changed the default Collection for messages in a SimpleMessageGroup to HashSet (it was previously a BlockingQueue).
The BlockingQueue was expensive when removing individual messages from large groups (an O(n) linear scan was required).
Although the hash set is generally much faster for removals, it can be expensive for large messages, because the hash has to be calculated on both inserts and removes.
If you have messages that are expensive to hash, consider using some other collection type.
As discussed in Using MessageGroupFactory, a SimpleMessageGroupFactory is provided so that you can select the Collection that best suits your needs.
You can also provide your own factory implementation to create some other Collection<Message<?>>.
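The trade-off can be observed with JDK collections alone. The sketch below is an assumption-laden illustration, not a benchmark: CountingElement is a hypothetical element type that counts hashCode() calls, and LinkedBlockingQueue is used as a representative BlockingQueue.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class HashCostSketch {

    // Hypothetical element that records how often its hash is requested.
    static final class CountingElement {
        final AtomicInteger hashCalls = new AtomicInteger();

        @Override
        public int hashCode() {
            hashCalls.incrementAndGet();
            return 42;
        }

        @Override
        public boolean equals(Object other) {
            return this == other;
        }
    }

    // Adds and removes one element and reports how many hash computations
    // the given collection triggered.
    static int hashCallsFor(Collection<CountingElement> collection) {
        CountingElement element = new CountingElement();
        collection.add(element);
        collection.remove(element);
        return element.hashCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("HashSet: " + hashCallsFor(new HashSet<>()));
        System.out.println("LinkedBlockingQueue: " + hashCallsFor(new LinkedBlockingQueue<>()));
    }
}
```

The hash set asks for the hash on both the insert and the removal, while the queue locates the element by scanning and comparing with equals().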
The following example shows how to configure an aggregator with the previous implementation and a SimpleSequenceSizeReleaseStrategy:
<int:aggregator input-channel="aggregate"
    output-channel="out" message-store="store" release-strategy="releaser" />
<bean id="store" class="org.springframework.integration.store.SimpleMessageStore">
    <property name="messageGroupFactory">
        <bean class="org.springframework.integration.store.SimpleMessageGroupFactory">
            <constructor-arg value="BLOCKING_QUEUE"/>
        </bean>
    </property>
</bean>
<bean id="releaser" class="SimpleSequenceSizeReleaseStrategy" />
Correlation Strategy
The CorrelationStrategy interface is defined as follows:
public interface CorrelationStrategy {
  Object getCorrelationKey(Message<?> message);
}
The method returns an Object that represents the correlation key used for associating the message with a message group.
The key must satisfy the criteria used for a key in a Map with respect to the implementation of equals() and hashCode().
In general, any POJO can implement the correlation logic, and the rules for mapping a message to a method’s argument (or arguments) are the same as for a ServiceActivator (including support for @Header annotations).
The method must return a value, and the value must not be null.
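As a hedged sketch of a POJO correlation method, the following class derives the key from the payload. The class name, the method name, and the "orderId:item" payload format are assumptions made for illustration only. A String key satisfies the equals() and hashCode() requirement, and the method fails instead of returning null.

```java
import java.util.Objects;

// Hypothetical correlation POJO for payloads shaped like "<orderId>:<item>".
class OrderCorrelationSketch {

    // Returns the order id as the correlation key; never returns null.
    public String correlate(String payload) {
        Objects.requireNonNull(payload, "payload must not be null");
        int separator = payload.indexOf(':');
        if (separator <= 0) {
            throw new IllegalArgumentException("No order id in payload: " + payload);
        }
        return payload.substring(0, separator);
    }
}
```

With this method, "42:apple" and "42:pear" map to the same key and would therefore land in the same group.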
Spring Integration provides an implementation for CorrelationStrategy: HeaderAttributeCorrelationStrategy.
This implementation returns the value of one of the message headers (whose name is specified by a constructor argument) as the correlation key.
By default, the correlation strategy is a HeaderAttributeCorrelationStrategy that returns the value of the CORRELATION_ID header attribute.
If you have a custom header name you would like to use for correlation, you can configure it on an instance of HeaderAttributeCorrelationStrategy and provide that as a reference for the aggregator’s correlation strategy.
Lock Registry
Changes to groups are thread safe.
So, when you send messages for the same correlation ID concurrently, only one of them is processed in the aggregator at a time, making it effectively single-threaded per message group.
A LockRegistry is used to obtain a lock for the resolved correlation ID.
A DefaultLockRegistry is used by default (in-memory).
For synchronizing updates across servers where a shared MessageGroupStore is being used, you must configure a shared lock registry.
Avoiding Deadlocks
As discussed above, when message groups are mutated (messages added or released) a lock is held.
Consider the following flow:
...->aggregator1-> ... ->aggregator2-> ...
If there are multiple threads, and the aggregators share a common lock registry, it is possible to get a deadlock.
This will cause hung threads and jstack <pid> might present a result such as:
Found one Java-level deadlock:
=============================
"t2":
  waiting for ownable synchronizer 0x000000076c1cbfa0, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t1"
"t1":
  waiting for ownable synchronizer 0x000000076c1ccc00, (a java.util.concurrent.locks.ReentrantLock$NonfairSync),
  which is held by "t2"
There are several ways to avoid this problem:
- 
ensure each aggregator has its own lock registry (this can be a shared registry across application instances but two or more aggregators in the flow must each have a distinct registry) 
- 
use an ExecutorChannel or QueueChannel as the output channel of the aggregator so that the downstream flow runs on a new thread
- 
starting with version 5.1.1, set the releaseLockBeforeSend aggregator property to true
| This problem can also be caused if, for some reason, the output of a single aggregator is eventually routed back to the same aggregator. Of course, the first solution above does not apply in this case. | 
Configuring an Aggregator in Java DSL
See Aggregators and Resequencers for how to configure an aggregator in Java DSL.
Configuring an Aggregator with XML
Spring Integration supports the configuration of an aggregator with XML through the <aggregator/> element.
The following example shows an aggregator configuration:
<int:channel id="inputChannel"/>
<int:aggregator id="myAggregator"                          (1)
        auto-startup="true"                                (2)
        input-channel="inputChannel"                       (3)
        output-channel="outputChannel"                     (4)
        discard-channel="throwAwayChannel"                 (5)
        message-store="persistentMessageStore"             (6)
        order="1"                                          (7)
        send-partial-result-on-expiry="false"              (8)
        send-timeout="1000"                                (9)
        correlation-strategy="correlationStrategyBean"     (10)
        correlation-strategy-method="correlate"            (11)
        correlation-strategy-expression="headers['foo']"   (12)
        ref="aggregatorBean"                               (13)
        method="aggregate"                                 (14)
        release-strategy="releaseStrategyBean"             (15)
        release-strategy-method="release"                  (16)
        release-strategy-expression="size() == 5"          (17)
        expire-groups-upon-completion="false"              (18)
        empty-group-min-timeout="60000"                    (19)
        lock-registry="lockRegistry"                       (20)
        group-timeout="60000"                              (21)
        group-timeout-expression="size() ge 2 ? 100 : -1"  (22)
        expire-groups-upon-timeout="true"                  (23)
        scheduler="taskScheduler" >                        (24)
            <int:expire-transactional/>                    (25)
            <int:expire-advice-chain/>                     (26)
</int:aggregator>
<int:channel id="outputChannel"/>
<int:channel id="throwAwayChannel"/>
<bean id="persistentMessageStore" class="org.springframework.integration.jdbc.store.JdbcMessageStore">
    <constructor-arg ref="dataSource"/>
</bean>
<bean id="aggregatorBean" class="sample.PojoAggregator"/>
<bean id="releaseStrategyBean" class="sample.PojoReleaseStrategy"/>
<bean id="correlationStrategyBean" class="sample.PojoCorrelationStrategy"/>
| 1 | The id of the aggregator is optional. | 
| 2 | Lifecycle attribute signaling whether the aggregator should be started during application context startup. Optional (the default is 'true'). | 
| 3 | The channel from which the aggregator receives messages. Required. | 
| 4 | The channel to which the aggregator sends the aggregation results. Optional (because incoming messages can themselves specify a reply channel in the 'replyChannel' message header). | 
| 5 | The channel to which the aggregator sends the messages that timed out (if send-partial-result-on-expiry is false).
Optional. | 
| 6 | A reference to a MessageGroupStore used to store groups of messages under their correlation key until they are complete.
Optional.
By default, it is a volatile in-memory store.
See Message Store for more information. | 
| 7 | The order of this aggregator when more than one handler is subscribed to the same DirectChannel (use for load-balancing purposes).
Optional. | 
| 8 | Indicates that expired messages should be aggregated and sent to the 'output-channel' or 'replyChannel' once their containing MessageGroup is expired (see MessageGroupStore.expireMessageGroups(long)).
One way of expiring a MessageGroup is by configuring a MessageGroupStoreReaper.
However, you can alternatively expire a MessageGroup by calling MessageGroupStore.expireMessageGroups(timeout).
You can accomplish that through a Control Bus operation or, if you have a reference to the MessageGroupStore instance, by invoking expireMessageGroups(timeout).
Otherwise, by itself, this attribute does nothing.
It serves only as an indicator of whether to discard or send to the output or reply channel any messages that are still in the MessageGroup that is about to be expired.
Optional (the default is false).
NOTE: This attribute might more properly be called send-partial-result-on-timeout, because the group may not actually expire if expire-groups-upon-timeout is set to false. | 
| 9 | The timeout interval to wait when sending a reply Message to the output-channel or discard-channel.
Defaults to -1, which results in blocking indefinitely.
It is applied only if the output channel has some 'sending' limitations, such as a QueueChannel with a fixed 'capacity'.
In this case, a MessageDeliveryException is thrown.
For AbstractSubscribableChannel implementations, the send-timeout is ignored.
For group-timeout(-expression), the MessageDeliveryException from the scheduled expire task leads this task to be rescheduled.
Optional. | 
| 10 | A reference to a bean that implements the message correlation (grouping) algorithm.
The bean can be an implementation of the CorrelationStrategy interface or a POJO.
In the latter case, the correlation-strategy-method attribute must be defined as well.
Optional (by default, the aggregator uses the IntegrationMessageHeaderAccessor.CORRELATION_ID header). | 
| 11 | A method defined on the bean referenced by correlation-strategy.
It implements the correlation decision algorithm.
Optional, with restrictions (correlation-strategy must be present). | 
| 12 | A SpEL expression representing the correlation strategy.
Example: "headers['something']".
Only one of correlation-strategy or correlation-strategy-expression is allowed. | 
| 13 | A reference to a bean defined in the application context. The bean must implement the aggregation logic, as described earlier. Optional (by default, the list of aggregated messages becomes a payload of the output message). | 
| 14 | A method defined on the bean referenced by the ref attribute.
It implements the message aggregation algorithm.
Optional (it depends on the ref attribute being defined). | 
| 15 | A reference to a bean that implements the release strategy.
The bean can be an implementation of the ReleaseStrategy interface or a POJO.
In the latter case, the release-strategy-method attribute must be defined as well.
Optional (by default, the aggregator uses the IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header attribute). | 
| 16 | A method defined on the bean referenced by the release-strategy attribute.
It implements the completion decision algorithm.
Optional, with restrictions (release-strategy must be present). | 
| 17 | A SpEL expression representing the release strategy.
The root object for the expression is a MessageGroup.
Example: "size() == 5".
Only one of release-strategy or release-strategy-expression is allowed. | 
| 18 | When set to true (the default is false), completed groups are removed from the message store, letting subsequent messages with the same correlation form a new group.
The default behavior is to send messages with the same correlation as a completed group to the discard-channel. | 
| 19 | Applies only if a MessageGroupStoreReaper is configured for the MessageStore of the <aggregator>.
By default, when a MessageGroupStoreReaper is configured to expire partial groups, empty groups are also removed.
Empty groups exist after a group is normally released.
The empty groups enable the detection and discarding of late-arriving messages.
If you wish to expire empty groups on a longer schedule than expiring partial groups, set this property.
Empty groups are then not removed from the MessageStore until they have not been modified for at least this number of milliseconds.
Note that the actual time to expire an empty group is also affected by the reaper’s timeout property, and it could be as much as this value plus the timeout. | 
| 20 | A reference to an org.springframework.integration.util.LockRegistry bean.
It is used to obtain a Lock based on the groupId for concurrent operations on the MessageGroup.
By default, an internal DefaultLockRegistry is used.
Use of a distributed LockRegistry, such as the ZookeeperLockRegistry, ensures only one instance of the aggregator can operate on a group concurrently.
See Redis Lock Registry, Gemfire Lock Registry, and Zookeeper Lock Registry for more information. | 
| 21 | A timeout (in milliseconds) to force the MessageGroup complete when the ReleaseStrategy does not release the group when the current message arrives.
This attribute provides a built-in time-based release strategy for the aggregator when there is a need to emit a partial result (or discard the group) if a new message does not arrive for the MessageGroup within the timeout, which counts from the time the last message arrived.
To set up a timeout that counts from the time the MessageGroup was created, see the group-timeout-expression information.
When a new message arrives at the aggregator, any existing ScheduledFuture<?> for its MessageGroup is canceled.
If the ReleaseStrategy returns false (meaning do not release) and groupTimeout > 0, a new task is scheduled to expire the group.
We do not advise setting this attribute to zero (or a negative value).
Doing so effectively disables the aggregator, because every message group is immediately completed.
You can, however, conditionally set it to zero (or a negative value) by using an expression.
See group-timeout-expression for information.
The action taken during the completion depends on the ReleaseStrategy and the send-partial-result-on-expiry attribute.
See Aggregator and Group Timeout for more information.
It is mutually exclusive with the 'group-timeout-expression' attribute. | 
| 22 | The SpEL expression that evaluates to a groupTimeout with the MessageGroup as the #root evaluation context object.
Used for scheduling the MessageGroup to be forced complete.
If the expression evaluates to null, the completion is not scheduled.
If it evaluates to zero, the group is completed immediately on the current thread.
In effect, this provides a dynamic group-timeout property.
As an example, if you wish to forcibly complete a MessageGroup after 10 seconds have elapsed since the time the group was created, you might consider using the following SpEL expression: timestamp + 10000 - T(System).currentTimeMillis(), where timestamp is provided by MessageGroup.getTimestamp(), as the MessageGroup here is the #root evaluation context object.
Bear in mind, however, that the group creation time might differ from the time of the first arrived message, depending on the configuration of other group expiration properties.
See group-timeout for more information.
Mutually exclusive with the 'group-timeout' attribute. | 
| 23 | When a group is completed due to a timeout (or by a MessageGroupStoreReaper), the group is expired (completely removed) by default.
Late arriving messages start a new group.
You can set this to false to complete the group but have its metadata remain so that late arriving messages are discarded.
Empty groups can be expired later using a MessageGroupStoreReaper together with the empty-group-min-timeout attribute.
It defaults to 'true'. | 
| 24 | A TaskScheduler bean reference to schedule the MessageGroup to be forced complete if no new message arrives for the MessageGroup within the groupTimeout.
If not provided, the default scheduler (taskScheduler) registered in the ApplicationContext (ThreadPoolTaskScheduler) is used.
This attribute does not apply if group-timeout or group-timeout-expression is not specified. | 
| 25 | Since version 4.1.
It lets a transaction be started for the forceCompleteoperation.
It is initiated from agroup-timeout(-expression)or by aMessageGroupStoreReaperand is not applied to the normaladd,release, anddiscardoperations.
Only this sub-element or<expire-advice-chain/>is allowed. | 
| 26 | Since version 4.1.
It allows the configuration of any Advice for the forceComplete operation.
It is initiated from a group-timeout(-expression) or by a MessageGroupStoreReaper and is not applied to the normal add, release, and discard operations.
Only this sub-element or <expire-transactional/> is allowed.
A transaction Advice can also be configured here by using the Spring tx namespace. | 
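The arithmetic behind the group-timeout-expression example in the attribute notes above (timestamp + 10000 - T(System).currentTimeMillis()) can be restated in plain Java. The following is a minimal sketch, not framework code: the class and method names are hypothetical, and how the aggregator treats zero or negative results is not modeled here.

```java
// Plain-Java restatement of: timestamp + 10000 - T(System).currentTimeMillis()
// GroupTimeoutArithmetic and remainingMillis are hypothetical names used only for illustration.
final class GroupTimeoutArithmetic {

    /**
     * @param groupTimestamp the value of MessageGroup.getTimestamp(), in milliseconds
     * @param now            the current time, in milliseconds
     * @param maxAgeMillis   how long the group may live (10000 in the text above)
     * @return the number of milliseconds left until the group reaches maxAgeMillis
     */
    static long remainingMillis(long groupTimestamp, long now, long maxAgeMillis) {
        return groupTimestamp + maxAgeMillis - now;
    }
}
```

Because the result shrinks as the group ages, each evaluation of the expression yields a shorter timeout than the previous one, which is what makes the timeout effectively dynamic.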
| Expiring Groups
There are two attributes related to expiring (completely removing) groups.
When a group is expired, there is no record of it, and, if a new message arrives with the same correlation, a new group is started.
When a group is completed (without expiry), the empty group remains and late-arriving messages are discarded.
Empty groups can be removed later by using a  
 If a group is not completed normally but is released or discarded because of a timeout, the group is normally expired.
Since version 4.1, you can control this behavior by using  
 Since version 5.0, empty groups are also scheduled for removal after  Starting with version 5.4, the aggregator (and resequencer) can be configured to expire orphaned groups (groups in a persistent message store that might not otherwise be released).
The  | 
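The difference between an expired group and a group that was completed without expiry can be traced with a small state model. The following is a sketch under stated assumptions: GroupLifecycleSketch and all of its members are hypothetical, it uses no Spring Integration classes, and it models only the bookkeeping described above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of group bookkeeping; not a Spring Integration class.
final class GroupLifecycleSketch {

    enum State { OPEN, COMPLETED }

    enum Outcome { ADDED_TO_NEW_GROUP, ADDED_TO_EXISTING_GROUP, DISCARDED }

    // Correlation key -> state. A missing key means there is no record of the group.
    private final Map<String, State> groups = new HashMap<>();

    Outcome onMessage(String correlationKey) {
        State state = groups.get(correlationKey);
        if (state == null) {
            // No record (never seen, or expired): a new group is started.
            groups.put(correlationKey, State.OPEN);
            return Outcome.ADDED_TO_NEW_GROUP;
        }
        // A completed (empty) group remains, so late arrivals are discarded.
        return state == State.OPEN ? Outcome.ADDED_TO_EXISTING_GROUP : Outcome.DISCARDED;
    }

    /** Completes the group. If expire is true, the record is removed entirely. */
    void complete(String correlationKey, boolean expire) {
        if (expire) {
            groups.remove(correlationKey);
        } else {
            groups.put(correlationKey, State.COMPLETED);
        }
    }

    /** Models the later removal of an empty, completed group. */
    void removeEmptyGroup(String correlationKey) {
        if (groups.get(correlationKey) == State.COMPLETED) {
            groups.remove(correlationKey);
        }
    }
}
```

In this model, a late message for an expired group starts a new group, while a late message for a completed-but-retained group is discarded until the empty group is removed.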
We generally recommend using a ref attribute if a custom aggregator handler implementation may be referenced in other <aggregator> definitions.
However, if a custom aggregator implementation is only being used by a single definition of the <aggregator>, you can use an inner bean definition (starting with version 1.0.3) to configure the aggregation POJO within the <aggregator> element, as the following example shows:
<aggregator input-channel="input" method="add" output-channel="output">
    <beans:bean class="org.foo.PojoAggregator"/>
</aggregator>
| Using both a ref attribute and an inner bean definition in the same <aggregator> configuration is not allowed, as it creates an ambiguous condition.
In such cases, an Exception is thrown. | 
The following example shows an implementation of the aggregator bean:
public class PojoAggregator {
  // Sums the partial results of a released group into a single value.
  public Long add(List<Long> results) {
    long total = 0L;
    for (long partialResult : results) {
      total += partialResult;
    }
    return total;
  }
}
An implementation of the release strategy bean for the preceding example might be as follows:
public class PojoReleaseStrategy {
...
  // Releases the group once the running sum reaches maxValue
  // (maxValue is declared in the elided part of the class).
  public boolean canRelease(List<Long> numbers) {
    long sum = 0;
    for (long number : numbers) {
      sum += number;
    }
    return sum >= maxValue;
  }
}
| Wherever it makes sense to do so, the release strategy method and the aggregator method can be combined into a single bean. | 
An implementation of the correlation strategy bean for the example above might be as follows:
public class PojoCorrelationStrategy {
...
  public Long groupNumbersByLastDigit(Long number) {
    return number % 10;
  }
}
The aggregator in the preceding example would group numbers by some criterion (in this case, the remainder after dividing by ten) and hold the group until the sum of the numbers provided by the payloads reaches or exceeds a certain value.
| Wherever it makes sense to do so, the release strategy method, the correlation strategy method, and the aggregator method can be combined in a single bean. (Actually, all of them or any two of them can be combined.) | 
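To see how the three POJO methods interact, the grouping and release behavior can be traced with a plain-Java simulation. This is a minimal sketch, not framework code: LastDigitSumSimulation and its receive method are hypothetical, the in-memory map stands in for the message store, and maxValue is passed in as an assumption because the original classes elide it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation that combines correlation, release, and aggregation in one class.
final class LastDigitSumSimulation {

    private final long maxValue;

    // Correlation key (last digit) -> numbers held for that group.
    private final Map<Long, List<Long>> groups = new HashMap<>();

    LastDigitSumSimulation(long maxValue) {
        this.maxValue = maxValue;
    }

    // Correlation: numbers with the same last digit share a group.
    static long correlate(long number) {
        return number % 10;
    }

    // Aggregation: the group is reduced to the sum of its members.
    static long add(List<Long> results) {
        long total = 0L;
        for (long partialResult : results) {
            total += partialResult;
        }
        return total;
    }

    // Release: the group is ready once its sum reaches maxValue.
    boolean canRelease(List<Long> numbers) {
        return add(numbers) >= maxValue;
    }

    /** Returns the aggregated sum if the group was released, or null if it is still held. */
    Long receive(long number) {
        Long key = correlate(number);
        List<Long> group = groups.computeIfAbsent(key, k -> new ArrayList<>());
        group.add(number);
        if (!canRelease(group)) {
            return null;
        }
        groups.remove(key);
        return add(group);
    }
}
```

With a maxValue of 30, receiving 11 and then 21 releases the "last digit 1" group with a result of 32, while 12 stays held in its own group until enough numbers ending in 2 arrive.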
Aggregators and Spring Expression Language (SpEL)
Since Spring Integration 2.0, you can handle the various strategies (correlation, release, and aggregation) with SpEL, which we recommend if the logic behind such a release strategy is relatively simple.
Suppose you have a legacy component that was designed to receive an array of objects.
We know that the default release strategy assembles all aggregated messages in the List.
Now we have two problems.
First, we need to extract individual messages from the list.
Second, we need to extract the payload of each message and assemble the array of objects.
The following example solves both problems:
public String[] processRelease(List<Message<String>> messages) {
    // Collect the payload of every message in the released group.
    List<String> stringList = new ArrayList<>();
    for (Message<String> message : messages) {
        stringList.add(message.getPayload());
    }
    return stringList.toArray(new String[0]);
}
However, with SpEL, such a requirement could actually be handled relatively easily with a one-line expression, thus sparing you from writing a custom class and configuring it as a bean. The following example shows how to do so:
<int:aggregator input-channel="aggChannel"
    output-channel="replyChannel"
    expression="#this.![payload].toArray()"/>
In the preceding configuration, we use a collection projection expression to assemble a new collection from the payloads of all the messages in the list and then transform it to an array, thus achieving the same result as the earlier Java code.
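For readers less familiar with SpEL projection, the expression #this.![payload].toArray() corresponds roughly to a map-then-toArray pipeline in Java. The following sketch assumes a hypothetical Msg class as a stand-in for a message (only the payload matters) and uses no Spring classes.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of what the projection expression computes.
final class ProjectionSketch {

    /** Hypothetical stand-in for a message; only the payload matters here. */
    static final class Msg<T> {
        private final T payload;

        Msg(T payload) {
            this.payload = payload;
        }

        T getPayload() {
            return payload;
        }
    }

    /** Mirrors #this.![payload].toArray(): project each payload, then call List.toArray(). */
    static Object[] payloadsAsArray(List<Msg<String>> messages) {
        List<String> payloads = messages.stream()
                .map(Msg::getPayload)          // the .![payload] projection
                .collect(Collectors.toList());
        return payloads.toArray();             // the no-argument toArray() returns Object[]
    }
}
```

One detail worth noting: the no-argument List.toArray() returns an Object[], whereas the earlier processRelease method returns a String[]. If the consumer strictly requires a String[], that difference may matter.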
You can apply the same expression-based approach when dealing with custom release and correlation strategies.
Instead of defining a bean for a custom CorrelationStrategy in the correlation-strategy attribute, you can implement your simple correlation logic as a SpEL expression and configure it in the correlation-strategy-expression attribute, as the following example shows:
correlation-strategy-expression="payload.person.id"
In the preceding example, we assume that the payload has a person attribute with an id, which is going to be used to correlate messages.
Likewise, for the ReleaseStrategy, you can implement your release logic as a SpEL expression and configure it in the release-strategy-expression attribute.
The root object for evaluation context is the MessageGroup itself.
The List of messages can be referenced by using the messages property of the group within the expression.
| In releases prior to version 5.0, the root object was the collection of Message<?>, as the previous example shows. | 
The following example shows a release strategy expression that uses the MessageGroup as the root object:
release-strategy-expression="!messages.?[payload==5].empty"
In the preceding example, the root object of the SpEL evaluation context is the MessageGroup itself, and you are stating that, as soon as there is a message with payload of 5 in this group, the group should be released.
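The selection expression !messages.?[payload==5].empty reads as "the sub-list of messages whose payload is 5 is not empty". A rough plain-Java equivalent follows. It is a sketch only: Msg is a hypothetical stand-in for a message, and the comparison handles Integer payloads only, whereas SpEL's == may also coerce other numeric types.

```java
import java.util.List;

// Hypothetical illustration of the release condition; not a ReleaseStrategy implementation.
final class ReleaseOnPayloadSketch {

    /** Hypothetical stand-in for a message; only the payload matters here. */
    static final class Msg {
        private final Object payload;

        Msg(Object payload) {
            this.payload = payload;
        }

        Object getPayload() {
            return payload;
        }
    }

    /** True as soon as at least one message in the group carries the payload 5. */
    static boolean canRelease(List<Msg> messages) {
        // .?[payload==5] selects the matching messages; !....empty asks whether any matched.
        return messages.stream()
                .anyMatch(message -> Integer.valueOf(5).equals(message.getPayload()));
    }
}
```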
Aggregator and Group Timeout
Starting with version 4.0, two new mutually exclusive attributes have been introduced: group-timeout and group-timeout-expression (see the earlier description).
See Configuring an Aggregator with XML.
In some cases, you may need to emit the aggregator result (or discard the group) after a timeout if the ReleaseStrategy does not release when the current message arrives.
For this purpose, the groupTimeout option lets you schedule the MessageGroup to be forced to complete, as the following example shows:
<aggregator input-channel="input" output-channel="output"
        send-partial-result-on-expiry="true"
        group-timeout-expression="size() ge 2 ? 10000 : -1"
        release-strategy-expression="messages[0].headers.sequenceNumber == messages[0].headers.sequenceSize"/>
With this example, the normal release is possible if the aggregator receives the last message in sequence as defined by the release-strategy-expression.
If that specific message does not arrive, the groupTimeout forces the group to complete after ten seconds, as long as the group contains at least two Messages.
The result of forcing the group to complete depends on the ReleaseStrategy and the send-partial-result-on-expiry setting.
First, the release strategy is consulted again to see whether a normal release is to be made.
Although the group has not changed, the ReleaseStrategy can decide to release the group at this time.
If the release strategy still does not release the group, it is expired.
If send-partial-result-on-expiry is true, existing messages in the (partial) MessageGroup are released as a normal aggregator reply message to the output-channel.
Otherwise, it is discarded.
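The decision sequence for a forced completion can be summarized as a small function. This is a minimal sketch of the rules as stated above, not the handler's actual implementation: ForceCompleteSketch, Outcome, and forceComplete are hypothetical names, and a java.util.function.Predicate stands in for the ReleaseStrategy.

```java
import java.util.List;
import java.util.function.Predicate;

// Hypothetical model of the forced-completion decision; not Spring Integration code.
final class ForceCompleteSketch {

    enum Outcome { RELEASED_NORMALLY, PARTIAL_RESULT_SENT, DISCARDED }

    static <T> Outcome forceComplete(List<T> group,
                                     Predicate<List<T>> releaseStrategy,
                                     boolean sendPartialResultOnExpiry) {
        // Step 1: the release strategy gets one more chance to release the group.
        if (releaseStrategy.test(group)) {
            return Outcome.RELEASED_NORMALLY;
        }
        // Step 2: the group is expired; the flag decides what happens to its messages.
        return sendPartialResultOnExpiry ? Outcome.PARTIAL_RESULT_SENT : Outcome.DISCARDED;
    }
}
```

A release strategy that depends on something other than the group contents (for example, the clock) is the typical case in which the second consultation releases a group that has not changed.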
There is a difference between groupTimeout behavior and MessageGroupStoreReaper (see Configuring an Aggregator with XML).
The reaper initiates forced completion for all MessageGroups in the MessageGroupStore periodically.
The groupTimeout does it for each MessageGroup individually if a new message does not arrive during the groupTimeout.
Also, the reaper can be used to remove empty groups (empty groups are retained in order to discard late messages if expire-groups-upon-completion is false).
Configuring an Aggregator with Annotations
The following example shows an aggregator configured with annotations:
public class Waiter {
  ...
  @Aggregator  (1)
  public Delivery aggregatingMethod(List<OrderItem> items) {
    ...
  }
  @ReleaseStrategy  (2)
  public boolean releaseChecker(List<Message<?>> messages) {
    ...
  }
  @CorrelationStrategy  (3)
  public String correlateBy(OrderItem item) {
    ...
  }
}
| 1 | An annotation indicating that this method should be used as an aggregator. It must be specified if this class is used as an aggregator. | 
| 2 | An annotation indicating that this method is used as the release strategy of an aggregator.
If not present on any method, the aggregator uses the SimpleSequenceSizeReleaseStrategy. | 
| 3 | An annotation indicating that this method should be used as the correlation strategy of an aggregator.
If no correlation strategy is indicated, the aggregator uses the HeaderAttributeCorrelationStrategy based on CORRELATION_ID. | 
All of the configuration options provided by the XML element are also available for the @Aggregator annotation.
The aggregator can be either referenced explicitly from XML or, if the @MessageEndpoint is defined on the class, detected automatically through classpath scanning.
Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient.
If you need more control over those options when using annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator, as the following example shows:
@ServiceActivator(inputChannel = "aggregatorChannel")
@Bean
public MessageHandler aggregator(MessageGroupStore jdbcMessageGroupStore) {
     AggregatingMessageHandler aggregator =
                       new AggregatingMessageHandler(new DefaultAggregatingMessageGroupProcessor(),
                                                 jdbcMessageGroupStore);
     aggregator.setOutputChannel(resultsChannel());
     aggregator.setGroupTimeoutExpression(new ValueExpression<>(500L));
     aggregator.setTaskScheduler(this.taskScheduler);
     return aggregator;
}
See Programming Model and Annotations on @Bean Methods for more information.
| Starting with version 4.2, the AggregatorFactoryBean is available to simplify Java configuration for the AggregatingMessageHandler. | 
Managing State in an Aggregator: MessageGroupStore
Aggregator (and some other patterns in Spring Integration) is a stateful pattern that requires decisions to be made based on a group of messages that have arrived over a period of time, all with the same correlation key.
The design of the interfaces in the stateful patterns (such as ReleaseStrategy) is driven by the principle that the components (whether defined by the framework or by a user) should be able to remain stateless.
All state is carried by the MessageGroup and its management is delegated to the MessageGroupStore.
The MessageGroupStore interface is defined as follows:
public interface MessageGroupStore {
    int getMessageCountForAllMessageGroups();
    int getMarkedMessageCountForAllMessageGroups();
    int getMessageGroupCount();
    MessageGroup getMessageGroup(Object groupId);
    MessageGroup addMessageToGroup(Object groupId, Message<?> message);
    MessageGroup markMessageGroup(MessageGroup group);
    MessageGroup removeMessageFromGroup(Object key, Message<?> messageToRemove);
    MessageGroup markMessageFromGroup(Object key, Message<?> messageToMark);
    void removeMessageGroup(Object groupId);
    void registerMessageGroupExpiryCallback(MessageGroupCallback callback);
    int expireMessageGroups(long timeout);
}
For more information, see the Javadoc.
The MessageGroupStore accumulates state information in MessageGroups while waiting for a release strategy to be triggered, and that event might not ever happen.
So, to prevent stale messages from lingering, and for volatile stores to provide a hook for cleaning up when the application shuts down, the MessageGroupStore lets you register callbacks to apply to its MessageGroups when they expire.
The interface is very straightforward, as the following listing shows:
public interface MessageGroupCallback {
    void execute(MessageGroupStore messageGroupStore, MessageGroup group);
}
The callback has direct access to the store and the message group so that it can manage the persistent state (for example, by entirely removing the group from the store).
The MessageGroupStore maintains a list of these callbacks, which it applies, on demand, to all messages whose timestamps are earlier than a time supplied as a parameter (see the registerMessageGroupExpiryCallback(..) and expireMessageGroups(..) methods, described earlier).
For more detail, see Managing State in an Aggregator: MessageGroupStore.
| It is important not to use the same MessageGroupStore instance in different aggregator components when you intend to rely on the expireMessageGroups functionality.
Every AbstractCorrelatingMessageHandler registers its own MessageGroupCallback based on the forceComplete() callback.
As a result, a group selected for expiration may be completed or discarded by the wrong aggregator.
Starting with version 5.0.10, a UniqueExpiryCallback is used from the AbstractCorrelatingMessageHandler for the registration callback in the MessageGroupStore.
The MessageGroupStore, in turn, checks for the presence of an instance of this class and logs an error with an appropriate message if one is already present in the callbacks set.
This way, the framework disallows use of the same MessageGroupStore instance in different aggregators or resequencers, which avoids the side effect of expiring groups that were not created by the particular correlation handler. | 
You can call the expireMessageGroups method with a timeout value.
Any message older than the current time minus this value is expired and has the callbacks applied.
Thus, it is the user of the store that defines what is meant by message group “expiry”.
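The threshold rule and the role of the callbacks can be sketched without any framework classes. In the following illustration, ExpirySketch and its methods are hypothetical, group ids are plain strings, the current time is passed in explicitly so that the behavior is deterministic, and a group exactly at the threshold is treated as not yet expired (an assumption of this sketch).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;

// Hypothetical in-memory model of expiry with callbacks; not a MessageGroupStore implementation.
final class ExpirySketch {

    // Group id -> group timestamp in milliseconds.
    private final Map<String, Long> groupTimestamps = new LinkedHashMap<>();

    // Callbacks receive the store and the id of the expired group.
    private final List<BiConsumer<ExpirySketch, String>> callbacks = new ArrayList<>();

    void addGroup(String groupId, long timestamp) {
        groupTimestamps.put(groupId, timestamp);
    }

    void removeGroup(String groupId) {
        groupTimestamps.remove(groupId);
    }

    int groupCount() {
        return groupTimestamps.size();
    }

    void registerExpiryCallback(BiConsumer<ExpirySketch, String> callback) {
        callbacks.add(callback);
    }

    /** Applies all callbacks to every group older than (now - timeout); returns how many expired. */
    int expireGroups(long now, long timeout) {
        long threshold = now - timeout;
        int expired = 0;
        // Iterate over a copy of the ids because callbacks may remove groups from the store.
        for (String groupId : new ArrayList<>(groupTimestamps.keySet())) {
            Long timestamp = groupTimestamps.get(groupId);
            if (timestamp != null && timestamp < threshold) {
                expired++;
                for (BiConsumer<ExpirySketch, String> callback : callbacks) {
                    callback.accept(this, groupId);
                }
            }
        }
        return expired;
    }
}
```

Note that the store itself only selects the groups and invokes the callbacks. What "expiry" actually does (for example, removing the group) is decided by whoever registered the callback.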
As a convenience for users, Spring Integration provides a wrapper for the message expiry in the form of a MessageGroupStoreReaper, as the following example shows:
<bean id="reaper" class="org...MessageGroupStoreReaper">
    <property name="messageGroupStore" ref="messageStore"/>
    <property name="timeout" value="30000"/>
</bean>
<task:scheduled-tasks scheduler="scheduler">
    <task:scheduled ref="reaper" method="run" fixed-rate="10000"/>
</task:scheduled-tasks>
The reaper is a Runnable.
In the preceding example, the message group store’s expire method is called every ten seconds.
The timeout itself is 30 seconds.
| It is important to understand that the 'timeout' property of MessageGroupStoreReaper is an approximate value and is impacted by the rate of the task scheduler, since this property is only checked on the next scheduled execution of the MessageGroupStoreReaper task.
For example, if the timeout is set for ten minutes but the MessageGroupStoreReaper task is scheduled to run every hour and the last execution of the MessageGroupStoreReaper task happened one minute before the timeout, the MessageGroup does not expire for the next 59 minutes.
Consequently, we recommend setting the rate to a value equal to or shorter than the timeout. | 
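The 59-minute figure in the preceding note follows from simple scheduling arithmetic, which the sketch below reproduces. ReaperTimingSketch and firstExpiringRun are hypothetical names, all values share one time unit (minutes in the example), and a group that becomes eligible exactly at a run time is assumed to expire at that run.

```java
// Hypothetical helper that computes when a fixed-rate reaper first expires a group.
final class ReaperTimingSketch {

    /**
     * @param groupCreatedAt time at which the group was created
     * @param timeout        the reaper's timeout
     * @param lastRunAt      time of a known reaper execution
     * @param period         interval between reaper executions (must be positive)
     * @return the time of the first execution at or after (groupCreatedAt + timeout)
     */
    static long firstExpiringRun(long groupCreatedAt, long timeout, long lastRunAt, long period) {
        long eligibleAt = groupCreatedAt + timeout;
        if (eligibleAt <= lastRunAt) {
            return lastRunAt;
        }
        // Ceiling division: the number of further runs needed to reach eligibleAt.
        long runsNeeded = (eligibleAt - lastRunAt + period - 1) / period;
        return lastRunAt + runsNeeded * period;
    }
}
```

With the group created at minute 0, a timeout of 10, a reaper run at minute 9, and a period of 60, the first run that can expire the group is at minute 69, which is 59 minutes after the group became eligible.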
In addition to the reaper, the expiry callbacks are invoked when the application shuts down through a lifecycle callback in the AbstractCorrelatingMessageHandler.
The AbstractCorrelatingMessageHandler registers its own expiry callback, and this is the link with the boolean flag send-partial-result-on-expiry in the XML configuration of the aggregator.
If the flag is set to true, then, when the expiry callback is invoked, any unmarked messages in groups that are not yet released can be sent on to the output channel.
Flux Aggregator
Version 5.2 introduced the FluxAggregatorMessageHandler component.
It is based on the Project Reactor Flux.groupBy() and Flux.window() operators.
The incoming messages are emitted into the FluxSink initiated by the Flux.create() in the constructor of this component.
If the outputChannel is not provided or it is not an instance of ReactiveStreamsSubscribableChannel, the subscription to the main Flux is done from the Lifecycle.start() implementation.
Otherwise it is postponed to the subscription done by the ReactiveStreamsSubscribableChannel implementation.
The messages are grouped by the Flux.groupBy() using a CorrelationStrategy for the group key.
By default, the IntegrationMessageHeaderAccessor.CORRELATION_ID header of the message is consulted.
By default, every closed window is released as a Flux in the payload of the message to produce.
This message contains all the headers from the first message in the window.
The Flux in the output message payload must be subscribed to and processed downstream.
Such logic can be customized (or superseded) by the setCombineFunction(Function<Flux<Message<?>>, Mono<Message<?>>>) configuration option of the FluxAggregatorMessageHandler.
For example, if we would like to have a List of payloads in the final message, we can configure a Flux.collectList() like this:
fluxAggregatorMessageHandler.setCombineFunction(
                (messageFlux) ->
                        messageFlux
                                .map(Message::getPayload)
                                .collectList()
                                .map(GenericMessage::new));
There are several options in the FluxAggregatorMessageHandler to select an appropriate window strategy:
- setBoundaryTrigger(Predicate<Message<?>>) - is propagated to the Flux.windowUntil() operator. See its JavaDocs for more information. Takes precedence over all other window options.
- setWindowSize(int) and setWindowSizeFunction(Function<Message<?>, Integer>) - is propagated to Flux.window(int) or windowTimeout(int, Duration). By default, a window size is calculated from the first message in the group and its IntegrationMessageHeaderAccessor.SEQUENCE_SIZE header.
- setWindowTimespan(Duration) - is propagated to Flux.window(Duration) or windowTimeout(int, Duration), depending on the window size configuration.
- setWindowConfigurer(Function<Flux<Message<?>>, Flux<Flux<Message<?>>>>) - a function to apply a transformation to the grouped fluxes for any custom window operation not covered by the exposed options.
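The way the first three options combine can be summarized as a small decision function. This is only a sketch of the selection rules as the list above describes them: WindowOptionSketch and chooseOperator are hypothetical, the function returns operator names rather than calling Reactor, and the custom setWindowConfigurer option is deliberately left out.

```java
import java.time.Duration;

// Hypothetical summary of the documented window option rules; it does not use Reactor.
final class WindowOptionSketch {

    /**
     * @param hasBoundaryTrigger whether a boundary trigger predicate is configured
     * @param windowSize         the resolved window size, or null if none is available
     * @param windowTimespan     the configured timespan, or null if none is configured
     * @return the name of the Flux operator that the options map to
     */
    static String chooseOperator(boolean hasBoundaryTrigger, Integer windowSize, Duration windowTimespan) {
        if (hasBoundaryTrigger) {
            // The boundary trigger takes precedence over all other window options.
            return "windowUntil(Predicate)";
        }
        if (windowSize != null && windowTimespan != null) {
            return "windowTimeout(int, Duration)";
        }
        if (windowSize != null) {
            return "window(int)";
        }
        if (windowTimespan != null) {
            return "window(Duration)";
        }
        // The list above does not say what happens when no option applies.
        return "unspecified";
    }
}
```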
Since this component is a MessageHandler implementation it can simply be used as a @Bean definition together with a @ServiceActivator messaging annotation.
With Java DSL it can be used from the .handle() EIP-method.
The sample below demonstrates how we can register an IntegrationFlow at runtime and how a FluxAggregatorMessageHandler can be correlated with a splitter upstream:
IntegrationFlow fluxFlow =
        (flow) -> flow
                .split()
                .channel(MessageChannels.flux())
                .handle(new FluxAggregatorMessageHandler());
IntegrationFlowContext.IntegrationFlowRegistration registration =
        this.integrationFlowContext.registration(fluxFlow)
                .register();
@SuppressWarnings("unchecked")
Flux<Message<?>> window =
        registration.getMessagingTemplate()
                .convertSendAndReceive(new Integer[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }, Flux.class);