File tree Expand file tree Collapse file tree 2 files changed +20
-4
lines changed Expand file tree Collapse file tree 2 files changed +20
-4
lines changed Original file line number Diff line number Diff line change @@ -82,7 +82,11 @@ public function getQueue()
82
82
*/
83
83
public function receive ($ timeout = 0 )
84
84
{
85
- $ this ->consumer ->subscribe ([$ this ->topic ->getTopicName ()]);
85
+ if (false == $ this ->subscribed ) {
86
+ $ this ->consumer ->subscribe ([$ this ->topic ->getTopicName ()]);
87
+
88
+ $ this ->subscribed = true ;
89
+ }
86
90
87
91
$ message = null ;
88
92
if ($ timeout > 0 ) {
@@ -95,8 +99,6 @@ public function receive($timeout = 0)
95
99
}
96
100
}
97
101
98
- $ this ->consumer ->unsubscribe ();
99
-
100
102
return $ message ;
101
103
}
102
104
Original file line number Diff line number Diff line change @@ -29,12 +29,18 @@ class RdKafkaContext implements PsrContext
29
29
*/
30
30
private $ producer ;
31
31
32
+ /**
33
+ * @var KafkaConsumer[]
34
+ */
35
+ private $ kafkaConsumers ;
36
+
32
37
/**
33
38
* @param array $config
34
39
*/
35
40
public function __construct (array $ config )
36
41
{
37
42
$ this ->config = $ config ;
43
+ $ this ->kafkaConsumers = [];
38
44
39
45
$ this ->setSerializer (new JsonSerializer ());
40
46
}
@@ -94,8 +100,10 @@ public function createConsumer(PsrDestination $destination)
94
100
{
95
101
InvalidDestinationException::assertDestinationInstanceOf ($ destination , RdKafkaTopic::class);
96
102
103
+ $ this ->kafkaConsumers [] = $ kafkaConsumer = new KafkaConsumer ($ this ->getConf ());
104
+
97
105
$ consumer = new RdKafkaConsumer (
98
- new KafkaConsumer ( $ this -> getConf ()) ,
106
+ $ kafkaConsumer ,
99
107
$ this ,
100
108
$ destination ,
101
109
$ this ->getSerializer ()
@@ -113,6 +121,12 @@ public function createConsumer(PsrDestination $destination)
113
121
*/
114
122
public function close ()
115
123
{
124
+ $ kafkaConsumers = $ this ->kafkaConsumers ;
125
+ $ this ->kafkaConsumers = [];
126
+
127
+ foreach ($ kafkaConsumers as $ kafkaConsumer ) {
128
+ $ kafkaConsumer ->unsubscribe ();
129
+ }
116
130
}
117
131
118
132
/**
You can’t perform that action at this time.
0 commit comments