Skip to content

Commit 9521081

Browse files
committed
[kafka] change offset.
1 parent 323a0a8 commit 9521081

File tree

1 file changed

+65
-6
lines changed

1 file changed

+65
-6
lines changed

pkg/rdkafka/Tests/RdKafkaConsumerTest.php

Lines changed: 65 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
5050
$kafkaConsumer = $this->createKafkaConsumerMock();
5151
$kafkaConsumer
5252
->expects($this->once())
53-
->method('subscribe')
54-
->with(['dest'])
53+
->method('assign')
5554
;
5655
$kafkaConsumer
5756
->expects($this->once())
@@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
7069
$this->assertNull($consumer->receive(1000));
7170
}
7271

72+
public function testShouldPassProperlyConfiguredTopicPartitionOnAssign()
73+
{
74+
$destination = new RdKafkaTopic('dest');
75+
76+
$kafkaMessage = new Message();
77+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
78+
79+
$kafkaConsumer = $this->createKafkaConsumerMock();
80+
$kafkaConsumer
81+
->expects($this->once())
82+
->method('assign')
83+
;
84+
$kafkaConsumer
85+
->expects($this->any())
86+
->method('consume')
87+
->willReturn($kafkaMessage)
88+
;
89+
90+
$consumer = new RdKafkaConsumer(
91+
$kafkaConsumer,
92+
$this->createContextMock(),
93+
$destination,
94+
$this->createSerializerMock()
95+
);
96+
97+
$consumer->receive(1000);
98+
$consumer->receive(1000);
99+
$consumer->receive(1000);
100+
}
101+
73102
public function testShouldSubscribeOnFirstReceiveOnly()
74103
{
75104
$destination = new RdKafkaTopic('dest');
@@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
80109
$kafkaConsumer = $this->createKafkaConsumerMock();
81110
$kafkaConsumer
82111
->expects($this->once())
83-
->method('subscribe')
84-
->with(['dest'])
112+
->method('assign')
85113
;
86114
$kafkaConsumer
87115
->expects($this->any())
@@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly()
101129
$consumer->receive(1000);
102130
}
103131

132+
public function testThrowOnOffsetChangeAfterSubscribing()
133+
{
134+
$destination = new RdKafkaTopic('dest');
135+
136+
$kafkaMessage = new Message();
137+
$kafkaMessage->err = RD_KAFKA_RESP_ERR__TIMED_OUT;
138+
139+
$kafkaConsumer = $this->createKafkaConsumerMock();
140+
$kafkaConsumer
141+
->expects($this->once())
142+
->method('assign')
143+
;
144+
$kafkaConsumer
145+
->expects($this->any())
146+
->method('consume')
147+
->willReturn($kafkaMessage)
148+
;
149+
150+
$consumer = new RdKafkaConsumer(
151+
$kafkaConsumer,
152+
$this->createContextMock(),
153+
$destination,
154+
$this->createSerializerMock()
155+
);
156+
157+
$consumer->receive(1000);
158+
159+
$this->expectException(\LogicException::class);
160+
$this->expectExceptionMessage('The consumer has already subscribed.');
161+
$consumer->setOffset(123);
162+
}
163+
104164
public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
105165
{
106166
$destination = new RdKafkaTopic('dest');
@@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
114174
$kafkaConsumer = $this->createKafkaConsumerMock();
115175
$kafkaConsumer
116176
->expects($this->once())
117-
->method('subscribe')
118-
->with(['dest'])
177+
->method('assign')
119178
;
120179
$kafkaConsumer
121180
->expects($this->once())

0 commit comments

Comments
 (0)