
[RdKafka][Symfony] Kafka always consumes messages from the beginning #570

Closed
@Steveb-p


I have started using the Symfony Messenger component for inter-application communication and stumbled upon a minor issue with the Kafka transport implementation. I'm not sure whether the core issue lies in the Symfony integration or in Enqueue itself, but after looking over the source code, everything points to incorrect offset selection in enqueue/rdkafka, so I wanted to ask for advice and/or contribute a fix.

For my example, I dispatch a simple message:

$bus->dispatch(new DisplayNotification('test'));

(DisplayNotification is my own custom message class.)

My configuration contains:

enqueue:
    async_events:
        enabled: true
        # if you'd like to send messages onTerminate use spool_producer (it makes response time even shorter):
        # spool_producer: true
    transport:
#        default: '%env(ENQUEUE_DSN)%'
        default: rdkafka
        rdkafka:
            global:
                group.id: app_name
                offset.store.method: file
                auto.commit.interval.ms: 10
                metadata.broker.list: '192.168.56.236:9092'
            topic:
                offset.store.method: file
                offset.store.sync.interval.ms: 60
    client: ~

After dispatching the message a couple of times, I started consuming them:
[screenshot of consumer output not preserved]

However, after restarting the consumer, I noticed that messages that should already have been handled were delivered again. So I tried another transport, enqueue/fs, and sure enough, messages were handled and removed afterwards.

Then I ran

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic messages --from-beginning --group app_name

as the Kafka quickstart suggests (I only added --group app_name). This showed me that offsets are stored in Kafka and that only the PHP process keeps ignoring them.

After some digging, I noticed that RdKafkaConsumer, and in particular the RdKafkaConsumer::receive method, is responsible for managing the offset. The __construct method always sets $this->offset to null, and the only other place where the offset is actually set is in the tests.

Then I looked into the receive method itself. Here is an excerpt of the current code:

public function receive($timeout = 0)
{
    if (false == $this->subscribed) {
        $this->consumer->assign([new TopicPartition(
            $this->getQueue()->getQueueName(),
            $this->getQueue()->getPartition(),
            $this->offset
        )]);

        $this->subscribed = true;
    }

    $message = null;
    if ($timeout > 0) {
        $message = $this->doReceive($timeout);
    } else {
        while (true) {
            if ($message = $this->doReceive(500)) {
                break;
            }
        }
    }

    return $message;
}

I replaced the $this->consumer->assign() call with:

$this->consumer->subscribe([$this->getQueue()->getQueueName()]);

which, according to https://arnaud-lb.github.io/php-rdkafka/phpdoc/rdkafka-kafkaconsumer.subscribe.html, should let the library automatically assign partitions to this consumer (am I right?).
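A minimal sketch of a possible middle ground, assuming the php-rdkafka extension: the hypothetical `startConsuming()` helper below (not part of enqueue/rdkafka) calls `subscribe()` when no partition is configured, so the consumer group assigns partitions and resumes from committed offsets, and only falls back to `assign()` when a partition was explicitly requested. In the `assign()` branch it assumes that librdkafka resolves `RD_KAFKA_OFFSET_STORED` to the group's committed offset, instead of receiving a null offset.

```php
<?php
/**
 * Hypothetical helper, not part of enqueue/rdkafka.
 *
 * $consumer is expected to be an RdKafka\KafkaConsumer from php-rdkafka. It is
 * left untyped so the branching can be exercised with a stand-in object.
 * Returns which strategy was used: 'subscribe' or 'assign'.
 */
function startConsuming($consumer, string $topic, ?int $partition, ?int $offset): string
{
    if (null === $partition) {
        // Group-managed assignment: the group coordinator hands out partitions
        // and each one resumes from the group's last committed offset.
        $consumer->subscribe([$topic]);

        return 'subscribe';
    }

    // Manual assignment: an explicit partition was requested. If no offset was
    // given, assume RD_KAFKA_OFFSET_STORED makes librdkafka use the committed one.
    $consumer->assign([new RdKafka\TopicPartition(
        $topic,
        $partition,
        $offset ?? RD_KAFKA_OFFSET_STORED
    )]);

    return 'assign';
}
```

With the topic used above, `startConsuming($kafkaConsumer, 'messages', null, null)` would take the `subscribe()` path. The branch exists because an unconditional `subscribe()` would ignore a partition set on the queue, since partition placement is then left to the consumer group.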

Am I correct in thinking that this is a bug rather than some misconfiguration on my part? I'm not very experienced with Kafka yet.

Is this solution acceptable, or will it cause issues I can't foresee yet?
