Skip to content

Commit 0af20ba

Browse files
committed
[amqp-ext] Restore default read timeout inside consume callback.
1 parent eca49cf commit 0af20ba

File tree

1 file changed

+18
-11
lines changed

1 file changed

+18
-11
lines changed

pkg/amqp-ext/AmqpContext.php

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -360,17 +360,24 @@ public function consume($timeout = 0)
360360

361361
$extQueue = new \AMQPQueue($this->getExtChannel());
362362
$extQueue->setName($consumer->getQueue()->getQueueName());
363-
$extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) {
364-
$message = $this->convertMessage($extEnvelope);
365-
$message->setConsumerTag($q->getConsumerTag());
366-
367-
/**
368-
* @var AmqpConsumer
369-
* @var callable $callback
370-
*/
371-
list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()];
372-
373-
return call_user_func($callback, $message, $consumer);
363+
$extQueue->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use ($originalTimeout, $extConnection) {
364+
$consumeTimeout = $extConnection->getReadTimeout();
365+
try {
366+
$extConnection->setReadTimeout($originalTimeout);
367+
368+
$message = $this->convertMessage($extEnvelope);
369+
$message->setConsumerTag($q->getConsumerTag());
370+
371+
/**
372+
* @var AmqpConsumer
373+
* @var callable $callback
374+
*/
375+
list($consumer, $callback) = $this->subscribers[$q->getConsumerTag()];
376+
377+
return call_user_func($callback, $message, $consumer);
378+
} finally {
379+
$extConnection->setReadTimeout($consumeTimeout);
380+
}
374381
}, AMQP_JUST_CONSUME);
375382
} catch (\AMQPQueueException $e) {
376383
if ('Consumer timeout exceed' == $e->getMessage()) {

0 commit comments

Comments
 (0)