@@ -59,9 +59,34 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
59
59
->with (1000 )
60
60
->willReturn ($ kafkaMessage )
61
61
;
62
+
63
+ $ consumer = new RdKafkaConsumer (
64
+ $ kafkaConsumer ,
65
+ $ this ->createContextMock (),
66
+ $ destination ,
67
+ $ this ->createSerializerMock ()
68
+ );
69
+
70
+ $ this ->assertNull ($ consumer ->receive (1000 ));
71
+ }
72
+
73
+ public function testShouldSubscribeOnFirstReceiveOnly ()
74
+ {
75
+ $ destination = new RdKafkaTopic ('dest ' );
76
+
77
+ $ kafkaMessage = new Message ();
78
+ $ kafkaMessage ->err = RD_KAFKA_RESP_ERR__TIMED_OUT ;
79
+
80
+ $ kafkaConsumer = $ this ->createKafkaConsumerMock ();
62
81
$ kafkaConsumer
63
82
->expects ($ this ->once ())
64
- ->method ('unsubscribe ' )
83
+ ->method ('subscribe ' )
84
+ ->with (['dest ' ])
85
+ ;
86
+ $ kafkaConsumer
87
+ ->expects ($ this ->any ())
88
+ ->method ('consume ' )
89
+ ->willReturn ($ kafkaMessage )
65
90
;
66
91
67
92
$ consumer = new RdKafkaConsumer (
@@ -71,7 +96,9 @@ public function testShouldReceiveFromQueueAndReturnNullIfNoMessageInQueue()
71
96
$ this ->createSerializerMock ()
72
97
);
73
98
74
- $ this ->assertNull ($ consumer ->receive (1000 ));
99
+ $ consumer ->receive (1000 );
100
+ $ consumer ->receive (1000 );
101
+ $ consumer ->receive (1000 );
75
102
}
76
103
77
104
public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue ()
@@ -96,10 +123,6 @@ public function testShouldReceiveFromQueueAndReturnMessageIfMessageInQueue()
96
123
->with (1000 )
97
124
->willReturn ($ kafkaMessage )
98
125
;
99
- $ kafkaConsumer
100
- ->expects ($ this ->once ())
101
- ->method ('unsubscribe ' )
102
- ;
103
126
104
127
$ serializer = $ this ->createSerializerMock ();
105
128
$ serializer
0 commit comments