Class CorrelatingMessageBarrier

All Implemented Interfaces:
org.reactivestreams.Subscriber<Message<?>>, Aware, BeanFactoryAware, BeanNameAware, DisposableBean, InitializingBean, ApplicationContextAware, Ordered, ComponentSourceAware, ExpressionCapable, Orderable, MessageSource<Object>, IntegrationPattern, NamedComponent, IntegrationManagement, TrackableComponent, MessageHandler, reactor.core.CoreSubscriber<Message<?>>

public class CorrelatingMessageBarrier extends AbstractMessageHandler implements MessageSource<Object>
This endpoint serves as a barrier for messages that should not be processed yet. The decision about when a message can be processed is delegated to a ReleaseStrategy. Once a message can be processed, the client is responsible for any locking (potentially from within the ReleaseStrategy.canRelease(MessageGroup) method).

This class differs from AbstractCorrelatingMessageHandler in that it completely decouples the receiver and the sender. It can be applied in scenarios where completion of a message group is not well defined, but only a certain number of messages for any given correlation key may be processed at a time.

Messages are stored in a MessageGroupStore, grouped by correlation key.
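
The decoupling of sender and receiver described above can be illustrated with a simplified, JDK-only model. This is a sketch, not the Spring Integration implementation: BarrierSketch, handle, and the use of Function and Predicate as stand-ins for CorrelationStrategy and ReleaseStrategy are all hypothetical names chosen for illustration, and the locking is reduced to a single monitor.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical, simplified model of the barrier idea (not the real class).
class BarrierSketch<M> {

    // One group of held messages per correlation key (stand-in for a MessageGroupStore).
    private final Map<Object, Queue<M>> groups = new LinkedHashMap<>();

    // Stand-in for CorrelationStrategy: derives the correlation key of a message.
    private final Function<M, Object> correlationStrategy;

    // Stand-in for ReleaseStrategy.canRelease(MessageGroup).
    private final Predicate<Queue<M>> releaseStrategy;

    BarrierSketch(Function<M, Object> correlationStrategy, Predicate<Queue<M>> releaseStrategy) {
        this.correlationStrategy = correlationStrategy;
        this.releaseStrategy = releaseStrategy;
    }

    // Sender side: store the message under its correlation key; nothing is released here.
    synchronized void handle(M message) {
        Object key = correlationStrategy.apply(message);
        groups.computeIfAbsent(key, k -> new ArrayDeque<>()).add(message);
    }

    // Receiver side: return one message from the first releasable group, or null if none.
    synchronized M receive() {
        for (Queue<M> group : groups.values()) {
            if (!group.isEmpty() && releaseStrategy.test(group)) {
                return group.poll();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Correlate on the first character; release only while a group holds at least two messages.
        BarrierSketch<String> barrier =
                new BarrierSketch<String>(m -> m.substring(0, 1), g -> g.size() >= 2);
        barrier.handle("a1");
        System.out.println(barrier.receive()); // prints "null": the group holds one message
        barrier.handle("a2");
        System.out.println(barrier.receive()); // prints "a1": the group now holds two
    }
}
```

In this model the sender never blocks and never triggers delivery; whether a held message leaves the barrier is decided only when the receiver asks for one.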

Author:
Iwein Fuld, Oleg Zhurakousky, Gary Russell, Artem Bilan, Trung Pham, Glenn Renfro
  • Constructor Details

    • CorrelatingMessageBarrier

      public CorrelatingMessageBarrier()
    • CorrelatingMessageBarrier

      public CorrelatingMessageBarrier(MessageGroupStore store)
  • Method Details

    • setCorrelationStrategy

      public void setCorrelationStrategy(CorrelationStrategy correlationStrategy)
      Set the CorrelationStrategy to be used to determine the correlation key for incoming messages.
      Parameters:
      correlationStrategy - The correlation strategy.
    • setReleaseStrategy

      public void setReleaseStrategy(ReleaseStrategy releaseStrategy)
      Set the ReleaseStrategy that should be used when deciding if a group in this barrier may be released.
      Parameters:
      releaseStrategy - The release strategy.
    • onInit

      protected void onInit()
      Description copied from class: IntegrationObjectSupport
      Subclasses may implement this for initialization logic.
      Overrides:
      onInit in class IntegrationObjectSupport
    • handleMessageInternal

      protected void handleMessageInternal(Message<?> message)
      Specified by:
      handleMessageInternal in class AbstractMessageHandler
    • receive

      public @Nullable Message<Object> receive()
      Description copied from interface: MessageSource
      Retrieve the next available message from this source. Returns null if no message is available.
      Specified by:
      receive in interface MessageSource<Object>
      Returns:
      The message or null.
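
Because receive() returns null when no message is available, a caller typically polls until it sees null. The following is a minimal sketch of such a loop, assuming the source is wrapped as a java.util.function.Supplier; ReceiveLoopSketch, drain, and maxMessages are hypothetical names and are not part of the API documented here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical helper illustrating the "null means nothing available" contract.
class ReceiveLoopSketch {

    // Calls the source until it returns null or maxMessages have been collected.
    static <T> List<T> drain(Supplier<T> source, int maxMessages) {
        List<T> received = new ArrayList<>();
        while (received.size() < maxMessages) {
            T next = source.get();
            if (next == null) {
                break; // nothing available right now; stop polling
            }
            received.add(next);
        }
        return received;
    }
}
```

With a barrier instance named barrier, the call could look like ReceiveLoopSketch.drain(barrier::receive, 10). The upper bound keeps a single polling pass from running indefinitely when the sender keeps adding messages.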