diff --git a/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php b/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php index b7e9ca016..6682cad8e 100644 --- a/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php +++ b/pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php @@ -15,9 +15,6 @@ class FlushSpoolProducerExtension implements PostMessageReceivedExtensionInterfa */ private $producer; - /** - * @param SpoolProducer $producer - */ public function __construct(SpoolProducer $producer) { $this->producer = $producer; diff --git a/pkg/enqueue/Client/ConsumptionExtension/LogExtension.php b/pkg/enqueue/Client/ConsumptionExtension/LogExtension.php new file mode 100644 index 000000000..693be2035 --- /dev/null +++ b/pkg/enqueue/Client/ConsumptionExtension/LogExtension.php @@ -0,0 +1,69 @@ +getResult(); + $message = $context->getMessage(); + + $logLevel = Result::REJECT == ((string) $result) ? LogLevel::ERROR : LogLevel::INFO; + + if ($command = $message->getProperty(Config::COMMAND)) { + $reason = ''; + $logMessage = "[client] Processed {command}\t{body}\t{result}"; + if ($result instanceof Result && $result->getReason()) { + $reason = $result->getReason(); + + $logMessage .= ' {reason}'; + } + + $context->getLogger()->log($logLevel, $logMessage, [ + 'result' => str_replace('enqueue.', '', $result), + 'reason' => $reason, + 'command' => $command, + 'queueName' => $context->getConsumer()->getQueue()->getQueueName(), + 'body' => Stringify::that($message->getBody()), + 'properties' => Stringify::that($message->getProperties()), + 'headers' => Stringify::that($message->getHeaders()), + ]); + + return; + } + + $topic = $message->getProperty(Config::TOPIC); + $processor = $message->getProperty(Config::PROCESSOR); + if ($topic && $processor) { + $reason = ''; + $logMessage = "[client] Processed {topic} -> {processor}\t{body}\t{result}"; + if ($result instanceof Result && $result->getReason()) { + $reason = $result->getReason(); + + $logMessage .= ' {reason}'; + } + + $context->getLogger()->log($logLevel, $logMessage, [ + 'result' => str_replace('enqueue.', '', $result), + 'reason' => $reason, + 'topic' => $topic, + 'processor' => $processor, + 'queueName' => $context->getConsumer()->getQueue()->getQueueName(), + 'body' => Stringify::that($message->getBody()), + 'properties' => Stringify::that($message->getProperties()), + 'headers' => Stringify::that($message->getHeaders()), + ]); + + return; + } + + parent::onPostMessageReceived($context); + } +} diff --git a/pkg/enqueue/Consumption/ChainExtension.php b/pkg/enqueue/Consumption/ChainExtension.php index 68c4dd1de..6911b5b6b 100644 --- a/pkg/enqueue/Consumption/ChainExtension.php +++ b/pkg/enqueue/Consumption/ChainExtension.php @@ -3,6 +3,7 @@ namespace Enqueue\Consumption; use Enqueue\Consumption\Context\End; +use Enqueue\Consumption\Context\InitLogger; use Enqueue\Consumption\Context\MessageReceived; use Enqueue\Consumption\Context\MessageResult; use Enqueue\Consumption\Context\PostConsume; @@ -15,6 +16,7 @@ class ChainExtension implements ExtensionInterface { private $startExtensions; + private $initLoggerExtensions; private $preSubscribeExtensions; private $preConsumeExtensions; private $messageReceivedExtensions; @@ -24,12 +26,10 @@ class ChainExtension implements ExtensionInterface private $postConsumeExtensions; private $endExtensions; - /** - * @param ExtensionInterface[] $extensions - */ public function __construct(array $extensions) { $this->startExtensions = []; + $this->initLoggerExtensions = []; $this->preSubscribeExtensions = []; $this->preConsumeExtensions = []; $this->messageReceivedExtensions = []; @@ -42,6 +42,7 @@ public function __construct(array $extensions) array_walk($extensions, function ($extension) { if ($extension instanceof ExtensionInterface) { $this->startExtensions[] = $extension; + $this->initLoggerExtensions[] = $extension; $this->preSubscribeExtensions[] = $extension; $this->preConsumeExtensions[] = $extension; $this->messageReceivedExtensions[] = $extension; @@ -61,6 +62,12 @@ public function __construct(array $extensions) $extensionValid = true; } + if ($extension instanceof InitLoggerExtensionInterface) { + $this->initLoggerExtensions[] = $extension; + + $extensionValid = true; + } + if ($extension instanceof PreSubscribeExtensionInterface) { $this->preSubscribeExtensions[] = $extension; @@ -115,6 +122,13 @@ public function __construct(array $extensions) }); } + public function onInitLogger(InitLogger $context): void + { + foreach ($this->initLoggerExtensions as $extension) { + $extension->onInitLogger($context); + } + } + public function onStart(Start $context): void { foreach ($this->startExtensions as $extension) { diff --git a/pkg/enqueue/Consumption/Context/InitLogger.php b/pkg/enqueue/Consumption/Context/InitLogger.php new file mode 100644 index 000000000..c48057268 --- /dev/null +++ b/pkg/enqueue/Consumption/Context/InitLogger.php @@ -0,0 +1,28 @@ +logger = $logger; + } + + public function getLogger(): LoggerInterface + { + return $this->logger; + } + + public function changeLogger(LoggerInterface $logger): void + { + $this->logger = $logger; + } +} diff --git a/pkg/enqueue/Consumption/Context/PostMessageReceived.php b/pkg/enqueue/Consumption/Context/PostMessageReceived.php index 85ecaecb7..423830e3d 100644 --- a/pkg/enqueue/Consumption/Context/PostMessageReceived.php +++ b/pkg/enqueue/Consumption/Context/PostMessageReceived.php @@ -3,6 +3,7 @@ namespace Enqueue\Consumption\Context; use Enqueue\Consumption\Result; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Message; use Psr\Log\LoggerInterface; @@ -14,6 +15,11 @@ final class PostMessageReceived */ private $context; + /** + * @var Consumer + */ + private $consumer; + /** * @var Message */ @@ -41,12 +47,14 @@ final class PostMessageReceived public function __construct( Context $context, + Consumer $consumer, Message $message, $result, int $receivedAt, LoggerInterface $logger ) { $this->context = $context; + $this->consumer = $consumer; $this->message = $message; $this->result = $result; $this->receivedAt = $receivedAt; @@ -60,6 +68,11 @@ public function getContext(): Context return $this->context; } + public function getConsumer(): Consumer + { + return $this->consumer; + } + public function getMessage(): Message { return $this->message; diff --git a/pkg/enqueue/Consumption/Context/Start.php b/pkg/enqueue/Consumption/Context/Start.php index d1a7be375..cd9f2108e 100644 --- a/pkg/enqueue/Consumption/Context/Start.php +++ b/pkg/enqueue/Consumption/Context/Start.php @@ -62,11 +62,6 @@ public function getLogger(): LoggerInterface return $this->logger; } - public function changeLogger(LoggerInterface $logger): void - { - $this->logger = $logger; - } - /** * In milliseconds. */ diff --git a/pkg/enqueue/Consumption/Extension/LogExtension.php b/pkg/enqueue/Consumption/Extension/LogExtension.php new file mode 100644 index 000000000..14383c4d1 --- /dev/null +++ b/pkg/enqueue/Consumption/Extension/LogExtension.php @@ -0,0 +1,67 @@ +getLogger()->debug('Consumption has started'); + } + + public function onEnd(End $context): void + { + $context->getLogger()->debug('Consumption has ended'); + } + + public function onMessageReceived(MessageReceived $context): void + { + $message = $context->getMessage(); + + $context->getLogger()->debug("Received from {queueName}\t{body}", [ + 'queueName' => $context->getConsumer()->getQueue()->getQueueName(), + 'redelivered' => $message->isRedelivered(), + 'body' => Stringify::that($message->getBody()), + 'properties' => Stringify::that($message->getProperties()), + 'headers' => Stringify::that($message->getHeaders()), + ]); + } + + public function onPostMessageReceived(PostMessageReceived $context): void + { + $message = $context->getMessage(); + $queue = $context->getConsumer()->getQueue(); + $result = $context->getResult(); + + $reason = ''; + $logMessage = "Processed from {queueName}\t{body}\t{result}"; + if ($result instanceof Result && $result->getReason()) { + $reason = $result->getReason(); + $logMessage .= ' {reason}'; + } + $logContext = [ + 'result' => str_replace('enqueue.', '', $result), + 'reason' => $reason, + 'queueName' => $queue->getQueueName(), + 'body' => Stringify::that($message->getBody()), + 'properties' => Stringify::that($message->getProperties()), + 'headers' => Stringify::that($message->getHeaders()), + ]; + + $logLevel = Result::REJECT == ((string) $result) ? LogLevel::ERROR : LogLevel::INFO; + + $context->getLogger()->log($logLevel, $logMessage, $logContext); + } +} diff --git a/pkg/enqueue/Consumption/Extension/LoggerExtension.php b/pkg/enqueue/Consumption/Extension/LoggerExtension.php index a240a1263..0de2739c1 100644 --- a/pkg/enqueue/Consumption/Extension/LoggerExtension.php +++ b/pkg/enqueue/Consumption/Extension/LoggerExtension.php @@ -2,16 +2,11 @@ namespace Enqueue\Consumption\Extension; -use Enqueue\Consumption\Context\PostMessageReceived; -use Enqueue\Consumption\Context\Start; -use Enqueue\Consumption\PostMessageReceivedExtensionInterface; -use Enqueue\Consumption\Result; -use Enqueue\Consumption\StartExtensionInterface; -use Interop\Queue\Message as InteropMessage; +use Enqueue\Consumption\Context\InitLogger; +use Enqueue\Consumption\InitLoggerExtensionInterface; use Psr\Log\LoggerInterface; -use Psr\Log\NullLogger; -class LoggerExtension implements StartExtensionInterface, PostMessageReceivedExtensionInterface +class LoggerExtension implements InitLoggerExtensionInterface { /** * @var LoggerInterface @@ -26,59 +21,14 @@ public function __construct(LoggerInterface $logger) $this->logger = $logger; } - public function onStart(Start $context): void + public function onInitLogger(InitLogger $context): void { - if ($context->getLogger() && false == $context->getLogger() instanceof NullLogger) { - $context->getLogger()->debug(sprintf( - 'Skip setting context\'s logger "%s". Another one "%s" has already been set.', - get_class($this->logger), - get_class($context->getLogger()) - )); - } else { - $context->changeLogger($this->logger); - $this->logger->debug(sprintf('Set context\'s logger "%s"', get_class($this->logger))); - } - } - - public function onPostMessageReceived(PostMessageReceived $context): void - { - if (false == $context->getResult() instanceof Result) { - return; - } - - /** @var $result Result */ - $result = $context->getResult(); + $previousLogger = $context->getLogger(); - switch ($result->getStatus()) { - case Result::REJECT: - case Result::REQUEUE: - if ($result->getReason()) { - $this->logger->error($result->getReason(), $this->messageToLogContext($context->getMessage())); - } - - break; - case Result::ACK: - if ($result->getReason()) { - $this->logger->info($result->getReason(), $this->messageToLogContext($context->getMessage())); - } + if ($previousLogger !== $this->logger) { + $context->changeLogger($this->logger); - break; - default: - throw new \LogicException(sprintf('Got unexpected message result. "%s"', $result->getStatus())); + $this->logger->debug(sprintf('Change logger from "%s" to "%s"', get_class($previousLogger), get_class($this->logger))); } } - - /** - * @param InteropMessage $message - * - * @return array - */ - private function messageToLogContext(InteropMessage $message) - { - return [ - 'body' => $message->getBody(), - 'headers' => $message->getHeaders(), - 'properties' => $message->getProperties(), - ]; - } } diff --git a/pkg/enqueue/Consumption/ExtensionInterface.php b/pkg/enqueue/Consumption/ExtensionInterface.php index b9280241d..326a98f0d 100644 --- a/pkg/enqueue/Consumption/ExtensionInterface.php +++ b/pkg/enqueue/Consumption/ExtensionInterface.php @@ -2,6 +2,6 @@ namespace Enqueue\Consumption; -interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface +interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface, EndExtensionInterface, InitLoggerExtensionInterface { } diff --git a/pkg/enqueue/Consumption/InitLoggerExtensionInterface.php b/pkg/enqueue/Consumption/InitLoggerExtensionInterface.php new file mode 100644 index 000000000..936e32d6e --- /dev/null +++ b/pkg/enqueue/Consumption/InitLoggerExtensionInterface.php @@ -0,0 +1,14 @@ +extension = $runtimeExtension ? + $extension = $runtimeExtension ? new ChainExtension([$this->staticExtension, $runtimeExtension]) : $this->staticExtension ; + $initLogger = new InitLogger($this->logger); + $extension->onInitLogger($initLogger); + + $this->logger = $initLogger->getLogger(); + $startTime = (int) (microtime(true) * 1000); $start = new Start( @@ -143,10 +144,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $startTime ); - $this->extension->onStart($start); + $extension->onStart($start); if ($start->isExecutionInterrupted()) { - $this->onEnd($startTime); + $this->onEnd($extension, $startTime); return; } @@ -176,7 +177,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $receivedMessagesCount = 0; $interruptExecution = false; - $callback = function (InteropMessage $message, Consumer $consumer) use (&$receivedMessagesCount, &$interruptExecution) { + $callback = function (InteropMessage $message, Consumer $consumer) use (&$receivedMessagesCount, &$interruptExecution, $extension) { ++$receivedMessagesCount; $receivedAt = (int) (microtime(true) * 1000); @@ -188,19 +189,19 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $processor = $this->boundProcessors[$queue->getQueueName()]->getProcessor(); $messageReceived = new MessageReceived($this->interopContext, $consumer, $message, $processor, $receivedAt, $this->logger); - $this->extension->onMessageReceived($messageReceived); + $extension->onMessageReceived($messageReceived); $result = $messageReceived->getResult(); $processor = $messageReceived->getProcessor(); if (null === $result) { try { $result = $processor->process($message, $this->interopContext); } catch (\Exception $e) { - $result = $this->onProcessorException($message, $e, $receivedAt); + $result = $this->onProcessorException($extension, $message, $e, $receivedAt); } } $messageResult = new MessageResult($this->interopContext, $message, $result, $receivedAt, $this->logger); - $this->extension->onResult($messageResult); + $extension->onResult($messageResult); $result = $messageResult->getResult(); switch ($result) { @@ -219,8 +220,8 @@ public function consume(ExtensionInterface $runtimeExtension = null): void throw new \LogicException(sprintf('Status is not supported: %s', $result)); } - $postMessageReceived = new PostMessageReceived($this->interopContext, $message, $result, $receivedAt, $this->logger); - $this->extension->onPostMessageReceived($postMessageReceived); + $postMessageReceived = new PostMessageReceived($this->interopContext, $consumer, $message, $result, $receivedAt, $this->logger); + $extension->onPostMessageReceived($postMessageReceived); if ($postMessageReceived->isExecutionInterrupted()) { $interruptExecution = true; @@ -241,7 +242,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $this->logger ); - $this->extension->onPreSubscribe($preSubscribe); + $extension->onPreSubscribe($preSubscribe); $subscriptionConsumer->subscribe($consumer, $callback); } @@ -252,10 +253,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $interruptExecution = false; $preConsume = new PreConsume($this->interopContext, $subscriptionConsumer, $this->logger, $cycle, $this->receiveTimeout, $startTime); - $this->extension->onPreConsume($preConsume); + $extension->onPreConsume($preConsume); if ($preConsume->isExecutionInterrupted()) { - $this->onEnd($startTime, $subscriptionConsumer); + $this->onEnd($extension, $startTime, $subscriptionConsumer); return; } @@ -263,10 +264,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void $subscriptionConsumer->consume($this->receiveTimeout); $postConsume = new PostConsume($this->interopContext, $subscriptionConsumer, $receivedMessagesCount, $cycle, $startTime, $this->logger); - $this->extension->onPostConsume($postConsume); + $extension->onPostConsume($postConsume); if ($interruptExecution || $postConsume->isExecutionInterrupted()) { - $this->onEnd($startTime, $subscriptionConsumer); + $this->onEnd($extension, $startTime, $subscriptionConsumer); return; } @@ -285,11 +286,11 @@ public function setFallbackSubscriptionConsumer(SubscriptionConsumer $fallbackSu $this->fallbackSubscriptionConsumer = $fallbackSubscriptionConsumer; } - private function onEnd(int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void + private function onEnd(ExtensionInterface $extension, int $startTime, SubscriptionConsumer $subscriptionConsumer = null): void { $endTime = (int) (microtime(true) * 1000); - $this->extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger)); + $extension->onEnd(new End($this->interopContext, $startTime, $endTime, $this->logger)); if ($subscriptionConsumer) { $subscriptionConsumer->unsubscribeAll(); @@ -301,12 +302,12 @@ private function onEnd(int $startTime, SubscriptionConsumer $subscriptionConsume * * https://github.com/symfony/symfony/blob/cbe289517470eeea27162fd2d523eb29c95f775f/src/Symfony/Component/HttpKernel/EventListener/ExceptionListener.php#L77 */ - private function onProcessorException(Message $message, \Exception $exception, int $receivedAt) + private function onProcessorException(ExtensionInterface $extension, Message $message, \Exception $exception, int $receivedAt) { $processorException = new ProcessorException($this->interopContext, $message, $exception, $receivedAt, $this->logger); try { - $this->extension->onProcessorException($processorException); + $extension->onProcessorException($processorException); $result = $processorException->getResult(); if (null === $result) { diff --git a/pkg/enqueue/Consumption/QueueConsumerInterface.php b/pkg/enqueue/Consumption/QueueConsumerInterface.php index ed229381b..c9c99ef15 100644 --- a/pkg/enqueue/Consumption/QueueConsumerInterface.php +++ b/pkg/enqueue/Consumption/QueueConsumerInterface.php @@ -35,7 +35,7 @@ public function bindCallback($queue, callable $processor): self; * Runtime extension - is an extension or a collection of extensions which could be set on runtime. * Here's a good example: @see LimitsExtensionsCommandTrait. * - * @param ExtensionInterface|ChainExtension|null $runtimeExtension + * @param ExtensionInterface|null $runtimeExtension * * @throws \Exception */ diff --git a/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php b/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php index 2a756180c..b721af925 100644 --- a/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php +++ b/pkg/enqueue/Symfony/Consumption/LimitsExtensionsCommandTrait.php @@ -13,9 +13,6 @@ trait LimitsExtensionsCommandTrait { - /** - * {@inheritdoc} - */ protected function configureLimitsExtensions() { $this diff --git a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php index b60420c52..02347164c 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php +++ b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php @@ -5,6 +5,7 @@ use Enqueue\ConnectionFactoryFactory; use Enqueue\ConnectionFactoryFactoryInterface; use Enqueue\Consumption\ChainExtension; +use Enqueue\Consumption\Extension\LogExtension; use Enqueue\Consumption\QueueConsumer; use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Resources; @@ -166,6 +167,11 @@ public function buildQueueConsumer(ContainerBuilder $container, array $config): $container->setParameter($this->format('receive_timeout'), $config['receive_timeout'] ?? 10000); + $logExtensionId = $this->format('log_extension'); + $container->register($logExtensionId, LogExtension::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => $this->name, 'priority' => -100]) + ; + $container->register($this->format('consumption_extensions'), ChainExtension::class) ->addArgument([]) ; diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php index 4f3af656e..53d88aa8d 100644 --- a/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php @@ -9,6 +9,7 @@ use Enqueue\Consumption\EndExtensionInterface; use Enqueue\Consumption\PostMessageReceivedExtensionInterface; use Enqueue\Test\ClassExtensionTrait; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Message; use PHPUnit\Framework\TestCase; @@ -57,6 +58,7 @@ public function testShouldFlushSpoolProducerOnPostReceived() $context = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, diff --git a/pkg/enqueue/Tests/Client/ConsumptionExtension/LogExtensionTest.php b/pkg/enqueue/Tests/Client/ConsumptionExtension/LogExtensionTest.php new file mode 100644 index 000000000..0bd6514d8 --- /dev/null +++ b/pkg/enqueue/Tests/Client/ConsumptionExtension/LogExtensionTest.php @@ -0,0 +1,540 @@ +assertClassImplements(StartExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementEndExtensionInterface() + { + $this->assertClassImplements(EndExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementMessageReceivedExtensionInterface() + { + $this->assertClassImplements(MessageReceivedExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementPostMessageReceivedExtensionInterface() + { + $this->assertClassImplements(PostMessageReceivedExtensionInterface::class, LogExtension::class); + } + + public function testShouldSubClassOfLogExtension() + { + $this->assertClassExtends(\Enqueue\Consumption\Extension\LogExtension::class, LogExtension::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new LogExtension(); + } + + public function testShouldLogStartOnStart() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Consumption has started') + ; + + $context = new Start($this->createContextMock(), $logger, [], 1, 1); + + $extension = new LogExtension(); + $extension->onStart($context); + } + + public function testShouldLogEndOnEnd() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Consumption has ended') + ; + + $context = new End($this->createContextMock(), 1, 2, $logger); + + $extension = new LogExtension(); + $extension->onEnd($context); + } + + public function testShouldLogMessageReceived() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Received from {queueName} {body}', [ + 'queueName' => 'aQueue', + 'redelivered' => false, + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + ]) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new MessageReceived($this->createContextMock(), $consumerMock, $message, $this->createProcessorMock(), 1, $logger); + + $extension = new LogExtension(); + $extension->onMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithStringResult() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'aResult', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, 'aResult', 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogRejectedMessageAsError() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::ERROR, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'reject', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Processor::REJECT, 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack(), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithReasonResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result} {reason}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => 'aReason', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack('aReason'), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedCommandMessageWithStringResult() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {command} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::COMMAND => 'aCommand']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'aResult', + 'reason' => '', + 'command' => 'aCommand', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty(Config::COMMAND, 'aCommand'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, 'aResult', 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogRejectedCommandMessageAsError() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::ERROR, + '[client] Processed {command} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::COMMAND => 'aCommand']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'reject', + 'reason' => '', + 'command' => 'aCommand', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setProperty(Config::COMMAND, 'aCommand'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Processor::REJECT, 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedCommandMessageWithResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {command} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::COMMAND => 'aCommand']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => '', + 'command' => 'aCommand', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setProperty(Config::COMMAND, 'aCommand'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack(), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedCommandMessageWithReasonResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {command} {body} {result} {reason}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::COMMAND => 'aCommand']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => 'aReason', + 'command' => 'aCommand', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setProperty(Config::COMMAND, 'aCommand'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack('aReason'), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedTopicProcessorMessageWithStringResult() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {topic} -> {processor} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::TOPIC => 'aTopic', Config::PROCESSOR => 'aProcessor']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'aResult', + 'reason' => '', + 'topic' => 'aTopic', + 'processor' => 'aProcessor', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty(Config::TOPIC, 'aTopic'); + $message->setProperty(Config::PROCESSOR, 'aProcessor'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, 'aResult', 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogRejectedTopicProcessorMessageAsError() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::ERROR, + '[client] Processed {topic} -> {processor} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::TOPIC => 'aTopic', Config::PROCESSOR => 'aProcessor']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'reject', + 'reason' => '', + 'topic' => 'aTopic', + 'processor' => 'aProcessor', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty(Config::TOPIC, 'aTopic'); + $message->setProperty(Config::PROCESSOR, 'aProcessor'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Processor::REJECT, 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedTopicProcessorMessageWithResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {topic} -> {processor} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::TOPIC => 'aTopic', Config::PROCESSOR => 'aProcessor']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => '', + 'topic' => 'aTopic', + 'processor' => 'aProcessor', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty(Config::TOPIC, 'aTopic'); + $message->setProperty(Config::PROCESSOR, 'aProcessor'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack(), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogProcessedTopicProcessorMessageWithReasonResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + '[client] Processed {topic} -> {processor} {body} {result} {reason}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal', Config::TOPIC => 'aTopic', Config::PROCESSOR => 'aProcessor']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => 'aReason', + 'topic' => 'aTopic', + 'processor' => 'aProcessor', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty(Config::TOPIC, 'aTopic'); + $message->setProperty(Config::PROCESSOR, 'aProcessor'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack('aReason'), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub(Queue $queue): Consumer + { + $consumerMock = $this->createMock(Consumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queue) + ; + + return $consumerMock; + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createContextMock(): Context + { + return $this->createMock(Context::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createProcessorMock(): Processor + { + return $this->createMock(Processor::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|LoggerInterface + */ + private function createLogger() + { + return $this->createMock(LoggerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php b/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php index cf905d898..dcde84db3 100644 --- a/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/ChainExtensionTest.php @@ -4,6 +4,7 @@ use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Context\End; +use Enqueue\Consumption\Context\InitLogger; use Enqueue\Consumption\Context\MessageReceived; use Enqueue\Consumption\Context\MessageResult; use Enqueue\Consumption\Context\PostConsume; @@ -36,6 +37,28 @@ public function testCouldBeConstructedWithExtensionsArray() new ChainExtension([$this->createExtension(), $this->createExtension()]); } + public function testShouldProxyOnInitLoggerToAllInternalExtensions() + { + $context = new InitLogger(new NullLogger()); + + $fooExtension = $this->createExtension(); + $fooExtension + ->expects($this->once()) + ->method('onInitLogger') + ->with($this->identicalTo($context)) + ; + $barExtension = $this->createExtension(); + $barExtension + ->expects($this->once()) + ->method('onInitLogger') + ->with($this->identicalTo($context)) + ; + + $extensions = new ChainExtension([$fooExtension, $barExtension]); + + $extensions->onInitLogger($context); + } + public function testShouldProxyOnStartToAllInternalExtensions() { $context = new Start($this->createInteropContextMock(), $this->createLoggerMock(), [], 0, 0); @@ -174,6 +197,7 @@ public function testShouldProxyOnPostReceiveToAllInternalExtensions() { $context = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php index 2f4f8041b..37d592e68 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php @@ -5,6 +5,7 @@ use Enqueue\Consumption\Context\PostMessageReceived; use Enqueue\Consumption\Context\PreConsume; use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Message; use Interop\Queue\SubscriptionConsumer; @@ -49,6 +50,7 @@ public function testOnPreConsumeShouldInterruptWhenLimitIsReached() $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, @@ -135,6 +137,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfMessageLimitExceeded $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php index 8af630c51..de1531ea5 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php @@ -6,6 +6,7 @@ use Enqueue\Consumption\Context\PostMessageReceived; use Enqueue\Consumption\Context\PreConsume; use Enqueue\Consumption\Extension\LimitConsumerMemoryExtension; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Message; use Interop\Queue\SubscriptionConsumer; @@ -66,6 +67,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfMemoryLimitReached() $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, @@ -156,6 +158,7 @@ public function testOnPostMessageReceivedShouldNotInterruptExecutionIfMemoryLimi { $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, diff --git a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php index fe0cb5e89..1eaef7874 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php @@ -6,6 +6,7 @@ use Enqueue\Consumption\Context\PostMessageReceived; use Enqueue\Consumption\Context\PreConsume; use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Message; use Interop\Queue\SubscriptionConsumer; @@ -68,6 +69,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfConsumptionTimeExcee { $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, @@ -133,6 +135,7 @@ public function testOnPostReceivedShouldNotInterruptExecutionIfConsumptionTimeIs { $postReceivedMessage = new PostMessageReceived( $this->createInteropContextMock(), + $this->createMock(Consumer::class), $this->createMock(Message::class), 'aResult', 1, diff --git a/pkg/enqueue/Tests/Consumption/Extension/LogExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LogExtensionTest.php new file mode 100644 index 000000000..8038e5504 --- /dev/null +++ b/pkg/enqueue/Tests/Consumption/Extension/LogExtensionTest.php @@ -0,0 +1,270 @@ +assertClassImplements(StartExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementEndExtensionInterface() + { + $this->assertClassImplements(EndExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementMessageReceivedExtensionInterface() + { + $this->assertClassImplements(MessageReceivedExtensionInterface::class, LogExtension::class); + } + + public function testShouldImplementPostMessageReceivedExtensionInterface() + { + $this->assertClassImplements(PostMessageReceivedExtensionInterface::class, LogExtension::class); + } + + public function testCouldBeConstructedWithoutAnyArguments() + { + new LogExtension(); + } + + public function testShouldLogStartOnStart() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Consumption has started') + ; + + $context = new Start($this->createContextMock(), $logger, [], 1, 1); + + $extension = new LogExtension(); + $extension->onStart($context); + } + + public function testShouldLogEndOnEnd() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Consumption has ended') + ; + + $context = new End($this->createContextMock(), 1, 2, $logger); + + $extension = new LogExtension(); + $extension->onEnd($context); + } + + public function testShouldLogMessageReceived() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('debug') + ->with('Received from {queueName} {body}', [ + 'queueName' => 'aQueue', + 'redelivered' => false, + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + ]) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new MessageReceived($this->createContextMock(), $consumerMock, $message, $this->createProcessorMock(), 1, $logger); + + $extension = new LogExtension(); + $extension->onMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithStringResult() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'aResult', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, 'aResult', 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogRejectedMessageAsError() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::ERROR, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'reject', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Processor::REJECT, 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => '', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack(), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + public function testShouldLogMessageProcessedWithReasonResultObject() + { + $logger = $this->createLogger(); + $logger + ->expects($this->once()) + ->method('log') + ->with(LogLevel::INFO, + 'Processed from {queueName} {body} {result} {reason}', + [ + 'queueName' => 'aQueue', + 'body' => Stringify::that('aBody'), + 'properties' => Stringify::that(['aProp' => 'aPropVal']), + 'headers' => Stringify::that(['aHeader' => 'aHeaderVal']), + 'result' => 'ack', + 'reason' => 'aReason', + ] + ) + ; + + $consumerMock = $this->createConsumerStub(new NullQueue('aQueue')); + $message = new NullMessage('aBody'); + $message->setProperty('aProp', 'aPropVal'); + $message->setHeader('aHeader', 'aHeaderVal'); + + $context = new PostMessageReceived($this->createContextMock(), $consumerMock, $message, Result::ack('aReason'), 1, $logger); + + $extension = new LogExtension(); + $extension->onPostMessageReceived($context); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createConsumerStub(Queue $queue): Consumer + { + $consumerMock = $this->createMock(Consumer::class); + $consumerMock + ->expects($this->any()) + ->method('getQueue') + ->willReturn($queue) + ; + + return $consumerMock; + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createContextMock(): Context + { + return $this->createMock(Context::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createProcessorMock(): Processor + { + return $this->createMock(Processor::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|LoggerInterface + */ + private function createLogger() + { + return $this->createMock(LoggerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php index 37ef696d8..674720e23 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/LoggerExtensionTest.php @@ -2,17 +2,10 @@ namespace Enqueue\Tests\Consumption\Extension; -use Enqueue\Consumption\Context\PostMessageReceived; -use Enqueue\Consumption\Context\Start; +use Enqueue\Consumption\Context\InitLogger; use Enqueue\Consumption\Extension\LoggerExtension; -use Enqueue\Consumption\PostMessageReceivedExtensionInterface; -use Enqueue\Consumption\Result; -use Enqueue\Consumption\StartExtensionInterface; -use Enqueue\Null\NullMessage; +use Enqueue\Consumption\InitLoggerExtensionInterface; use Enqueue\Test\ClassExtensionTrait; -use Interop\Queue\Consumer; -use Interop\Queue\Context as InteropContext; -use Interop\Queue\Message; use PHPUnit\Framework\TestCase; use Psr\Log\LoggerInterface; use Psr\Log\NullLogger; @@ -21,14 +14,9 @@ class LoggerExtensionTest extends TestCase { use ClassExtensionTrait; - public function testShouldImplementStartExtensionInterface() + public function testShouldImplementInitLoggerExtensionInterface() { - $this->assertClassImplements(StartExtensionInterface::class, LoggerExtension::class); - } - - public function testShouldImplementPostMessageReceivedExtensionInterface() - { - $this->assertClassImplements(PostMessageReceivedExtensionInterface::class, LoggerExtension::class); + $this->assertClassImplements(InitLoggerExtensionInterface::class, LoggerExtension::class); } public function testCouldBeConstructedWithLoggerAsFirstArgument() @@ -36,180 +24,53 @@ public function testCouldBeConstructedWithLoggerAsFirstArgument() new LoggerExtension($this->createLogger()); } - public function testShouldSetLoggerToContextOnStart() + public function testShouldSetLoggerToContextOnInitLogger() { $logger = $this->createLogger(); $extension = new LoggerExtension($logger); - $context = new Start($this->createContextMock(), new NullLogger(), [], 0, 0); + $previousLogger = new NullLogger(); + $context = new InitLogger($previousLogger); - $extension->onStart($context); + $extension->onInitLogger($context); $this->assertSame($logger, $context->getLogger()); } public function testShouldAddInfoMessageOnStart() { - $logger = $this->createLogger(); - $logger - ->expects($this->once()) - ->method('debug') - ->with($this->stringStartsWith('Set context\'s logger')) - ; - - $extension = new LoggerExtension($logger); - - $context = new Start($this->createContextMock(), new NullLogger(), [], 0, 0); - - $extension->onStart($context); - } - - public function testShouldLogRejectMessageStatus() - { - $logger = $this->createLogger(); - $logger - ->expects($this->once()) - ->method('error') - ->with('reason', ['body' => 'message body', 'headers' => [], 'properties' => []]) - ; - - $extension = new LoggerExtension($logger); - - $message = new NullMessage(); - $message->setBody('message body'); - - $postReceivedMessage = new PostMessageReceived( - $this->createContextMock(), - $message, - Result::reject('reason'), - 1, - $logger - ); - - $extension->onPostMessageReceived($postReceivedMessage); - } - - public function testShouldLogRequeueMessageStatus() - { - $logger = $this->createLogger(); - $logger - ->expects($this->once()) - ->method('error') - ->with('reason', ['body' => 'message body', 'headers' => [], 'properties' => []]) - ; - - $extension = new LoggerExtension($logger); - - $message = new NullMessage(); - $message->setBody('message body'); - - $postReceivedMessage = new PostMessageReceived( - $this->createContextMock(), - $message, - Result::requeue('reason'), - 1, - $logger - ); - - $extension->onPostMessageReceived($postReceivedMessage); - } - - public function testShouldNotLogRequeueMessageStatusIfReasonIsEmpty() - { - $logger = $this->createLogger(); - $logger - ->expects($this->never()) - ->method('error') - ; - - $extension = new LoggerExtension($logger); + $previousLogger = $this->createLogger(); - $postReceivedMessage = new PostMessageReceived( - $this->createContextMock(), - $this->createMock(Message::class), - Result::requeue(), - 1, - $logger - ); - - $extension->onPostMessageReceived($postReceivedMessage); - } - - public function testShouldLogAckMessageStatus() - { $logger = $this->createLogger(); $logger ->expects($this->once()) - ->method('info') - ->with('reason', ['body' => 'message body', 'headers' => [], 'properties' => []]) + ->method('debug') + ->with(sprintf('Change logger from "%s" to "%s"', get_class($logger), get_class($previousLogger))) ; $extension = new LoggerExtension($logger); - $message = new NullMessage(); - $message->setBody('message body'); + $context = new InitLogger($previousLogger); - $postReceivedMessage = new PostMessageReceived( - $this->createContextMock(), - $message, - Result::ack('reason'), - 1, - $logger - ); - - $extension->onPostMessageReceived($postReceivedMessage); + $extension->onInitLogger($context); } - public function testShouldNotLogAckMessageStatusIfReasonIsEmpty() + public function testShouldDoNothingIfSameLoggerInstanceAlreadySet() { $logger = $this->createLogger(); $logger ->expects($this->never()) - ->method('info') - ; - - $extension = new LoggerExtension($logger); - - $postReceivedMessage = new PostMessageReceived( - $this->createContextMock(), - $this->createMock(Message::class), - Result::ack(), - 1, - $logger - ); - - $extension->onPostMessageReceived($postReceivedMessage); - } - - public function testShouldNotSetLoggerIfOneHasBeenSetOnStart() - { - $logger = $this->createLogger(); - - $alreadySetLogger = $this->createLogger(); - $alreadySetLogger - ->expects($this->once()) ->method('debug') - ->with(sprintf( - 'Skip setting context\'s logger "%s". Another one "%s" has already been set.', - get_class($logger), - get_class($alreadySetLogger) - )) ; $extension = new LoggerExtension($logger); - $context = new Start($this->createContextMock(), $alreadySetLogger, [], 0, 0); + $context = new InitLogger($logger); - $extension->onStart($context); - } + $extension->onInitLogger($context); - /** - * @return \PHPUnit_Framework_MockObject_MockObject|InteropContext - */ - protected function createContextMock(): InteropContext - { - return $this->createMock(InteropContext::class); + $this->assertSame($logger, $context->getLogger()); } /** @@ -219,12 +80,4 @@ protected function createLogger() { return $this->createMock(LoggerInterface::class); } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Consumer - */ - protected function createConsumerMock() - { - return $this->createMock(Consumer::class); - } } diff --git a/pkg/enqueue/Tests/Consumption/Extension/ReplyExtensionTest.php b/pkg/enqueue/Tests/Consumption/Extension/ReplyExtensionTest.php index e78a7451f..18e5727fd 100644 --- a/pkg/enqueue/Tests/Consumption/Extension/ReplyExtensionTest.php +++ b/pkg/enqueue/Tests/Consumption/Extension/ReplyExtensionTest.php @@ -9,6 +9,7 @@ use Enqueue\Null\NullMessage; use Enqueue\Null\NullQueue; use Enqueue\Test\ClassExtensionTrait; +use Interop\Queue\Consumer; use Interop\Queue\Context; use Interop\Queue\Producer as InteropProducer; use PHPUnit\Framework\TestCase; @@ -34,6 +35,7 @@ public function testShouldDoNothingIfReceivedMessageNotHaveReplyToSet() $postReceivedMessage = new PostMessageReceived( $this->createNeverUsedContextMock(), + $this->createMock(Consumer::class), new NullMessage(), 'aResult', 1, @@ -52,6 +54,7 @@ public function testShouldDoNothingIfContextResultIsNotInstanceOfResult() $postReceivedMessage = new PostMessageReceived( $this->createNeverUsedContextMock(), + $this->createMock(Consumer::class), $message, 'notInstanceOfResult', 1, @@ -70,6 +73,7 @@ public function testShouldDoNothingIfResultInstanceOfResultButReplyMessageNotSet $postReceivedMessage = new PostMessageReceived( $this->createNeverUsedContextMock(), + $this->createMock(Consumer::class), $message, Result::ack(), 1, @@ -114,6 +118,7 @@ public function testShouldSendReplyMessageToReplyQueueOnPostReceived() $postReceivedMessage = new PostMessageReceived( $contextMock, + $this->createMock(Consumer::class), $message, Result::reply($replyMessage), 1, diff --git a/pkg/enqueue/Tests/Consumption/Mock/BreakCycleExtension.php b/pkg/enqueue/Tests/Consumption/Mock/BreakCycleExtension.php index 831c7a901..cbc2f8b1e 100644 --- a/pkg/enqueue/Tests/Consumption/Mock/BreakCycleExtension.php +++ b/pkg/enqueue/Tests/Consumption/Mock/BreakCycleExtension.php @@ -3,6 +3,7 @@ namespace Enqueue\Tests\Consumption\Mock; use Enqueue\Consumption\Context\End; +use Enqueue\Consumption\Context\InitLogger; use Enqueue\Consumption\Context\MessageReceived; use Enqueue\Consumption\Context\MessageResult; use Enqueue\Consumption\Context\PostConsume; @@ -24,6 +25,10 @@ public function __construct($limit) $this->limit = $limit; } + public function onInitLogger(InitLogger $context): void + { + } + public function onPostMessageReceived(PostMessageReceived $context): void { if ($this->cycles >= $this->limit) { diff --git a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php index 50ced5ef6..9b0111fda 100644 --- a/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php +++ b/pkg/enqueue/Tests/Consumption/QueueConsumerTest.php @@ -6,6 +6,7 @@ use Enqueue\Consumption\CallbackProcessor; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Context\End; +use Enqueue\Consumption\Context\InitLogger; use Enqueue\Consumption\Context\MessageReceived; use Enqueue\Consumption\Context\MessageResult; use Enqueue\Consumption\Context\PostConsume; @@ -591,6 +592,34 @@ public function testShouldNotPassMessageToProcessorIfItWasProcessedByExtension() $queueConsumer->consume(); } + public function testShouldCallOnInitLoggerExtensionMethod() + { + $consumerStub = $this->createConsumerStub('foo_queue'); + + $contextStub = $this->createContextStub($consumerStub); + + $processorMock = $this->createProcessorMock(); + + $logger = $this->createMock(LoggerInterface::class); + + $extension = $this->createExtension(); + $extension + ->expects($this->once()) + ->method('onInitLogger') + ->with($this->isInstanceOf(InitLogger::class)) + ->willReturnCallback(function (InitLogger $context) use ($logger) { + $this->assertSame($logger, $context->getLogger()); + }) + ; + + $chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]); + $queueConsumer = new QueueConsumer($contextStub, $chainExtensions, [], $logger); + $queueConsumer->setFallbackSubscriptionConsumer(new DummySubscriptionConsumer()); + $queueConsumer->bind(new NullQueue('foo_queue'), $processorMock); + + $queueConsumer->consume(); + } + public function testShouldCallOnStartExtensionMethod() { $consumerStub = $this->createConsumerStub('foo_queue'); @@ -1212,6 +1241,11 @@ public function testShouldCallExtensionPassedOnRuntime() ; $runtimeExtension = $this->createExtension(); + $runtimeExtension + ->expects($this->once()) + ->method('onInitLogger') + ->with($this->isInstanceOf(InitLogger::class)) + ; $runtimeExtension ->expects($this->once()) ->method('onStart') @@ -1250,7 +1284,7 @@ public function testShouldCallExtensionPassedOnRuntime() $queueConsumer->consume(new ChainExtension([$runtimeExtension])); } - public function testShouldChangeLoggerOnStart() + public function testShouldChangeLoggerOnInitLogger() { $expectedMessage = $this->createMessageMock(); @@ -1271,12 +1305,20 @@ public function testShouldChangeLoggerOnStart() $expectedLogger = new NullLogger(); $extension = $this->createExtension(); + $extension + ->expects($this->atLeastOnce()) + ->method('onInitLogger') + ->with($this->isInstanceOf(InitLogger::class)) + ->willReturnCallback(function (InitLogger $context) use ($expectedLogger) { + $context->changeLogger($expectedLogger); + }) + ; $extension ->expects($this->atLeastOnce()) ->method('onStart') ->with($this->isInstanceOf(Start::class)) ->willReturnCallback(function (Start $context) use ($expectedLogger) { - $context->changeLogger($expectedLogger); + $this->assertSame($expectedLogger, $context->getLogger()); }) ; $extension diff --git a/pkg/enqueue/Util/Stringify.php b/pkg/enqueue/Util/Stringify.php new file mode 100644 index 000000000..39b1f1305 --- /dev/null +++ b/pkg/enqueue/Util/Stringify.php @@ -0,0 +1,36 @@ +value = $value; + } + + public function __toString(): string + { + if (is_string($this->value) || is_scalar($this->value)) { + return $this->value; + } + + return json_encode($this->value, JSON_UNESCAPED_SLASHES); + } + + public static function that($value): self + { + return new static($value); + } +} diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index ea7f3c7ab..de5922df7 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -6,6 +6,7 @@ use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; +use Enqueue\Client\ConsumptionExtension\LogExtension; use Enqueue\Client\ConsumptionExtension\SetRouterPropertiesExtension; use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverFactory; @@ -26,6 +27,8 @@ use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Interop\Queue\Processor; +use Psr\Log\LoggerInterface; +use Psr\Log\NullLogger; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\NodeInterface; use Symfony\Component\Config\Definition\Processor as ConfigProcessor; @@ -57,6 +60,11 @@ final class SimpleClient */ private $delegateProcessor; + /** + * @var LoggerInterface + */ + private $logger; + /** * The config could be a transport DSN (string) or an array, here's an example of a few DSNs:. * @@ -104,9 +112,11 @@ final class SimpleClient * * @param string|array $config */ - public function __construct($config) + public function __construct($config, LoggerInterface $logger = null) { $this->build(['enqueue' => $config]); + + $this->logger = $logger ?: new NullLogger(); } /** @@ -262,9 +272,10 @@ public function build(array $configs): void } $consumptionExtensions[] = new SetRouterPropertiesExtension($driver); + $consumptionExtensions[] = new LogExtension(); $consumptionChainExtension = new ConsumptionChainExtension($consumptionExtensions); - $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension); + $queueConsumer = new QueueConsumer($driver->getContext(), $consumptionChainExtension, [], $this->logger); $routerProcessor = new RouterProcessor($driver);