Description
I have started using the Symfony Messenger component for inter-application communication and stumbled upon a minor issue with the Kafka queue implementation. I'm not sure whether the core issue lies in the Symfony integration or in Enqueue itself. However, I've looked over the source code and everything points to wrong offset selection in `enqueue/rdkafka`, so I wanted to ask for advice and/or contribute a fix.
For my example, I dispatch a simple message:

```php
$bus->dispatch(new DisplayNotification('test'));
```

(`DisplayNotification` is my own custom message class.)
I have created a configuration containing:
```yaml
enqueue:
    async_events:
        enabled: true
        # if you'd like to send messages onTerminate use spool_producer (it makes response time even lesser):
        # spool_producer: true
    transport:
        # default: '%env(ENQUEUE_DSN)%'
        default: rdkafka
        rdkafka:
            global:
                group.id: app_name
                offset.store.method: file
                auto.commit.interval.ms: 10
                metadata.broker.list: '192.168.56.236:9092'
            topic:
                offset.store.method: file
                offset.store.sync.interval.ms: 60
    client: ~
```
After dispatching the message a couple of times, I started consuming them:
However, after restarting the consumer, I noticed that messages which should already have been handled were being delivered again. So I tried another transport, `enqueue/fs`, and sure enough, the messages were handled and removed afterwards.
Then I tried running

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic messages --from-beginning --group app_name
```

as the Kafka quickstart suggests (I only added `--group app_name`). This showed me that the offsets are stored in Kafka; the PHP process just keeps ignoring them.
After some digging, I noticed that `RdKafkaConsumer`, and in particular its `RdKafkaConsumer::receive` method, is responsible for managing the `offset` value. The `__construct` method always sets `$this->offset` to `null`, and the only other place where the offset is actually set is in the tests.
Then I looked into the `receive` method itself. Here is an excerpt of the current code:
```php
public function receive($timeout = 0)
{
    if (false == $this->subscribed) {
        $this->consumer->assign([new TopicPartition(
            $this->getQueue()->getQueueName(),
            $this->getQueue()->getPartition(),
            $this->offset
        )]);

        $this->subscribed = true;
    }

    $message = null;

    if ($timeout > 0) {
        $message = $this->doReceive($timeout);
    } else {
        while (true) {
            if ($message = $this->doReceive(500)) {
                break;
            }
        }
    }

    return $message;
}
```
I've replaced the `$this->consumer->assign()` call with:

```php
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
```

which, according to https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-kafkaconsumer.subscribe.html, should allow the library to automatically select the partition for this consumer (am I right?).
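To make the question concrete, here is a minimal sketch of a possible middle ground, written as a hypothetical free function `attachConsumer()` (it is not part of `enqueue/rdkafka`). The idea is to `subscribe()` when no explicit offset has been configured and to keep the current `assign()` behaviour only when one has. It relies only on `KafkaConsumer::subscribe()`, `KafkaConsumer::assign()` and the `RdKafka\TopicPartition` constructor from php-rdkafka. The underlying assumption is that with `subscribe()` and a `group.id`, librdkafka resumes from the group's committed offsets, whereas `assign()` with an explicit offset starts exactly there. Presumably the `null` offset currently reaches `TopicPartition` as `0`, which would explain the replays.

```php
<?php

declare(strict_types=1);

use RdKafka\TopicPartition;

/**
 * Hypothetical helper (NOT part of enqueue/rdkafka): decides how a consumer
 * attaches to its queue the first time receive() is called.
 *
 * $consumer is expected to be an RdKafka\KafkaConsumer; any object exposing
 * subscribe(array) and assign(array) works, which keeps the logic testable.
 */
function attachConsumer(object $consumer, string $topic, ?int $partition, ?int $offset): void
{
    if (null === $offset) {
        // No explicit offset: join the consumer group for this topic. The
        // group coordinator assigns partitions, and (assumption) consumption
        // resumes from the offsets committed for the configured group.id.
        $consumer->subscribe([$topic]);

        return;
    }

    // An explicit offset only makes sense for a specific partition.
    if (null === $partition) {
        throw new \InvalidArgumentException('An explicit offset requires a partition.');
    }

    // Explicit offset requested: pin one partition and start exactly there.
    // TopicPartition is only instantiated on this branch.
    $consumer->assign([new TopicPartition($topic, $partition, $offset)]);
}
```

Inside `receive()`, the `assign()` call in the `if (false == $this->subscribed)` branch would then become `attachConsumer($this->consumer, $this->getQueue()->getQueueName(), $this->getQueue()->getPartition(), $this->offset);`. Keeping the `assign()` path means that code which deliberately sets an offset keeps today's behaviour.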
Am I correct in thinking that this is a bug and not some misconfiguration on my part? I'm not very experienced with Kafka yet.
Is this solution acceptable, or will it cause issues I can't foresee yet?