6
6
use Interop \Queue \PsrConsumer ;
7
7
use Interop \Queue \PsrMessage ;
8
8
use RdKafka \KafkaConsumer ;
9
+ use RdKafka \TopicPartition ;
9
10
10
11
class RdKafkaConsumer implements PsrConsumer
11
12
{
@@ -36,6 +37,11 @@ class RdKafkaConsumer implements PsrConsumer
36
37
*/
37
38
private $ commitAsync ;
38
39
40
+ /**
41
+ * @var int|null
42
+ */
43
+ private $ offset ;
44
+
39
45
/**
40
46
* @param KafkaConsumer $consumer
41
47
* @param RdKafkaContext $context
@@ -49,6 +55,7 @@ public function __construct(KafkaConsumer $consumer, RdKafkaContext $context, Rd
49
55
$ this ->topic = $ topic ;
50
56
$ this ->subscribed = false ;
51
57
$ this ->commitAsync = false ;
58
+ $ this ->offset = null ;
52
59
53
60
$ this ->setSerializer ($ serializer );
54
61
}
@@ -69,6 +76,15 @@ public function setCommitAsync($async)
69
76
$ this ->commitAsync = (bool ) $ async ;
70
77
}
71
78
79
+ public function setOffset ($ offset )
80
+ {
81
+ if ($ this ->subscribed ) {
82
+ throw new \LogicException ('The consumer has already subscribed. ' );
83
+ }
84
+
85
+ $ this ->offset = $ offset ;
86
+ }
87
+
72
88
/**
73
89
* {@inheritdoc}
74
90
*/
@@ -83,7 +99,11 @@ public function getQueue()
83
99
public function receive ($ timeout = 0 )
84
100
{
85
101
if (false == $ this ->subscribed ) {
86
- $ this ->consumer ->subscribe ([$ this ->topic ->getTopicName ()]);
102
+ $ this ->consumer ->assign ([new TopicPartition (
103
+ $ this ->getQueue ()->getQueueName (),
104
+ $ this ->getQueue ()->getPartition (),
105
+ $ this ->offset
106
+ )]);
87
107
88
108
$ this ->subscribed = true ;
89
109
}
0 commit comments