Skip to content

Commit 6a2ff0c

Browse files
GH-4001: Doc for cooperation with some HZ objects (#4003)
* GH-4001: Doc for cooperation with some HZ objects Fixes #4001 The `IQueue`, `ITopic` and `IExecutorService` can be used with Spring Integration channel as is without any extra component implementations. * Document the cooperation feature with Hazelcast objects via samples * * Add a sample about an Inbound Channel Adapter on the `IQueue` * Fix language in docs Co-authored-by: Gary Russell <grussell@vmware.com> --------- Co-authored-by: Gary Russell <grussell@vmware.com>
1 parent 44000b9 commit 6a2ff0c

File tree

1 file changed

+94
-0
lines changed

1 file changed

+94
-0
lines changed

src/reference/asciidoc/hazelcast.adoc

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -598,3 +598,97 @@ public LockRegistry lockRegistry() {
598598
When used with a shared `MessageGroupStore` (e.g. `Aggregator` store management), the `HazelcastLockRegistry` can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
599599

600600
NOTE: For all the distributed operations the CP Subsystem must be enabled on `HazelcastInstance`.
601+
602+
[[hazelcast-message-channels]]
603+
=== Message Channels with Hazelcast
604+
605+
The Hazelcast `IQueue` and `ITopic` distributed objects are, essentially, messaging primitives and can be use with Spring Integration core components without extra implementations in this Hazelcast module.
606+
607+
The <<./channel.adoc#channel-implementations-queuechannel,`QueueChannel`>> can be supplied by any `java.util.Queue`, including the mentioned Hazelcast distributed `IQueue`:
608+
609+
====
610+
[source,java]
611+
----
612+
@Bean
613+
PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) {
614+
return new QueueChannel(hazelcastInstance.Message<?>>getQueue("springIntegrationQueue"));
615+
}
616+
----
617+
====
618+
619+
Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`.
620+
This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>.
621+
622+
If the producer side is not a Spring Integration application, there is no way to configure a `QueueChannel`, and therefore the plain Hazelcast `IQueue` API is used to produce the data.
623+
In this case, the `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead:
624+
625+
====
626+
[source,java]
627+
----
628+
@Bean
629+
public IQueue<String> myStringHzQueue(HazelcastInstance hazelcastInstance) {
630+
return hazelcastInstance.getQueue("springIntegrationQueue");
631+
}
632+
633+
@Bean
634+
@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel")
635+
Supplier<String> fromHzIQueueSource(IQueue<String> myStringHzQueue) {
636+
return myStringHzQueue::poll;
637+
}
638+
----
639+
====
640+
641+
The `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages.
642+
With a pair of simple `MessageChannel` beans this mechanism is supported as an out-of-the-box feature:
643+
644+
====
645+
[source,java]
646+
----
647+
@Bean
648+
public ITopic<Message<?>> springIntegrationTopic(HazelcastInstance hazelcastInstance,
649+
MessageChannel fromHazelcastTopicChannel) {
650+
651+
ITopic<Message<?>> topic = hazelcastInstance.getTopic("springIntegrationTopic");
652+
topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject()));
653+
return topic;
654+
}
655+
656+
@Bean
657+
public MessageChannel publishToHazelcastTopicChannel(ITopic<Message<?>> springIntegrationTopic) {
658+
return new FixedSubscriberChannel(springIntegrationTopic::publish);
659+
}
660+
661+
@Bean
662+
public MessageChannel fromHazelcastTopicChannel() {
663+
return new DirectChannel();
664+
}
665+
----
666+
====
667+
668+
The `FixedSubscriberChannel` is an optimized variant of `DirectChannel`, which requires a `MessageHandler` on initialization.
669+
Since the `MessageHandler` is a functional interface a simple lambda for the `handleMessage` method can be provided.
670+
When a message is sent to the `publishToHazelcastTopicChannel` it is just published onto the Hazelcast `ITopic`.
671+
The `com.hazelcast.topic.MessageListener` is a functional interface, too, hence a lambda to the `ITopic#addMessageListener` can be provided.
672+
So, a subscriber to the `fromHazelcastTopicChannel` will consume all messages sent to the mentioned `ITopic`.
673+
674+
An `ExecutorChannel` can be supplied with an `IExecutorService`.
675+
For example, with respective configuration a cluster-wide singleton can be achieved:
676+
677+
====
678+
[source,java]
679+
----
680+
@Bean
681+
public HazelcastInstance hazelcastInstance() {
682+
return Hazelcast.newHazelcastInstance(
683+
new Config()
684+
.addExecutorConfig(new ExecutorConfig()
685+
.setName("singletonExecutor")
686+
.setPoolSize(1)));
687+
}
688+
689+
@Bean
690+
public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) {
691+
return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor"));
692+
}
693+
----
694+
====

0 commit comments

Comments
 (0)