Skip to content

Commit d2fa178

Browse files
authored
Allow either to use assign or subscribe
With `assign` it is not possible to use the rebalancing from Kafka. So if no offset is set we use `subscribe` and rebalancing is possible and otherwise if a offset is set we are using assign.
1 parent c52ba7c commit d2fa178

File tree

1 file changed

+10
-8
lines changed

1 file changed

+10
-8
lines changed

pkg/rdkafka/RdKafkaConsumer.php

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,8 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
5555
$this->topic = $topic;
5656
$this->subscribed = false;
5757
$this->commitAsync = false;
58-
$this->offset = null;
59-
58+
6059
$this->setSerializer($serializer);
6160
}
6261

@@ -99,12 +98,15 @@ public function getQueue()
9998
public function receive($timeout = 0)
10099
{
101100
if (false == $this->subscribed) {
102-
$this->consumer->assign([new TopicPartition(
103-
$this->getQueue()->getQueueName(),
104-
$this->getQueue()->getPartition(),
105-
$this->offset
106-
)]);
107-
101+
if ($this->offset === null) {
102+
$this->consumer->subscribe([$this->getQueue()->getQueueName()]);
103+
} else {
104+
$this->consumer->assign([new TopicPartition(
105+
$this->getQueue()->getQueueName(),
106+
$this->getQueue()->getPartition(),
107+
$this->offset
108+
), ]);
109+
}
108110
$this->subscribed = true;
109111
}
110112

0 commit comments

Comments
 (0)