@@ -50,8 +50,7 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
50
50
$ kafkaConsumer = $ this ->createKafkaConsumerMock ();
51
51
$ kafkaConsumer
52
52
->expects ($ this ->once ())
53
- ->method ('subscribe ' )
54
- ->with (['dest ' ])
53
+ ->method ('assign ' )
55
54
;
56
55
$ kafkaConsumer
57
56
->expects ($ this ->once ())
@@ -70,6 +69,36 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
70
69
$ this ->assertNull ($ consumer ->receive (1000 ));
71
70
}
72
71
72
+ public function testShouldPassProperlyConfiguredTopicPartitionOnAssign ()
73
+ {
74
+ $ destination = new RdKafkaTopic ('dest ' );
75
+
76
+ $ kafkaMessage = new Message ();
77
+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
78
+
79
+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
80
+ $ kafkaConsumer
81
+ ->expects ($ this ->once ())
82
+ ->method ('assign ' )
83
+ ;
84
+ $ kafkaConsumer
85
+ ->expects ($ this ->any ())
86
+ ->method ('consume ' )
87
+ ->willReturn ($ kafkaMessage )
88
+ ;
89
+
90
+ $ consumer = new RdKafkaConsumer (
91
+ $ kafkaConsumer ,
92
+ $ this ->createContextMock (),
93
+ $ destination ,
94
+ $ this ->createSerializerMock ()
95
+ );
96
+
97
+ $ consumer ->receive (1000 );
98
+ $ consumer ->receive (1000 );
99
+ $ consumer ->receive (1000 );
100
+ }
101
+
73
102
public function testShouldSubscribeOnFirstReceiveOnly ()
74
103
{
75
104
$ destination = new RdKafkaTopic ('dest ' );
@@ -80,8 +109,7 @@ public function testShouldSubscribeOnFirstReceiveOnly()
80
109
$ kafkaConsumer = $ this ->createKafkaConsumerMock ();
81
110
$ kafkaConsumer
82
111
->expects ($ this ->once ())
83
- ->method ('subscribe ' )
84
- ->with (['dest ' ])
112
+ ->method ('assign ' )
85
113
;
86
114
$ kafkaConsumer
87
115
->expects ($ this ->any ())
@@ -101,6 +129,38 @@ public function testShouldSubscribeOnFirstReceiveOnly()
101
129
$ consumer ->receive (1000 );
102
130
}
103
131
132
+ public function testThrowOnOffsetChangeAfterSubscribing ()
133
+ {
134
+ $ destination = new RdKafkaTopic ('dest ' );
135
+
136
+ $ kafkaMessage = new Message ();
137
+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
138
+
139
+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
140
+ $ kafkaConsumer
141
+ ->expects ($ this ->once ())
142
+ ->method ('assign ' )
143
+ ;
144
+ $ kafkaConsumer
145
+ ->expects ($ this ->any ())
146
+ ->method ('consume ' )
147
+ ->willReturn ($ kafkaMessage )
148
+ ;
149
+
150
+ $ consumer = new RdKafkaConsumer (
151
+ $ kafkaConsumer ,
152
+ $ this ->createContextMock (),
153
+ $ destination ,
154
+ $ this ->createSerializerMock ()
155
+ );
156
+
157
+ $ consumer ->receive (1000 );
158
+
159
+ $ this ->expectException (\LogicException::class);
160
+ $ this ->expectExceptionMessage ('The consumer has already subscribed. ' );
161
+ $ consumer ->setOffset (123 );
162
+ }
163
+
104
164
public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue ()
105
165
{
106
166
$ destination = new RdKafkaTopic ('dest ' );
@@ -114,8 +174,7 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
114
174
$ kafkaConsumer = $ this ->createKafkaConsumerMock ();
115
175
$ kafkaConsumer
116
176
->expects ($ this ->once ())
117
- ->method ('subscribe ' )
118
- ->with (['dest ' ])
177
+ ->method ('assign ' )
119
178
;
120
179
$ kafkaConsumer
121
180
->expects ($ this ->once ())
0 commit comments