diff --git a/pkg/enqueue/Consumption/QueueConsumer.php b/pkg/enqueue/Consumption/QueueConsumer.php index 62a710350..4e59abcd8 100644 --- a/pkg/enqueue/Consumption/QueueConsumer.php +++ b/pkg/enqueue/Consumption/QueueConsumer.php @@ -263,7 +263,9 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $consumer = $context->getPsrConsumer(); $logger = $context->getLogger(); - $extension->onBeforeReceive($context); + if (false == $context->getPsrMessage() instanceof AmqpContext) { + $extension->onBeforeReceive($context); + } if ($context->isExecutionInterrupted()) { throw new ConsumptionInterruptedException(); @@ -307,6 +309,10 @@ protected function doConsume(ExtensionInterface $extension, Context $context) $logger->info(sprintf('Message processed: %s', $context->getResult())); $extension->onPostReceived($context); + + if ($context->getPsrMessage() instanceof AmqpContext) { + $extension->onBeforeReceive($context); + } } else { usleep($this->idleTimeout * 1000); $extension->onIdle($context);