Skip to content

Commit 12470a0

Browse files
authored
Merge pull request #314 from php-enqueue/kafka-set-offset
[kafka] add ability to set offset.
2 parents f4572cd + 8aa983c commit 12470a0

File tree

3 files changed

+107
-7
lines changed

3 files changed

+107
-7
lines changed

docs/transport/kafka.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The transport uses [Kafka](https://kafka.apache.org/) streaming platform as a MQ
88
* [Send message to queue](#send-message-to-queue)
99
* [Consume message](#consume-message)
1010
* [Serialize message](#serialize-message)
11+
* [Chnage offset](#change-offset)
1112

1213
## Installation
1314

@@ -84,6 +85,9 @@ $fooQueue = $psrContext->createQueue('foo');
8485

8586
$consumer = $psrContext->createConsumer($fooQueue);
8687

88+
// Enable async commit to gain better performance.
89+
//$consumer->setCommitAsync(true);
90+
8791
$message = $consumer->receive();
8892

8993
// process a message
@@ -115,4 +119,21 @@ class FooSerializer implements Serializer
115119
$psrContext->setSerializer(new FooSerializer());
116120
```
117121

122+
## Change offset
123+
124+
By default consumers starts from the beginning of the topic and updates the offset while you are processing messages.
125+
There is an ability to change the current offset.
126+
127+
```php
128+
<?php
129+
/** @var \Enqueue\RdKafka\RdKafkaContext $psrContext */
130+
131+
$fooQueue = $psrContext->createQueue('foo');
132+
133+
$consumer = $psrContext->createConsumer($fooQueue);
134+
$consumer->setOffset(123);
135+
136+
$message = $consumer->receive(2000);
137+
```
138+
118139
[back to index](index.md)

pkg/rdkafka/RdKafkaConsumer.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Interop\Queue\PsrConsumer;
77
use Interop\Queue\PsrMessage;
88
use RdKafka\KafkaConsumer;
9+
use RdKafka\TopicPartition;
910

1011
class RdKafkaConsumer implements PsrConsumer
1112
{
@@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer
3637
*/
3738
private $commitAsync;
3839

40+
/**
41+
* @var int|null
42+
*/
43+
private $offset;
44+
3945
/**
4046
* @param KafkaConsumer $consumer
4147
* @param RdKafkaContext $context
@@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
4955
$this->topic = $topic;
5056
$this->subscribed = false;
5157
$this->commitAsync = false;
58+
$this->offset = null;
5259

5360
$this->setSerializer($serializer);
5461
}
@@ -69,6 +76,15 @@ public function setCommitAsync($async)
6976
$this->commitAsync = (bool) $async;
7077
}
7178

79+
public function setOffset($offset)
80+
{
81+
if ($this->subscribed) {
82+
throw new \LogicException('The consumer has already subscribed.');
83+
}
84+
85+
$this->offset = $offset;
86+
}
87+
7288
/**
7389
* {@inheritdoc}
7490
*/
@@ -83,7 +99,11 @@ public function getQueue()
8399
public function receive($timeout = 0)
84100
{
85101
if (false == $this->subscribed) {
86-
$this->consumer->subscribe([$this->topic->getTopicName()]);
102+
$this->consumer->assign([new TopicPartition(
103+
$this->getQueue()->getQueueName(),
104+
$this->getQueue()->getPartition(),
105+
$this->offset
106+
)]);
87107

88108
$this->subscribed = true;
89109
}

pkg/rdkafka/Tests/RdKafkaConsumerTest.php

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5050
$kafkaConsumer = $this->createKafkaConsumerMock();
5151
$kafkaConsumer
5252
->expects($this->once())
53-
->method('subscribe')
54-
->with(['dest'])
53+
->method('assign')
5554
;
5655
$kafkaConsumer
5756
->expects($this->once())
@@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
7069
$this->assertNull($consumer->receive(1000));
7170
}
7271

72+
public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
73+
{
74+
$destination = new RdKafkaTopic('dest');
75+
76+
$kafkaMessage = new Message();
77+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
78+
79+
$kafkaConsumer = $this->createKafkaConsumerMock();
80+
$kafkaConsumer
81+
->expects($this->once())
82+
->method('assign')
83+
;
84+
$kafkaConsumer
85+
->expects($this->any())
86+
->method('consume')
87+
->willReturn($kafkaMessage)
88+
;
89+
90+
$consumer = new RdKafkaConsumer(
91+
$kafkaConsumer,
92+
$this->createContextMock(),
93+
$destination,
94+
$this->createSerializerMock()
95+
);
96+
97+
$consumer->receive(1000);
98+
$consumer->receive(1000);
99+
$consumer->receive(1000);
100+
}
101+
73102
public function testShouldSubscribeOnFirstReceiveOnly()
74103
{
75104
$destination = new RdKafkaTopic('dest');
@@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
80109
$kafkaConsumer = $this->createKafkaConsumerMock();
81110
$kafkaConsumer
82111
->expects($this->once())
83-
->method('subscribe')
84-
->with(['dest'])
112+
->method('assign')
85113
;
86114
$kafkaConsumer
87115
->expects($this->any())
@@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly()
101129
$consumer->receive(1000);
102130
}
103131

132+
public function testThrowOnOffsetChangeAfterSubscribing()
133+
{
134+
$destination = new RdKafkaTopic('dest');
135+
136+
$kafkaMessage = new Message();
137+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
138+
139+
$kafkaConsumer = $this->createKafkaConsumerMock();
140+
$kafkaConsumer
141+
->expects($this->once())
142+
->method('assign')
143+
;
144+
$kafkaConsumer
145+
->expects($this->any())
146+
->method('consume')
147+
->willReturn($kafkaMessage)
148+
;
149+
150+
$consumer = new RdKafkaConsumer(
151+
$kafkaConsumer,
152+
$this->createContextMock(),
153+
$destination,
154+
$this->createSerializerMock()
155+
);
156+
157+
$consumer->receive(1000);
158+
159+
$this->expectException(\LogicException::class);
160+
$this->expectExceptionMessage('The consumer has already subscribed.');
161+
$consumer->setOffset(123);
162+
}
163+
104164
public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
105165
{
106166
$destination = new RdKafkaTopic('dest');
@@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
114174
$kafkaConsumer = $this->createKafkaConsumerMock();
115175
$kafkaConsumer
116176
->expects($this->once())
117-
->method('subscribe')
118-
->with(['dest'])
177+
->method('assign')
119178
;
120179
$kafkaConsumer
121180
->expects($this->once())

0 commit comments

Comments
 (0)