Description
In what version(s) of Spring Integration are you seeing this issue?
Tested on: spring boot 3.1.1 and Spring Integration 6.1.1
Also tested on: spring boot 2.7.4 and Spring Integration 5.5.15
Same behaviour on both versions.
Describe the bug
Rabbit MQ provides Single Active Consumers and Exclusive consumers to maintain message order given multiple consumers on a single queue. These maintain order by only allowing a single consumer to process messages at any point in time.
Spring Integration AMQP causes messages to be processed in parallel (and out of order) during shutdown of one instance.
Given:
- Consumer A (active consumer)
- Consumer B (waiting consumer)
- 20 Messages in RMQ (M1 to M20)
- prefetch = 5 (bug happens with any prefetch amount)
- Consumer acknowledgement enabled.
Consumer A "prefetches" M1 to M5 locally.
While processing M1, Consumer A is instructed to shutdown gracefully (kill -INT <process_id>
or similar). The Consumer A's consumer (not channel) is unregistered from RabbitMQ but continues to process M1 to M5, at the same time Consumer B is set to active and "prefetches" and processes M6 to M10. This results in (M1 to M5) and (M6 to M10) to be processed in parallel.
Note if Consumer A fails to process M1 to M5, these will be re-queued in RMQ, and consumed by Consumer B, AFTER M6 to M10 have been processed.
I've attempted to replicate this bug using RabbitMQ client directly and using Spring AMQP (not integration), but was unable to.
To Reproduce
See also provided github sample.
- Register two consumers (on two difference processes) on the same queue.
- Block consumption of messages on Consumer A (using single thread breakpoint or Thread.sleep)
- Note: Some IDEs disable breakpoints if process is instructed to stop from within the IDE 😭
- Publish a number of messages to queue (more than prefetch)
- signal Consumer A to shutdown gracefully
- Watch as Consumer B starts consuming new messages while Consumer A is still processing its messages.
Expected behavior
When using exclusive consumers or Single Active Consumer, at no point should multiple consumer be processing messages at the same time.
On shutdown of active consumer application, waiting consumer cannot become active until prior consumer has processed all of its unacked messages.
Sample
Sample is provided here. Due to the need for multiple instances, its README.md will guide you on how to use.
This was also tested using Spring-AMQP library (here) and using RabbitMQ's client (here) directly. Bug could not be replicated using these libraries.