diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index bea21e561..49d0d0660 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -168,39 +168,47 @@ public function consume(ExtensionInterface $runtimeExtension = null) $logger = $context->getLogger() ?: new NullLogger(); $logger->info('Start consuming'); - while (true) { - try { - if ($this->psrContext instanceof AmqpContext) { - $callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) { - $currentProcessor = null; + if ($this->psrContext instanceof AmqpContext) { + $callback = function (AmqpMessage $message, AmqpConsumer $consumer) use ($extension, $logger, &$context) { + $currentProcessor = null; + + /** @var PsrQueue $queue */ + foreach ($this->boundProcessors as list($queue, $processor)) { + if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) { + $currentProcessor = $processor; + } + } - /** @var PsrQueue $queue */ - foreach ($this->boundProcessors as list($queue, $processor)) { - if ($queue->getQueueName() === $consumer->getQueue()->getQueueName()) { - $currentProcessor = $processor; - } - } + if (false == $currentProcessor) { + throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName())); + } - if (false == $currentProcessor) { - throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $consumer->getQueue()->getQueueName())); - } + $context = new Context($this->psrContext); + $context->setLogger($logger); + $context->setPsrQueue($consumer->getQueue()); + $context->setPsrConsumer($consumer); + $context->setPsrProcessor($currentProcessor); + $context->setPsrMessage($message); - $context = new Context($this->psrContext); - $context->setLogger($logger); - $context->setPsrQueue($consumer->getQueue()); - $context->setPsrConsumer($consumer); - $context->setPsrProcessor($currentProcessor); - $context->setPsrMessage($message); + $this->doConsume($extension, $context); - $this->doConsume($extension, $context); + return true; + }; - return true; - }; + foreach ($consumers as $consumer) { + /* @var AmqpConsumer $consumer */ - foreach ($consumers as $consumer) { - /* @var AmqpConsumer $consumer */ + $this->psrContext->subscribe($consumer, $callback); + } + } - $this->psrContext->subscribe($consumer, $callback); + while (true) { + try { + if ($this->psrContext instanceof AmqpContext) { + $extension->onBeforeReceive($context); + + if ($context->isExecutionInterrupted()) { + throw new ConsumptionInterruptedException(); } $this->psrContext->consume($this->receiveTimeout); @@ -266,16 +274,14 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $consumer = $context->getPsrConsumer(); $logger = $context->getLogger(); - if (false == $context->getPsrMessage() instanceof AmqpContext) { - $extension->onBeforeReceive($context); - } - if ($context->isExecutionInterrupted()) { throw new ConsumptionInterruptedException(); } $message = $context->getPsrMessage(); if (false == $message) { + $extension->onBeforeReceive($context); + if ($message = $consumer->receive($this->receiveTimeout)) { $context->setPsrMessage($message); } @@ -312,10 +318,6 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $logger->info(sprintf('Message processed: %s', $context->getResult())); $extension->onPostReceived($context); - - if ($context->getPsrMessage() instanceof AmqpContext) { - $extension->onBeforeReceive($context); - } } else { usleep($this->idleTimeout * 1000); $extension->onIdle($context);