When configuring a HazelcastEventDrivenMessageProducer I'm not seeing the message actually dequeue from my hazelcast queue. #4001

Closed
@vsalvati

Description

I'm trying to use Spring Integration to consume messages from a Hazelcast queue in my Spring Boot project, which runs on Java 11.

I'm using Spring Integration with the following dependencies in my pom.xml:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.integration</groupId>
  <artifactId>spring-integration-hazelcast</artifactId>
  <version>3.0.0</version>
</dependency>

No matter how I configure my HazelcastEventDrivenMessageProducer, MessageChannel, and QueueChannel, I can't get my consumer code to run once per message and dequeue that message from my Hazelcast queue. The queue lives in an external Hazelcast cluster; I want to read each message from it and have the message removed once it has been processed.

Here is my configuration:

    @Bean
    HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer(@Qualifier("myQueue") IQueue<String> myQueue) {
        HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer = new HazelcastEventDrivenMessageProducer(myQueue);
        hazelcastEventDrivenMessageProducer.setOutputChannel(hazelcastQueueChannel());
        hazelcastEventDrivenMessageProducer.setCacheEventTypes("ADDED,REMOVED");
        hazelcastEventDrivenMessageProducer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE);
        return hazelcastEventDrivenMessageProducer;
    }
    
    @Bean
    IQueue<String> myQueue(@Qualifier("externalHazelcastInstance") HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getQueue("myQueue");
    }

    @Bean
    public PollableChannel hazelcastQueueChannel() {
        return new QueueChannel();
    }

I also have a consumer class that looks like this:

@Slf4j
@Component
public class QueueTaskConsumer {
    @ServiceActivator(inputChannel = "hazelcastQueueChannel")
    public void readFromTaskQueue(String message) {
        log.info("received the message inside QueueTaskConsumer " + message);
    }
}

To Reproduce

Configure the application as shown above and add 10 String messages to a Hazelcast queue.

Expected behavior

I expect to see "received the message inside QueueTaskConsumer" logged 10 times in my test, and the queue to be empty afterwards.

Currently I have 17 messages in my queue, and my test writes 10 more with the consumer set up this way. I see "received the message inside QueueTaskConsumer" logged for those 10 messages, but the original 17 remain, so the queue now holds 27 messages. Nothing ever seems to be dequeued.

The consumer should have logged those 10 plus the 17 that already existed, and dequeued all 27.
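One possible reading of these numbers, offered as an assumption rather than a confirmed diagnosis: a listener registered on a queue is only notified about items added after registration, and being notified does not remove anything. The sketch below models that with plain java.util.concurrent classes. ObservableQueue and its addListener method are hypothetical stand-ins written for this illustration; they are not Hazelcast or Spring Integration API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical stand-in for a queue that notifies listeners when an item is added.
// It only models "notify, do not remove"; it is NOT the Hazelcast IQueue API.
class ObservableQueue<T> {
    private final LinkedBlockingQueue<T> items = new LinkedBlockingQueue<>();
    private final List<Consumer<T>> listeners = new ArrayList<>();

    void addListener(Consumer<T> listener) {
        listeners.add(listener);
    }

    void offer(T item) {
        items.offer(item);
        // Listeners observe the item, but the item stays in the queue.
        for (Consumer<T> listener : listeners) {
            listener.accept(item);
        }
    }

    T poll() {
        return items.poll();
    }

    int size() {
        return items.size();
    }
}

class ListenerVsDrainDemo {
    // Returns {events seen by the listener, queue size after listening, items removed by polling}.
    static int[] run() {
        ObservableQueue<String> queue = new ObservableQueue<>();
        for (int i = 0; i < 17; i++) {
            queue.offer("old-" + i);          // items present before the listener exists
        }
        int[] seen = {0};
        queue.addListener(item -> seen[0]++); // registered late, so it misses the 17
        for (int i = 0; i < 10; i++) {
            queue.offer("new-" + i);
        }
        int sizeAfterListening = queue.size();
        int drained = 0;
        while (queue.poll() != null) {        // only an explicit poll removes items
            drained++;
        }
        return new int[] {seen[0], sizeAfterListening, drained};
    }

    public static void main(String[] args) {
        int[] r = run();
        System.out.println("listener saw " + r[0] + ", queue held " + r[1] + ", drained " + r[2]);
    }
}
```

In this model the listener sees 10 items while the queue still holds 27, which matches the counts reported above; only the explicit poll loop empties it.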

I have also tried adding a consumer inside my Spring Boot application class, which I run under @SpringBootTest. There I inject the channel itself and call receive, but the returned item is always null.

@SpringBootApplication(scanBasePackages = "com.mypackage")
public class ExternalHazelcastApp {
    public static void main(String[] args) {
        SpringApplication app = new SpringApplication(ExternalHazelcastApp.class);
        app.setDefaultProperties(Collections.singletonMap("server.port", "8085"));
        app.run(args);
    }

    @Component
    @Slf4j
    public static class QueueTaskConsumerInternal {
        @Autowired @Qualifier("hazelcastQueueChannel") public PollableChannel hazelcastQueueChannel;

        @EventListener(value = ApplicationReadyEvent.class)
        public void start() {
            log.info("CONSUMER HAS STARTED");
            while (true)
            {
                try {
                    log.info("INSIDE WHILE LOOP");
                    Message<?> item = hazelcastQueueChannel.receive(1_000); // always null here, which causes an NPE on the next line
                    log.info("Consumed: " + item.getPayload());
                }
                catch (Exception e) {
                    log.info("exception " + e.getCause());
                    e.printStackTrace();
                }
            }
        }
    }
}
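Separately from why the channel is empty, a receive call with a timeout returns null when nothing arrives in time, so the loop needs a null check before touching the payload. The sketch below shows that pattern using java.util.concurrent.BlockingQueue.poll(timeout, unit), which has the same null-on-timeout contract, as a stand-in for the channel. The class name, the consume helper, and the "stop after N empty polls" rule are assumptions made for the illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class NullSafeReceiveDemo {
    // Polls the queue until maxEmptyPolls consecutive timeouts occur, collecting every item received.
    static List<String> consume(BlockingQueue<String> channel, long timeoutMillis, int maxEmptyPolls) {
        List<String> consumed = new ArrayList<>();
        int emptyPolls = 0;
        while (emptyPolls < maxEmptyPolls) {
            String item;
            try {
                item = channel.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt and stop polling
                break;
            }
            if (item == null) {
                emptyPolls++;                       // timeout: nothing to process, do not dereference
                continue;
            }
            emptyPolls = 0;
            consumed.add(item);
        }
        return consumed;
    }

    // Small fixed scenario: two items are queued, then the consumer runs until the queue stays empty.
    static List<String> demo() {
        BlockingQueue<String> channel = new LinkedBlockingQueue<>();
        channel.offer("a");
        channel.offer("b");
        return consume(channel, 10, 2);
    }

    public static void main(String[] args) {
        System.out.println("Consumed: " + demo());
    }
}
```

Bounding the loop by consecutive empty polls also avoids the unbounded while (true) inside an ApplicationReadyEvent listener, which would otherwise block that thread indefinitely.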

I have seen the queue actually dequeue messages when it is set up as shown below. With that setup, however, the consumption code runs 50 times for a single message, which is unacceptable. This is how I configured it:

    @Bean
    IQueue<Message<?>> myQueue(@Qualifier("externalHazelcastInstance") HazelcastInstance hazelcastInstance) {
        return hazelcastInstance.getQueue(MY_TEST_QUEUE);
    }

    @Bean
    public MessageChannel hazelcastQueueChannel(@Qualifier("myQueue") IQueue<Message<?>> myQueue) {
        return new QueueChannel(myQueue);
    }
    
    @Bean
    HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer(@Qualifier("myQueue") IQueue<Message<?>> myQueue) {
        HazelcastEventDrivenMessageProducer hazelcastEventDrivenMessageProducer = new HazelcastEventDrivenMessageProducer(myQueue);
        hazelcastEventDrivenMessageProducer.setOutputChannel(hazelcastQueueChannel(myQueue));
        hazelcastEventDrivenMessageProducer.setCacheEventTypes("ADDED");
        hazelcastEventDrivenMessageProducer.setCacheListeningPolicy(CacheListeningPolicyType.SINGLE); 
        return hazelcastEventDrivenMessageProducer;
    }

In this case I get the message, and then it is wrapped again and again in a class called GenericMessage in my output. I have abandoned this approach for now.
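A plausible explanation for the repeated wrapping, stated here as an assumption: in this configuration the producer listens for ADDED events on myQueue, and its output channel is a QueueChannel backed by that same myQueue, so every message it emits is added back to the queue it is listening to. The sketch below models such a feedback loop with plain Java collections. The wrap helper and the round limit are hypothetical and exist only to keep the illustration finite.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class FeedbackLoopDemo {
    // Hypothetical envelope standing in for a message wrapper around a payload.
    static String wrap(String payload) {
        return "Message[" + payload + "]";
    }

    // Simulates maxRounds iterations of "item added -> listener wraps it -> wrapper is added to the same queue".
    static String simulate(String payload, int maxRounds) {
        Deque<String> queue = new ArrayDeque<>();
        queue.addLast(payload);                  // the original item is added
        for (int round = 0; round < maxRounds; round++) {
            String newest = queue.peekLast();    // the "added" event carries the newest item
            String wrapped = wrap(newest);       // the listener wraps it in a new envelope
            queue.addLast(wrapped);              // the output goes to the same queue, triggering another event
        }
        return queue.peekLast();
    }

    public static void main(String[] args) {
        System.out.println(simulate("hello", 3));
    }
}
```

If this is what is happening, a channel that is not backed by the listened-to queue would break the cycle.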
