Skip to content

Commit e7baf3b

Browse files
committed
[amqp-lib] The context should allow to get the lib's channel.
fixes #146
1 parent bcf2485 commit e7baf3b

File tree

1 file changed

+20
-20
lines changed

1 file changed

+20
-20
lines changed

pkg/amqp-lib/AmqpContext.php

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,18 @@ public function createConsumer(PsrDestination $destination)
120120
$queue = $this->createTemporaryQueue();
121121
$this->bind(new AmqpBind($destination, $queue, $queue->getQueueName()));
122122

123-
return new AmqpConsumer($this->getChannel(), $queue, $this->buffer, $this->config['receive_method']);
123+
return new AmqpConsumer($this->getLibChannel(), $queue, $this->buffer, $this->config['receive_method']);
124124
}
125125

126-
return new AmqpConsumer($this->getChannel(), $destination, $this->buffer, $this->config['receive_method']);
126+
return new AmqpConsumer($this->getLibChannel(), $destination, $this->buffer, $this->config['receive_method']);
127127
}
128128

129129
/**
130130
* @return AmqpProducer
131131
*/
132132
public function createProducer()
133133
{
134-
$producer = new AmqpProducer($this->getChannel(), $this);
134+
$producer = new AmqpProducer($this->getLibChannel(), $this);
135135
$producer->setDelayStrategy($this->delayStrategy);
136136

137137
return $producer;
@@ -142,7 +142,7 @@ public function createProducer()
142142
*/
143143
public function createTemporaryQueue()
144144
{
145-
list($name) = $this->getChannel()->queue_declare('', false, false, true, false);
145+
list($name) = $this->getLibChannel()->queue_declare('', false, false, true, false);
146146

147147
$queue = $this->createQueue($name);
148148
$queue->addFlag(InteropAmqpQueue::FLAG_EXCLUSIVE);
@@ -155,7 +155,7 @@ public function createTemporaryQueue()
155155
*/
156156
public function declareTopic(InteropAmqpTopic $topic)
157157
{
158-
$this->getChannel()->exchange_declare(
158+
$this->getLibChannel()->exchange_declare(
159159
$topic->getTopicName(),
160160
$topic->getType(),
161161
(bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_PASSIVE),
@@ -172,7 +172,7 @@ public function declareTopic(InteropAmqpTopic $topic)
172172
*/
173173
public function deleteTopic(InteropAmqpTopic $topic)
174174
{
175-
$this->getChannel()->exchange_delete(
175+
$this->getLibChannel()->exchange_delete(
176176
$topic->getTopicName(),
177177
(bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_IFUNUSED),
178178
(bool) ($topic->getFlags() & InteropAmqpTopic::FLAG_NOWAIT)
@@ -184,7 +184,7 @@ public function deleteTopic(InteropAmqpTopic $topic)
184184
*/
185185
public function declareQueue(InteropAmqpQueue $queue)
186186
{
187-
list(, $messageCount) = $this->getChannel()->queue_declare(
187+
list(, $messageCount) = $this->getLibChannel()->queue_declare(
188188
$queue->getQueueName(),
189189
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_PASSIVE),
190190
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_DURABLE),
@@ -202,7 +202,7 @@ public function declareQueue(InteropAmqpQueue $queue)
202202
*/
203203
public function deleteQueue(InteropAmqpQueue $queue)
204204
{
205-
$this->getChannel()->queue_delete(
205+
$this->getLibChannel()->queue_delete(
206206
$queue->getQueueName(),
207207
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_IFUNUSED),
208208
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_IFEMPTY),
@@ -215,7 +215,7 @@ public function deleteQueue(InteropAmqpQueue $queue)
215215
*/
216216
public function purgeQueue(InteropAmqpQueue $queue)
217217
{
218-
$this->getChannel()->queue_purge(
218+
$this->getLibChannel()->queue_purge(
219219
$queue->getQueueName(),
220220
(bool) ($queue->getFlags() & InteropAmqpQueue::FLAG_NOWAIT)
221221
);
@@ -232,7 +232,7 @@ public function bind(InteropAmqpBind $bind)
232232

233233
// bind exchange to exchange
234234
if ($bind->getSource() instanceof InteropAmqpTopic && $bind->getTarget() instanceof InteropAmqpTopic) {
235-
$this->getChannel()->exchange_bind(
235+
$this->getLibChannel()->exchange_bind(
236236
$bind->getTarget()->getTopicName(),
237237
$bind->getSource()->getTopicName(),
238238
$bind->getRoutingKey(),
@@ -241,7 +241,7 @@ public function bind(InteropAmqpBind $bind)
241241
);
242242
// bind queue to exchange
243243
} elseif ($bind->getSource() instanceof InteropAmqpQueue) {
244-
$this->getChannel()->queue_bind(
244+
$this->getLibChannel()->queue_bind(
245245
$bind->getSource()->getQueueName(),
246246
$bind->getTarget()->getTopicName(),
247247
$bind->getRoutingKey(),
@@ -250,7 +250,7 @@ public function bind(InteropAmqpBind $bind)
250250
);
251251
// bind exchange to queue
252252
} else {
253-
$this->getChannel()->queue_bind(
253+
$this->getLibChannel()->queue_bind(
254254
$bind->getTarget()->getQueueName(),
255255
$bind->getSource()->getTopicName(),
256256
$bind->getRoutingKey(),
@@ -271,7 +271,7 @@ public function unbind(InteropAmqpBind $bind)
271271

272272
// bind exchange to exchange
273273
if ($bind->getSource() instanceof InteropAmqpTopic && $bind->getTarget() instanceof InteropAmqpTopic) {
274-
$this->getChannel()->exchange_unbind(
274+
$this->getLibChannel()->exchange_unbind(
275275
$bind->getTarget()->getTopicName(),
276276
$bind->getSource()->getTopicName(),
277277
$bind->getRoutingKey(),
@@ -280,15 +280,15 @@ public function unbind(InteropAmqpBind $bind)
280280
);
281281
// bind queue to exchange
282282
} elseif ($bind->getSource() instanceof InteropAmqpQueue) {
283-
$this->getChannel()->queue_unbind(
283+
$this->getLibChannel()->queue_unbind(
284284
$bind->getSource()->getQueueName(),
285285
$bind->getTarget()->getTopicName(),
286286
$bind->getRoutingKey(),
287287
$bind->getArguments()
288288
);
289289
// bind exchange to queue
290290
} else {
291-
$this->getChannel()->queue_unbind(
291+
$this->getLibChannel()->queue_unbind(
292292
$bind->getTarget()->getQueueName(),
293293
$bind->getSource()->getTopicName(),
294294
$bind->getRoutingKey(),
@@ -309,7 +309,7 @@ public function close()
309309
*/
310310
public function setQos($prefetchSize, $prefetchCount, $global)
311311
{
312-
$this->getChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
312+
$this->getLibChannel()->basic_qos($prefetchSize, $prefetchCount, $global);
313313
}
314314

315315
/**
@@ -336,7 +336,7 @@ public function subscribe(InteropAmqpConsumer $consumer, callable $callback)
336336
}
337337
};
338338

339-
$consumerTag = $this->getChannel()->basic_consume(
339+
$consumerTag = $this->getLibChannel()->basic_consume(
340340
$consumer->getQueue()->getQueueName(),
341341
$consumer->getConsumerTag(),
342342
(bool) ($consumer->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
@@ -366,10 +366,10 @@ public function unsubscribe(InteropAmqpConsumer $consumer)
366366

367367
$consumerTag = $consumer->getConsumerTag();
368368

369-
$this->getChannel()->basic_cancel($consumerTag);
369+
$this->getLibChannel()->basic_cancel($consumerTag);
370370

371371
$consumer->setConsumerTag(null);
372-
unset($this->subscribers[$consumerTag], $this->getChannel()->callbacks[$consumerTag]);
372+
unset($this->subscribers[$consumerTag], $this->getLibChannel()->callbacks[$consumerTag]);
373373
}
374374

375375
/**
@@ -407,7 +407,7 @@ public function consume($timeout = 0)
407407
/**
408408
* @return AMQPChannel
409409
*/
410-
private function getChannel()
410+
public function getLibChannel()
411411
{
412412
if (null === $this->channel) {
413413
$this->channel = $this->connection->channel();

0 commit comments

Comments
 (0)