Hazelcast Support
Spring Integration provides channel adapters and other utility components to interact with Hazelcast, an in-memory data grid.
You need to include this dependency into your project:
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>6.1.1</version>
</dependency>
compile "org.springframework.integration:spring-integration-hazelcast:6.1.1"
The XML namespace and schemaLocation definitions for Hazelcast components are:
xmlns:int-hazelcast="http://www.springframework.org/schema/integration/hazelcast"
xsi:schemaLocation="http://www.springframework.org/schema/integration/hazelcast
          https://www.springframework.org/schema/integration/hazelcast/spring-integration-hazelcast.xsd"
Hazelcast Event-driven Inbound Channel Adapter
Hazelcast provides distributed data structures such as:
- com.hazelcast.map.IMap
- com.hazelcast.multimap.MultiMap
- com.hazelcast.collection.IList
- com.hazelcast.collection.ISet
- com.hazelcast.collection.IQueue
- com.hazelcast.topic.ITopic
- com.hazelcast.replicatedmap.ReplicatedMap
It also provides event listeners in order to listen to modifications made to these data structures.
- com.hazelcast.core.EntryListener<K, V>
- com.hazelcast.collection.ItemListener
- com.hazelcast.topic.MessageListener
The Hazelcast Event-Driven Inbound Channel Adapter listens to related cache events and sends event messages to the defined channel. It supports both XML and JavaConfig driven configurations.
XML Configuration:
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                      cache="map"
                      cache-events="UPDATED, REMOVED"
                      cache-listening-policy="SINGLE" />
The Hazelcast Event-Driven Inbound Channel Adapter supports the following attributes:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed object reference which is listened to. It is a mandatory attribute;
- cache-events: Specifies cache events which are listened for. It is an optional attribute and its default value is ADDED. Its supported values are as follows:
    - Supported cache event types for IMap and MultiMap: ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL;
    - Supported cache event types for ReplicatedMap: ADDED, REMOVED, UPDATED, EVICTED;
    - Supported cache event types for IList, ISet and IQueue: ADDED, REMOVED. There are no cache event types for ITopic.
- cache-listening-policy: Specifies the cache listening policy as SINGLE or ALL. It is an optional attribute and its default value is SINGLE. It determines whether each Hazelcast inbound channel adapter listening to the same cache object with the same cache-events attribute receives a single event message or all event messages. If it is ALL, all such adapters receive all event messages. If it is SINGLE, they receive unique event messages.
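As a rough illustration of the cache-events rules above, the following plain-Java sketch parses a comma-separated cache-events value and checks it against the event types listed for each data structure. The class CacheEventsCheck and its EventType enum are hypothetical names used only for this example; they are not part of the Hazelcast module.

```java
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Illustrative model only: these names are not part of spring-integration-hazelcast.
final class CacheEventsCheck {

    enum EventType { ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL, CLEAR_ALL }

    // Supported event types per data structure, as listed in the attribute description.
    // ITopic is omitted because it has no cache event types.
    static final Map<String, Set<EventType>> SUPPORTED = Map.of(
            "IMap", EnumSet.allOf(EventType.class),
            "MultiMap", EnumSet.allOf(EventType.class),
            "ReplicatedMap", EnumSet.of(EventType.ADDED, EventType.REMOVED,
                    EventType.UPDATED, EventType.EVICTED),
            "IList", EnumSet.of(EventType.ADDED, EventType.REMOVED),
            "ISet", EnumSet.of(EventType.ADDED, EventType.REMOVED),
            "IQueue", EnumSet.of(EventType.ADDED, EventType.REMOVED));

    // Parses a comma-separated cache-events value, trimming whitespace around each name.
    // A missing or blank value falls back to the documented default, ADDED.
    // An unknown name makes EventType.valueOf throw IllegalArgumentException.
    static Set<EventType> parse(String cacheEvents) {
        if (cacheEvents == null || cacheEvents.isBlank()) {
            return EnumSet.of(EventType.ADDED);
        }
        return Arrays.stream(cacheEvents.split(","))
                .map(String::trim)
                .map(EventType::valueOf)
                .collect(Collectors.toCollection(() -> EnumSet.noneOf(EventType.class)));
    }

    // Returns true when every requested event type is supported by the given structure.
    static boolean isValid(String structure, String cacheEvents) {
        Set<EventType> supported = SUPPORTED.get(structure);
        return supported != null && supported.containsAll(parse(cacheEvents));
    }
}
```

Under this model, isValid("IMap", "ADDED, CLEAR_ALL") holds, while isValid("IList", "UPDATED") does not, because IList only supports ADDED and REMOVED.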
Some configuration samples:
<int:channel id="mapChannel"/>
<int-hazelcast:inbound-channel-adapter channel="mapChannel"
                              cache="map"
                              cache-events="UPDATED, REMOVED" />
<bean id="map" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="map"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
<int-hazelcast:inbound-channel-adapter channel="multiMapChannel"
                              cache="multiMap"
                              cache-events="ADDED, REMOVED, CLEAR_ALL" />
<bean id="multiMap" factory-bean="instance" factory-method="getMultiMap">
    <constructor-arg value="multiMap"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="listChannel"
                               cache="list"
                               cache-events="ADDED, REMOVED"
                               cache-listening-policy="ALL" />
<bean id="list" factory-bean="instance" factory-method="getList">
    <constructor-arg value="list"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="setChannel" cache="set" />
<bean id="set" factory-bean="instance" factory-method="getSet">
    <constructor-arg value="set"/>
</bean>
<int-hazelcast:inbound-channel-adapter  channel="queueChannel"
                               cache="queue"
                               cache-events="REMOVED"
                               cache-listening-policy="ALL" />
<bean id="queue" factory-bean="instance" factory-method="getQueue">
    <constructor-arg value="queue"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="topicChannel" cache="topic" />
<bean id="topic" factory-bean="instance" factory-method="getTopic">
    <constructor-arg value="topic"/>
</bean>
<int-hazelcast:inbound-channel-adapter channel="replicatedMapChannel"
                              cache="replicatedMap"
                              cache-events="ADDED, UPDATED, REMOVED"
                              cache-listening-policy="SINGLE"  />
<bean id="replicatedMap" factory-bean="instance" factory-method="getReplicatedMap">
    <constructor-arg value="replicatedMap"/>
</bean>
Java Configuration Sample:
The following sample shows a DistributedMap configuration.
The same configuration can be used for other distributed data structures (IMap, MultiMap, ReplicatedMap, IList, ISet, IQueue and ITopic):
@Bean
public PollableChannel distributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
    return hazelcastInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer() {
    final HazelcastEventDrivenMessageProducer producer = new HazelcastEventDrivenMessageProducer(distributedMap());
    producer.setOutputChannel(distributedMapChannel());
    producer.setCacheEventTypes("ADDED,REMOVED,UPDATED,CLEAR_ALL");
    producer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
    return producer;
}
Hazelcast Continuous Query Inbound Channel Adapter
Hazelcast Continuous Query enables listening to modifications performed on specific map entries. The Hazelcast Continuous Query Inbound Channel Adapter is an event-driven channel adapter which listens to the related distributed map events according to the defined predicate.
@Bean
public PollableChannel cqDistributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> cqDistributedMap() {
    return hazelcastInstance().getMap("CQ_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastContinuousQueryMessageProducer hazelcastContinuousQueryMessageProducer() {
    final HazelcastContinuousQueryMessageProducer producer =
        new HazelcastContinuousQueryMessageProducer(cqDistributedMap(), "surname=TestSurname");
    producer.setOutputChannel(cqDistributedMapChannel());
    producer.setCacheEventTypes("UPDATED");
    producer.setIncludeValue(false);
    return producer;
}
<int:channel id="cqMapChannel"/>
<int-hazelcast:cq-inbound-channel-adapter
                channel="cqMapChannel"
                cache="cqMap"
                cache-events="UPDATED, REMOVED"
                predicate="name=TestName AND surname=TestSurname"
                include-value="true"
                cache-listening-policy="SINGLE"/>
<bean id="cqMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="cqMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
It supports six attributes as follows:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed Map reference which is listened to. Mandatory;
- cache-events: Specifies cache events which are listened for. Optional attribute with ADDED being its default value. Supported values are ADDED, REMOVED, UPDATED, EVICTED, EVICT_ALL and CLEAR_ALL;
- predicate: Specifies a predicate to listen to the modifications performed on specific map entries. Mandatory;
- include-value: Specifies whether to include the value and oldValue in a continuous query result. Optional with true being the default;
- cache-listening-policy: Specifies the cache listening policy as SINGLE or ALL. Optional with the default value being SINGLE. It determines whether each Hazelcast CQ inbound channel adapter listening to the same cache object with the same cache-events attribute receives a single event message or all event messages. If it is ALL, all such adapters receive all event messages. If it is SINGLE, they receive unique event messages.
Hazelcast Cluster Monitor Inbound Channel Adapter
A Hazelcast Cluster Monitor supports listening to modifications performed on the cluster. The Hazelcast Cluster Monitor Inbound Channel Adapter is an event-driven channel adapter and listens to related Membership, Distributed Object, Migration, Lifecycle and Client events:
@Bean
public PollableChannel eventChannel() {
    return new QueueChannel();
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public HazelcastClusterMonitorMessageProducer hazelcastClusterMonitorMessageProducer() {
    HazelcastClusterMonitorMessageProducer producer = new HazelcastClusterMonitorMessageProducer(hazelcastInstance());
    producer.setOutputChannel(eventChannel());
    producer.setMonitorEventTypes("DISTRIBUTED_OBJECT");
    return producer;
}
<int:channel id="monitorChannel"/>
<int-hazelcast:cm-inbound-channel-adapter
                 channel="monitorChannel"
                 hazelcast-instance="instance"
                 monitor-types="MEMBERSHIP, DISTRIBUTED_OBJECT" />
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
It supports three attributes as follows:
- channel: Specifies the channel to which messages are sent;
- hazelcast-instance: Specifies the Hazelcast Instance reference to listen for cluster events. It is a mandatory attribute;
- monitor-types: Specifies the monitor types which are listened for. It is an optional attribute with MEMBERSHIP being the default value. Supported values are MEMBERSHIP, DISTRIBUTED_OBJECT, MIGRATION, LIFECYCLE, CLIENT.
Hazelcast Distributed SQL Inbound Channel Adapter
Hazelcast allows running distributed queries on the distributed map. The Hazelcast Distributed SQL Inbound Channel Adapter is a polling inbound channel adapter. It runs the defined distributed-sql command and returns results depending on the iteration type.
@Bean
public PollableChannel dsDistributedMapChannel() {
    return new QueueChannel();
}
@Bean
public IMap<Integer, String> dsDistributedMap() {
    return hazelcastInstance().getMap("DS_Distributed_Map");
}
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
@InboundChannelAdapter(value = "dsDistributedMapChannel", poller = @Poller(maxMessagesPerPoll = "1"))
public HazelcastDistributedSQLMessageSource hazelcastDistributedSQLMessageSource() {
    final HazelcastDistributedSQLMessageSource messageSource =
        new HazelcastDistributedSQLMessageSource(dsDistributedMap(),
            "name='TestName' AND surname='TestSurname'");
    messageSource.setIterationType(DistributedSQLIterationType.ENTRY);
    return messageSource;
}
<int:channel id="dsMapChannel"/>
<int-hazelcast:ds-inbound-channel-adapter
            channel="dsMapChannel"
            cache="dsMap"
            iteration-type="ENTRY"
            distributed-sql="active=false OR age >= 25 OR name = 'TestName'">
    <int:poller fixed-delay="100"/>
</int-hazelcast:ds-inbound-channel-adapter>
<bean id="dsMap" factory-bean="instance" factory-method="getMap">
    <constructor-arg value="dsMap"/>
</bean>
<bean id="instance" class="com.hazelcast.core.Hazelcast"
            factory-method="newHazelcastInstance">
    <constructor-arg>
        <bean class="com.hazelcast.config.Config" />
    </constructor-arg>
</bean>
It requires a poller and supports four attributes:
- channel: Specifies the channel to which messages are sent. It is a mandatory attribute;
- cache: Specifies the distributed IMap reference which is queried. It is a mandatory attribute;
- iteration-type: Specifies the result type. Distributed SQL can be run on EntrySet, KeySet, LocalKeySet or Values. It is an optional attribute with VALUE being the default. Supported values are ENTRY, KEY, LOCAL_KEY and VALUE;
- distributed-sql: Specifies the where clause of the SQL statement. It is a mandatory attribute.
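To make the effect of iteration-type more concrete, here is a minimal plain-Java sketch in which an ordinary Map stands in for the distributed IMap and a java.util.function.Predicate stands in for the distributed-sql where clause. The class IterationTypeSketch, its enum and the localKeys parameter (modelling the keys owned by the current member) are assumptions made for illustration, not the module's API.

```java
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Illustrative model only: a plain Map stands in for the distributed IMap.
final class IterationTypeSketch {

    enum IterationType { ENTRY, KEY, LOCAL_KEY, VALUE }

    // Applies the filter to the map and shapes the result by iteration type.
    // localKeys models the keys owned by the current member (used by LOCAL_KEY only).
    static <K, V> Collection<?> query(Map<K, V> map, Set<K> localKeys,
            Predicate<Map.Entry<K, V>> where, IterationType type) {
        // Keep only matching entries, preserving the source iteration order.
        Map<K, V> matches = map.entrySet().stream()
                .filter(where)
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue,
                        (first, second) -> first, LinkedHashMap::new));
        switch (type) {
            case ENTRY:
                return matches.entrySet();
            case KEY:
                return matches.keySet();
            case LOCAL_KEY:
                return matches.keySet().stream()
                        .filter(localKeys::contains)
                        .collect(Collectors.toList());
            default:
                // VALUE is the documented default iteration type.
                return matches.values();
        }
    }
}
```

For a map {1=20, 2=30, 3=40} and a filter of "value >= 25", KEY yields the keys 2 and 3, VALUE yields 30 and 40, and LOCAL_KEY yields only those matching keys that are also in localKeys.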
Hazelcast Outbound Channel Adapter
The Hazelcast Outbound Channel Adapter listens to its defined channel and writes incoming messages to the related distributed cache.
It expects one of cache, cache-expression or HazelcastHeaders.CACHE_NAME for distributed object definition.
Supported Distributed Objects are: IMap, MultiMap, ReplicatedMap, IList, ISet, IQueue and ITopic.
@Bean
public MessageChannel distributedMapChannel() {
    return new DirectChannel();
}
@Bean
public IMap<Integer, String> distributedMap() {
    return hzInstance().getMap("Distributed_Map");
}
@Bean
public HazelcastInstance hzInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
@ServiceActivator(inputChannel = "distributedMapChannel")
public HazelcastCacheWritingMessageHandler hazelcastCacheWritingMessageHandler() {
    HazelcastCacheWritingMessageHandler handler = new HazelcastCacheWritingMessageHandler();
    handler.setDistributedObject(distributedMap());
    handler.setKeyExpression(new SpelExpressionParser().parseExpression("payload.id"));
    handler.setExtractPayload(true);
    return handler;
}
<int-hazelcast:outbound-channel-adapter channel="mapChannel"
                    cache-expression="headers['CACHE_HEADER']"
                    key-expression="payload.key"
                    extract-payload="true"/>
It supports the following attributes:
- channel: Specifies the channel to which messages are sent;
- cache: Specifies the distributed object reference. Optional;
- cache-expression: Specifies the distributed object via Spring Expression Language (SpEL). Optional;
- key-expression: Specifies the key of a key-value pair via Spring Expression Language (SpEL). Optional; required only for IMap, MultiMap and ReplicatedMap distributed data structures;
- extract-payload: Specifies whether to send the whole message or just the payload. Optional attribute with true being the default. If it is true, just the payload will be written to the distributed object. Otherwise, the whole message will be written by converting both message headers and payload.
By setting the distributed object name in the header, messages can be written to different distributed objects via the same channel.
If cache or cache-expression attributes are not defined, a HazelcastHeaders.CACHE_NAME header has to be set in a request Message.
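The header-based routing described above can be pictured with the following plain-Java sketch, in which ordinary Maps stand in for both the distributed objects and the message headers. The class OutboundTargetSketch and the literal header key "CACHE_NAME" are assumptions for illustration (the real key is the HazelcastHeaders.CACHE_NAME constant), cache-expression is not modelled, and checking the fixed cache reference before the header is an assumed order.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model only: plain Maps stand in for distributed objects and message headers.
final class OutboundTargetSketch {

    // Stand-in for the HazelcastHeaders.CACHE_NAME header key; the literal value is an assumption.
    static final String CACHE_NAME_HEADER = "CACHE_NAME";

    private final Map<String, Map<Object, Object>> cachesByName = new HashMap<>();

    // Models the 'cache' attribute; null means it is not configured.
    private final Map<Object, Object> fixedCache;

    OutboundTargetSketch(Map<Object, Object> fixedCache) {
        this.fixedCache = fixedCache;
    }

    // Looks up (or lazily creates) the named target, like obtaining a map by name.
    Map<Object, Object> cache(String name) {
        return cachesByName.computeIfAbsent(name, n -> new HashMap<>());
    }

    // Resolves the target: the fixed reference first, otherwise the header value.
    Map<Object, Object> resolve(Map<String, Object> headers) {
        if (fixedCache != null) {
            return fixedCache;
        }
        Object name = headers.get(CACHE_NAME_HEADER);
        if (name == null) {
            throw new IllegalStateException("No cache configured and no cache name header set");
        }
        return cache(name.toString());
    }

    // Writes the payload under the given key into whichever target was resolved.
    void write(Map<String, Object> headers, Object key, Object payload) {
        resolve(headers).put(key, payload);
    }
}
```

With no fixed cache configured, two messages carrying different header values end up in two different targets even though they pass through the same handler.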
Hazelcast Leader Election
If leader election is needed (e.g. for a highly available message consumer where only one node should receive messages), a Hazelcast-based LeaderInitiator can be used:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public LeaderInitiator initiator() {
    return new LeaderInitiator(hazelcastInstance());
}
When a node is elected leader it will send an OnGrantedEvent to all application listeners.
Hazelcast Message Store
For distributed messaging state management, for example for a persistent QueueChannel or tracking Aggregator message groups, the HazelcastMessageStore implementation is provided:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public MessageGroupStore messageStore() {
    return new HazelcastMessageStore(hazelcastInstance());
}
By default, the SPRING_INTEGRATION_MESSAGE_STORE IMap is used to store messages and groups as key/value pairs.
Any custom IMap can be provided to the HazelcastMessageStore.
Hazelcast Metadata Store
An implementation of a ListenableMetadataStore is available using a backing Hazelcast IMap.
The default map is created with a name SPRING_INTEGRATION_METADATA_STORE which can be customized.
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public MetadataStore metadataStore() {
    return new HazelcastMetadataStore(hazelcastInstance());
}
The HazelcastMetadataStore implements ListenableMetadataStore which allows you to register your own listeners of type MetadataStoreListener to listen for events via addListener(MetadataStoreListener callback).
Hazelcast Lock Registry
An implementation of a LockRegistry is available using a backing Hazelcast distributed ILock support:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance();
}
@Bean
public LockRegistry lockRegistry() {
    return new HazelcastLockRegistry(hazelcastInstance());
}
When used with a shared MessageGroupStore (e.g. Aggregator store management), the HazelcastLockRegistry can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
For all the distributed operations the CP Subsystem must be enabled on HazelcastInstance.
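The "only one instance can manipulate the group at a time" guarantee follows the usual obtain, lock, try/finally unlock pattern of a LockRegistry. The sketch below models that pattern inside a single JVM, with threads standing in for application instances and a ReentrantLock per key standing in for the distributed lock. The class LockPerGroupSketch and its methods are hypothetical names for this example only.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Illustrative model only: an in-JVM stand-in for a lock registry, one Lock per key.
final class LockPerGroupSketch {

    private final Map<Object, Lock> locks = new ConcurrentHashMap<>();
    private final Map<Object, Integer> groupSizes = new ConcurrentHashMap<>();

    // Mirrors the obtain(key) idea: the same key always yields the same lock.
    Lock obtain(Object lockKey) {
        return locks.computeIfAbsent(lockKey, key -> new ReentrantLock());
    }

    // Read-modify-write on a group, guarded by that group's lock.
    void addToGroup(Object groupId) {
        Lock lock = obtain(groupId);
        lock.lock();
        try {
            int size = groupSizes.getOrDefault(groupId, 0);
            groupSizes.put(groupId, size + 1);
        } finally {
            lock.unlock();
        }
    }

    int sizeOf(Object groupId) {
        return groupSizes.getOrDefault(groupId, 0);
    }

    // Starts the given number of threads, each adding 'perThread' items to one group,
    // and returns the final group size once all threads have finished.
    static int runConcurrently(int threads, int perThread) {
        LockPerGroupSketch sketch = new LockPerGroupSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    sketch.addToGroup("group-1");
                }
            });
            workers[i].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return sketch.sizeOf("group-1");
    }
}
```

Because every update to "group-1" happens under the same lock, no increment is lost; without the lock, the unsynchronized read-modify-write could drop updates.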
Message Channels with Hazelcast
The Hazelcast IQueue and ITopic distributed objects are, essentially, messaging primitives and can be used with Spring Integration core components without extra implementations in this Hazelcast module.
The QueueChannel can be supplied by any java.util.Queue, including the mentioned Hazelcast distributed IQueue:
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
    return new QueueChannel(hazelcastInstance.<Message<?>>getQueue("springIntegrationQueue"));
}
Placing this configuration on several nodes of the application in a Hazelcast cluster makes the QueueChannel distributed: only one node is able to poll a given Message from that IQueue.
This works similarly to PollableJmsChannel, PollableKafkaChannel or PollableAmqpChannel.
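The competing-consumer behavior described above can be sketched in plain Java, with two threads standing in for two cluster nodes and a ConcurrentLinkedQueue standing in for the distributed IQueue. The class CompetingConsumersSketch is a hypothetical name for this example; it models the semantics only and does not involve Hazelcast.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Illustrative model only: two threads stand in for two nodes polling one shared queue.
final class CompetingConsumersSketch {

    // Fills a shared queue with messageCount items, lets two consumers drain it,
    // and returns what each consumer received.
    static List<List<Integer>> drain(int messageCount) {
        Queue<Integer> shared = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messageCount; i++) {
            shared.add(i);
        }
        // Each list is written by exactly one thread and read only after join().
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        Thread nodeA = new Thread(() -> pollAll(shared, first));
        Thread nodeB = new Thread(() -> pollAll(shared, second));
        nodeA.start();
        nodeB.start();
        try {
            nodeA.join();
            nodeB.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(first, second);
    }

    // Polls until the queue is empty; every successful poll removes the message for everyone.
    private static void pollAll(Queue<Integer> queue, List<Integer> received) {
        Integer message;
        while ((message = queue.poll()) != null) {
            received.add(message);
        }
    }
}
```

However the work happens to be split between the two consumers, each message appears in exactly one of the two result lists.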
If the producer side is not a Spring Integration application, there is no way to configure a QueueChannel, and therefore the plain Hazelcast IQueue API is used to produce the data.
In this case, the QueueChannel approach is wrong on the consumer side: an Inbound Channel Adapter solution must be used instead:
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("springIntegrationQueue");
}
@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
    return myStringHzQueue::poll;
}
The ITopic abstraction in Hazelcast has similar semantics to a Topic in JMS: all subscribers receive published messages.
With a pair of simple MessageChannel beans this mechanism is supported as an out-of-the-box feature:
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
        MessageChannel fromHazelcastTopicChannel) {
    ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
    topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
    return topic;
}
@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
    return new FixedSubscriberChannel(springIntegrationTopic::publish);
}
@Bean
public MessageChannel fromHazelcastTopicChannel() {
    return new DirectChannel();
}
The FixedSubscriberChannel is an optimized variant of DirectChannel, which requires a MessageHandler on initialization.
Since the MessageHandler is a functional interface a simple lambda for the handleMessage method can be provided.
When a message is sent to the publishToHazelcastTopicChannel it is just published onto the Hazelcast ITopic.
The com.hazelcast.topic.MessageListener is a functional interface, too, hence a lambda to the ITopic#addMessageListener can be provided.
So, a subscriber to the fromHazelcastTopicChannel will consume all messages sent to the mentioned ITopic.
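In contrast to a queue, a topic delivers every published message to every subscriber. The following plain-Java sketch models that fan-out with lambdas registered as listeners, much like the lambda passed to ITopic#addMessageListener above. The class TopicFanOutSketch is a hypothetical stand-in, and it dispatches synchronously in the calling thread, which is a simplification for this example.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative model only: an in-memory publish/subscribe stand-in for a topic.
final class TopicFanOutSketch {

    private final List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();

    // Registers a subscriber; a lambda or method reference is enough.
    void addMessageListener(Consumer<String> listener) {
        listeners.add(listener);
    }

    // Delivers the message to every registered subscriber, in registration order.
    void publish(String message) {
        listeners.forEach(listener -> listener.accept(message));
    }
}
```

Two subscribers registered on the same TopicFanOutSketch both receive every published message, in publication order.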
An ExecutorChannel can be supplied with an IExecutorService.
For example, with the respective configuration a cluster-wide singleton can be achieved:
@Bean
public HazelcastInstance hazelcastInstance() {
    return Hazelcast.newHazelcastInstance(
                new Config()
                    .addExecutorConfig(new ExecutorConfig()
                         .setName("singletonExecutor")
                         .setPoolSize(1)));
}
@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
    return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}