Description
I'm trying to use Spring Integration to consume messages from a Hazelcast queue in my Spring Boot project, which runs on Java 11.
I'm using Spring Integration with the following dependencies in my pom.xml:
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-hazelcast</artifactId>
    <version>3.0.0</version>
</dependency>
No matter how I configure my HazelcastEventDrivenMessageProducer, MessageChannel, and QueueChannel, I can't get my consumer code to run exactly once per message and remove that message from my Hazelcast queue. The queue lives in an external Hazelcast cluster, and I want each message to be dequeued once it has been processed.
Here is my configuration:
@Bean
HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer(@Qualifier("myQueue") IQueue<String> myQueue) {
    HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer = new HazelcastEventDrivenMessageProducer(myQueue);
    hazelcastEventDrivenMessageProducer.setOutputChannel(hazelcastQueueChannel());
    hazelcastEventDrivenMessageProducer.setCacheEventTypes("ADDED,REMOVED");
    hazelcastEventDrivenMessageProducer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
    return hazelcastEventDrivenMessageProducer;
}

@Bean
IQueue<String> myQueue(@Qualifier("externalHazelcastInstance") HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue("myQueue");
}

@Bean
public PollableChannel hazelcastQueueChannel() {
    return new QueueChannel();
}
I also have a consumer class that looks like this.
@Slf4j
@Component
public class QueueTaskConsumer {

    @ServiceActivator(inputChannel = "hazelcastQueueChannel")
    public void readFromTaskQueue(String message) {
        log.info("received the message inside QueueTaskConsumer " + message);
    }
}
To Reproduce
Configure the setup as shown above and add 10 String messages to a Hazelcast queue.
Expected behavior
I expect to see "received the message inside QueueTaskConsumer" logged 10 times in my test, and the queue to be empty afterwards.
Currently the queue already holds 17 messages, and my test writes 10 more with the consumer set up as above. I can see "received the message inside QueueTaskConsumer" logged for those 10 new messages, but the 17 existing messages are never consumed and nothing is removed, so the queue now holds 27 messages. Nothing ever seems to be dequeued.
The consumer should have logged the 10 new messages plus the 17 that already existed, and dequeued all 27.
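To make the expectation concrete, here is a minimal sketch of the behavior I am after. It assumes a plain java.util.concurrent.LinkedBlockingQueue as a stand-in for the Hazelcast IQueue (which also implements BlockingQueue); the class name ExpectedDrainSketch and the message names are made up for illustration, and this is not how Spring Integration does it internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Illustration only: LinkedBlockingQueue stands in for the Hazelcast IQueue.
class ExpectedDrainSketch {

    // Removes every element from the queue and records each one exactly once.
    static List<String> drain(BlockingQueue<String> queue) {
        List<String> consumed = new ArrayList<>();
        String message;
        // poll() removes the head of the queue and returns null once the queue is empty
        while ((message = queue.poll()) != null) {
            consumed.add(message);
        }
        return consumed;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // 17 messages that were already in the queue before the consumer started
        for (int i = 0; i < 17; i++) {
            queue.add("existing-" + i);
        }
        // 10 messages written by the test
        for (int i = 0; i < 10; i++) {
            queue.add("new-" + i);
        }
        List<String> consumed = drain(queue);
        // prints "consumed=27, remaining=0"
        System.out.println("consumed=" + consumed.size() + ", remaining=" + queue.size());
    }
}
```

The point of the sketch is that consuming means removing: both the pre-existing and the newly added messages are taken off the queue, whereas my listener-based setup only reacts to new additions and leaves everything in place.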
I have also tried adding a consumer inside my Spring Boot application class, which I run with @SpringBootTest. There I inject the channel itself and call receive, but the returned item is always null.
@SpringBootApplication(scanBasePackages = "com.mypackage")
public class ExternalHazelcastApp {

    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(ExternalHazelcastApp.class);
        app.setDefaultProperties(Collections.singletonMap("server.port", "8085"));
        app.run(args);
    }

    @Component
    @Slf4j
    public static class QueueTaskConsumerInternal {

        @Autowired
        @Qualifier("hazelcastQueueChannel")
        public PollableChannel hazelcastQueueChannel;

        @EventListener(value = ApplicationReadyEvent.class)
        public void start() {
            log.info("CONSUMER HAS STARTED");
            while (true) {
                try {
                    log.info("INSIDE WHILE LOOP");
                    Message<?> item = hazelcastQueueChannel.receive(1_000); // I always get null here and an NPE on the line below.
                    log.info("Consumed: " + item.getPayload());
                }
                catch (Exception e) {
                    log.info("exception " + e.getCause());
                    e.printStackTrace();
                }
            }
        }
    }
}
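For context on the NPE: PollableChannel.receive(long timeout) returns null when no message arrives within the timeout, so calling getPayload() on the result fails whenever the channel is empty. A null-guarded version of the loop can be sketched as follows. The sketch assumes a java.util.concurrent.BlockingQueue as a stand-in for the channel, because BlockingQueue.poll(timeout, unit) has the same null-on-timeout contract; the class name NullSafeReceiveSketch and the bounded attempt count are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Illustration only: a BlockingQueue stands in for the PollableChannel.
class NullSafeReceiveSketch {

    // Polls up to maxAttempts times and returns how many messages were actually received.
    static int consume(BlockingQueue<String> channel, int maxAttempts, long timeoutMillis)
            throws InterruptedException {
        int received = 0;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            // Like receive(timeout), poll(timeout, unit) returns null if nothing arrives in time
            String item = channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (item == null) {
                // Nothing to consume on this attempt; skip instead of dereferencing null
                continue;
            }
            received++;
            System.out.println("Consumed: " + item);
        }
        return received;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        channel.add("first");
        channel.add("second");
        // Four attempts against two messages: the last two attempts time out and are skipped
        int received = consume(channel, 4, 50);
        System.out.println("received=" + received);
    }
}
```

The guard only removes the NPE. It does not explain why the channel stays empty in my setup, which is the actual problem.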
I have seen messages actually being dequeued when the queue is set up as shown below. With this setup, however, the consumption code runs 50 times for a single message, which is not acceptable. This is how I configured it:
@Bean
IQueue<Message<?>> myQueue(@Qualifier("externalHazelcastInstance") HazelcastInstance hazelcastInstance) {
    return hazelcastInstance.getQueue(MY_TEST_QUEUE);
}

@Bean
public MessageChannel hazelcastQueueChannel(@Qualifier("myQueue") IQueue<Message<?>> myQueue) {
    return new QueueChannel(myQueue);
}

@Bean
HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer(@Qualifier("myQueue") IQueue<Message<?>> myQueue) {
    HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer = new HazelcastEventDrivenMessageProducer(myQueue);
    hazelcastEventDrivenMessageProducer.setOutputChannel(hazelcastQueueChannel(myQueue));
    hazelcastEventDrivenMessageProducer.setCacheEventTypes("ADDED");
    hazelcastEventDrivenMessageProducer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
    return hazelcastEventDrivenMessageProducer;
}
In this case I do get the message, but in my output it is wrapped over and over again in a class called GenericMessage. I have abandoned this approach for now.
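My unconfirmed guess at the cause: in this setup the channel is backed by the same queue the producer listens on. Every message the producer sends to the channel is therefore added to that queue, which raises another ADDED event, and so on. The sketch below simulates such a feedback loop with plain Java collections. The Wrapper class is a hypothetical stand-in for GenericMessage, the loop is capped at a fixed number of events, and none of this is the actual Spring Integration code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: simulates a listener that writes back into the queue it listens on.
class FeedbackLoopSketch {

    // Hypothetical stand-in for GenericMessage: it only wraps a payload.
    static final class Wrapper {
        final Object payload;

        Wrapper(Object payload) {
            this.payload = payload;
        }

        // Number of Wrapper layers around the innermost payload
        int depth() {
            return payload instanceof Wrapper ? 1 + ((Wrapper) payload).depth() : 1;
        }
    }

    // Adds the first item, then handles maxEvents simulated ADDED events in a row.
    static Deque<Object> simulate(Object first, int maxEvents) {
        Deque<Object> queue = new ArrayDeque<>();
        queue.add(first);
        Object lastAdded = first;
        for (int event = 0; event < maxEvents; event++) {
            // The listener wraps the item that was just added ...
            Wrapper wrapped = new Wrapper(lastAdded);
            // ... and the channel stores it in the same queue, which triggers the next event
            queue.add(wrapped);
            lastAdded = wrapped;
        }
        return queue;
    }

    public static void main(String[] args) {
        Deque<Object> queue = simulate("hello", 5);
        Wrapper last = (Wrapper) queue.getLast();
        // prints "size=6, depth=5"
        System.out.println("size=" + queue.size() + ", depth=" + last.depth());
    }
}
```

If this guess is right, it would account for both symptoms: one original message fans out into many consumer invocations, and each invocation sees a payload nested one layer deeper than the previous one.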