Skip to content

Commit 04d4530

Browse files
committed
[kafka] add ability to set offset.
1 parent 9d6d9ff commit 04d4530

File tree

1 file changed

+21
-1
lines changed

1 file changed

+21
-1
lines changed

pkg/rdkafka/RdKafkaConsumer.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
88
use RdKafka\KafkaConsumer;
9+
use RdKafka\TopicPartition;
910

1011
class RdKafkaConsumer implements PsrConsumer
1112
{
@@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer
3637
*/
3738
private $commitAsync;
3839

40+
/**
41+
* @var int|null
42+
*/
43+
private $offset;
44+
3945
/**
4046
* @param KafkaConsumer $consumer
4147
* @param RdKafkaContext $context
@@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
4955
$this->topic = $topic;
5056
$this->subscribed = false;
5157
$this->commitAsync = false;
58+
$this->offset = null;
5259

5360
$this->setSerializer($serializer);
5461
}
@@ -69,6 +76,15 @@ public function setCommitAsync($async)
6976
$this->commitAsync = (bool) $async;
7077
}
7178

79+
public function setOffset($offset)
80+
{
81+
if ($this->subscribed) {
82+
throw new \LogicException('The consumer has already subscribed.');
83+
}
84+
85+
$this->offset = $offset;
86+
}
87+
7288
/**
7389
* {@inheritdoc}
7490
*/
@@ -83,7 +99,11 @@ public function getQueue()
8399
public function receive($timeout = 0)
84100
{
85101
if (false == $this->subscribed) {
86-
$this->consumer->subscribe([$this->topic->getTopicName()]);
102+
$this->consumer->assign([new TopicPartition(
103+
$this->getQueue()->getQueueName(),
104+
$this->getQueue()->getPartition(),
105+
$this->offset
106+
)]);
87107

88108
$this->subscribed = true;
89109
}

0 commit comments

Comments
 (0)