diff --git a/src/reference/asciidoc/hazelcast.adoc b/src/reference/asciidoc/hazelcast.adoc index 35031c13c33..2628460c322 100644 --- a/src/reference/asciidoc/hazelcast.adoc +++ b/src/reference/asciidoc/hazelcast.adoc @@ -598,3 +598,97 @@ public LockRegistry lockRegistry() { When used with a shared `MessageGroupStore` (e.g. `Aggregator` store management), the `HazelcastLockRegistry` can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time. NOTE: For all the distributed operations the CP Subsystem must be enabled on `HazelcastInstance`. + +[[hazelcast-message-channels]] +=== Message Channels with Hazelcast + +The Hazelcast `IQueue` and `ITopic` distributed objects are, essentially, messaging primitives and can be use with Spring Integration core components without extra implementations in this Hazelcast module. + +The <<./channel.adoc#channel-implementations-queuechannel,`QueueChannel`>> can be supplied by any `java.util.Queue`, including the mentioned Hazelcast distributed `IQueue`: + +==== +[source,java] +---- +@Bean +PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) { + return new QueueChannel(hazelcastInstance.Message>getQueue("springIntegrationQueue")); +} +---- +==== + +Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`. +This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>. + +If the producer side is not a Spring Integration application, there is no way to configure a `QueueChannel`, and therefore the plain Hazelcast `IQueue` API is used to produce the data. +In this case, the `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead: + +==== +[source,java] +---- +@Bean +public IQueue myStringHzQueue(HazelcastInstance hazelcastInstance) { + return hazelcastInstance.getQueue("springIntegrationQueue"); +} + +@Bean +@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel") +Supplier fromHzIQueueSource(IQueue myStringHzQueue) { + return myStringHzQueue::poll; +} +---- +==== + +The `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages. +With a pair of simple `MessageChannel` beans this mechanism is supported as an out-of-the-box feature: + +==== +[source,java] +---- +@Bean +public ITopic> springIntegrationTopic(HazelcastInstance hazelcastInstance, + MessageChannel fromHazelcastTopicChannel) { + + ITopic> topic = hazelcastInstance.getTopic("springIntegrationTopic"); + topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject())); + return topic; +} + +@Bean +public MessageChannel publishToHazelcastTopicChannel(ITopic> springIntegrationTopic) { + return new FixedSubscriberChannel(springIntegrationTopic::publish); +} + +@Bean +public MessageChannel fromHazelcastTopicChannel() { + return new DirectChannel(); +} +---- +==== + +The `FixedSubscriberChannel` is an optimized variant of `DirectChannel`, which requires a `MessageHandler` on initialization. +Since the `MessageHandler` is a functional interface a simple lambda for the `handleMessage` method can be provided. +When a message is sent to the `publishToHazelcastTopicChannel` it is just published onto the Hazelcast `ITopic`. +The `com.hazelcast.topic.MessageListener` is a functional interface, too, hence a lambda to the `ITopic#addMessageListener` can be provided. +So, a subscriber to the `fromHazelcastTopicChannel` will consume all messages sent to the mentioned `ITopic`. + +An `ExecutorChannel` can be supplied with an `IExecutorService`. +For example, with respective configuration a cluster-wide singleton can be achieved: + +==== +[source,java] +---- +@Bean +public HazelcastInstance hazelcastInstance() { + return Hazelcast.newHazelcastInstance( + new Config() + .addExecutorConfig(new ExecutorConfig() + .setName("singletonExecutor") + .setPoolSize(1))); +} + +@Bean +public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) { + return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor")); +} +---- +====