From 092aa48cf2050407a0f58cc100455789a9d01a36 Mon Sep 17 00:00:00 2001 From: abilan Date: Thu, 2 Feb 2023 16:28:38 -0500 Subject: [PATCH 1/3] GH-4001: Doc for cooperation with some HZ objects Fixes https://github.com/spring-projects/spring-integration/issues/4001 The `IQueue`, `ITopic` and `IExecutorService` can be used with Spring Integration channel as is without any extra component implementations. * Document the cooperation feature with Hazelcast objects via samples --- src/reference/asciidoc/hazelcast.adoc | 75 +++++++++++++++++++++++++++ 1 file changed, 75 insertions(+) diff --git a/src/reference/asciidoc/hazelcast.adoc b/src/reference/asciidoc/hazelcast.adoc index 35031c13c33..aa2af278195 100644 --- a/src/reference/asciidoc/hazelcast.adoc +++ b/src/reference/asciidoc/hazelcast.adoc @@ -598,3 +598,78 @@ public LockRegistry lockRegistry() { When used with a shared `MessageGroupStore` (e.g. `Aggregator` store management), the `HazelcastLockRegistry` can be used to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time. NOTE: For all the distributed operations the CP Subsystem must be enabled on `HazelcastInstance`. + +[[hazelcast-message-channels]] +=== Message Channels with Hazelcast + +The Hazelcast `IQueue` and `ITopic` distributed objects are, essentially, messaging primitives and can be use with Spring Integration core components without extra implementations in this Hazelcast module. + +The <<./channel.adoc#channel-implementations-queuechannel,`QueueChannel`>> can be supplied by any `java.util.Queue`, including the mentioned Hazelcast distributed `IQueue`: + +==== +[source,java] +---- +@Bean +PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) { + return new QueueChannel(hazelcastInstance.Message>getQueue("springIntegrationQueue")); +} +---- +==== + +Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`. +This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>. + +An `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages. +With a pair of simple `MessageChannel` beans this mechanism as an out-of-the-box feature: + +==== +[source,java] +---- +@Bean +public ITopic> springIntegrationTopic(HazelcastInstance hazelcastInstance, + MessageChannel fromHazelcastTopicChannel) { + + ITopic> topic = hazelcastInstance.getTopic("springIntegrationTopic"); + topic.addMessageListener(m -> fromHazelcastTopicChannel.send(m.getMessageObject())); + return topic; +} + +@Bean +public MessageChannel publishToHazelcastTopicChannel(ITopic> springIntegrationTopic) { + return new FixedSubscriberChannel(springIntegrationTopic::publish); +} + +@Bean +public MessageChannel fromHazelcastTopicChannel() { + return new DirectChannel(); +} +---- +==== + +The `FixedSubscriberChannel` is an optimized variant of `DirectChannel`, which requires a `MessageHandler` on initialization. +Since the `MessageHandler` is a functional interface a simple lambda for the `handleMessage` method can be provided. +When a message is sent to the `publishToHazelcastTopicChannel` it is just published onto the Hazelcast `ITopic`. +The `com.hazelcast.topic.MessageListener` is a functional interface, too, hence a lambda to the `ITopic#addMessageListener` can be provided. +So, when ever in the Hazelcast the `fromHazelcastTopicChannel` is subscribed, it is going to consume a message sent to the mentioned `ITopic`. + +An `ExecutorChannel` can be supplied with an `IExecutorService`. +For example, with respective configuration a cluster-wide singleton can be achieved: + +==== +[source,java] +---- +@Bean +public HazelcastInstance hazelcastInstance() { + return Hazelcast.newHazelcastInstance( + new Config() + .addExecutorConfig(new ExecutorConfig() + .setName("singletonExecutor") + .setPoolSize(1))); +} + +@Bean +public MessageChannel hazelcastSingletonExecutorChannel(HazelcastInstance hazelcastInstance) { + return new ExecutorChannel(hazelcastInstance.getExecutorService("singletonExecutor")); +} +---- +==== From e75e3a9b156497c87822e8ae4f192a68c99663fc Mon Sep 17 00:00:00 2001 From: abilan Date: Mon, 6 Feb 2023 11:34:06 -0500 Subject: [PATCH 2/3] * Add a sample about an Inbound Channel Adapter on the `IQueue` --- src/reference/asciidoc/hazelcast.adoc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/reference/asciidoc/hazelcast.adoc b/src/reference/asciidoc/hazelcast.adoc index aa2af278195..c4238a08976 100644 --- a/src/reference/asciidoc/hazelcast.adoc +++ b/src/reference/asciidoc/hazelcast.adoc @@ -619,6 +619,25 @@ PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) { Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`. This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>. +If the producer side is not a Spring Integration, there is no way to configure a `QueueChannel`, and rather a plain Hazelcast `IQueue` API is used to produce the data. +In this case a `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead: + +==== +[source,java] +---- +@Bean +public IQueue myStringHzQueue(HazelcastInstance hazelcastInstance) { + return hazelcastInstance.getQueue("springIntegrationQueue"); +} + +@Bean +@InboundChannelAdapter(channel = "stringValuesFromHzQueueChannel") +Supplier fromHzIQueueSource(IQueue myStringHzQueue) { + return myStringHzQueue::poll; +} +---- +==== + An `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages. With a pair of simple `MessageChannel` beans this mechanism as an out-of-the-box feature: From 552c652b1dbcc8b5a71d4ffd7d554d28e9162ce0 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 6 Feb 2023 12:00:27 -0500 Subject: [PATCH 3/3] Fix language in docs Co-authored-by: Gary Russell --- src/reference/asciidoc/hazelcast.adoc | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/reference/asciidoc/hazelcast.adoc b/src/reference/asciidoc/hazelcast.adoc index c4238a08976..2628460c322 100644 --- a/src/reference/asciidoc/hazelcast.adoc +++ b/src/reference/asciidoc/hazelcast.adoc @@ -619,8 +619,8 @@ PollableChannel hazelcastQueueChannel(HazelcastInstance hazelcastInstance) { Placing this config on several nodes in Hazelcast cluster of the application, will make the `QueueChannel` as distributed and only one node will be able to poll a single `Message` from that `IQueue`. This works similar to <<./jms.adoc#jms-channel,`PollableJmsChannel`>>, <<./kafka.adoc#kafka-channels,`PollableKafkaChannel`>> or <<./amqp.adoc#amqp-channels,`PollableAmqpChannel`>>. -If the producer side is not a Spring Integration, there is no way to configure a `QueueChannel`, and rather a plain Hazelcast `IQueue` API is used to produce the data. -In this case a `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead: +If the producer side is not a Spring Integration application, there is no way to configure a `QueueChannel`, and therefore the plain Hazelcast `IQueue` API is used to produce the data. +In this case, the `QueueChannel` approach is wrong on the consumer side: an <<./channel-adapter.adoc#channel-adapter-namespace-inbound,Inbound Channel Adapter>> solution must be used instead: ==== [source,java] @@ -638,8 +638,8 @@ Supplier fromHzIQueueSource(IQueue myStringHzQueue) { ---- ==== -An `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages. -With a pair of simple `MessageChannel` beans this mechanism as an out-of-the-box feature: +The `ITopic` abstraction in Hazelcast has similar semantics to a `Topic` in JMS: all subscribers receive published messages. +With a pair of simple `MessageChannel` beans this mechanism is supported as an out-of-the-box feature: ==== [source,java] @@ -669,7 +669,7 @@ The `FixedSubscriberChannel` is an optimized variant of `DirectChannel`, which r Since the `MessageHandler` is a functional interface a simple lambda for the `handleMessage` method can be provided. When a message is sent to the `publishToHazelcastTopicChannel` it is just published onto the Hazelcast `ITopic`. The `com.hazelcast.topic.MessageListener` is a functional interface, too, hence a lambda to the `ITopic#addMessageListener` can be provided. -So, when ever in the Hazelcast the `fromHazelcastTopicChannel` is subscribed, it is going to consume a message sent to the mentioned `ITopic`. +So, a subscriber to the `fromHazelcastTopicChannel` will consume all messages sent to the mentioned `ITopic`. An `ExecutorChannel` can be supplied with an `IExecutorService`. For example, with respective configuration a cluster-wide singleton can be achieved: