Skip to content

GH-4001: Doc for cooperation with some HZ objects #4003

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 6, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions src/reference/asciidoc/hazelcast.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -598,3 +598,97 @@ public LockRegistry lockRegistry() {
When used with a shared `MessageGroupStore` (e.g. `Aggregator` store management), the `HazelcastLockRegistry` can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.

NOTE: For all the distributed operations the CP Subsystem must be enabled on `HazelcastInstance`.

[[hazelcast-message-channels]]
=== Message Channels with Hazelcast

The Hazelcast `IQueue` and `ITopic` distributed objects are, essentially, messaging primitives and can be use with Spring Integration core components without extra implementations in this Hazelcast module.

The <<./channel.adoc#channel-implementations-queuechannel,`QueueChannel`>> can be supplied by any `java.util.Queue`, including the mentioned Hazelcast distributed `IQueue`:

====
[source,java]
----
@Bean
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
}
----
====

Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`.
This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>.

If the producer side is not a Spring Integration application, there is no way to configure a `QueueChannel`, and therefore the plain Hazelcast `IQueue` API is used to produce the data.
In this case, the `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead:

====
[source,java]
----
@Bean
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
return hazelcastInstance.getQueue("springIntegrationQueue");
}

@Bean
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
return myStringHzQueue::poll;
}
----
====

The `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages.
With a pair of simple `MessageChannel` beans this mechanism is supported as an out-of-the-box feature:

====
[source,java]
----
@Bean
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
MessageChannel fromHazelcastTopicChannel) {

ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
return topic;
}

@Bean
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
return new FixedSubscriberChannel(springIntegrationTopic::publish);
}

@Bean
public MessageChannel fromHazelcastTopicChannel() {
return new DirectChannel();
}
----
====

The `FixedSubscriberChannel` is an optimized variant of `DirectChannel`, which requires a `MessageHandler` on initialization.
Since the `MessageHandler` is a functional interface a simple lambda for the `handleMessage` method can be provided.
When a message is sent to the `publishToHazelcastTopicChannel` it is just published onto the Hazelcast `ITopic`.
The `com.hazelcast.topic.MessageListener` is a functional interface, too, hence a lambda to the `ITopic#addMessageListener` can be provided.
So, a subscriber to the `fromHazelcastTopicChannel` will consume all messages sent to the mentioned `ITopic`.

An `ExecutorChannel` can be supplied with an `IExecutorService`.
For example, with respective configuration a cluster-wide singleton can be achieved:

====
[source,java]
----
@Bean
public HazelcastInstance hazelcastInstance() {
return Hazelcast.newHazelcastInstance(
new Config()
.addExecutorConfig(new ExecutorConfig()
.setName("singletonExecutor")
.setPoolSize(1)));
}

@Bean
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
}
----
====