Skip to content

Commit dd4e46e

Browse files
committed
[consumption] Add LogExtension.
1 parent cf3ba56 commit dd4e46e

File tree

12 files changed

+411
-2
lines changed

12 files changed

+411
-2
lines changed

pkg/enqueue/Consumption/Context/PostMessageReceived.php

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption\Context;
44

55
use Enqueue\Consumption\Result;
6+
use Interop\Queue\Consumer;
67
use Interop\Queue\Context;
78
use Interop\Queue\Message;
89
use Psr\Log\LoggerInterface;
@@ -14,6 +15,11 @@ final class PostMessageReceived
1415
*/
1516
private $context;
1617

18+
/**
19+
* @var Consumer
20+
*/
21+
private $consumer;
22+
1723
/**
1824
* @var Message
1925
*/
@@ -41,12 +47,14 @@ final class PostMessageReceived
4147

4248
public function __construct(
4349
Context $context,
50+
Consumer $consumer,
4451
Message $message,
4552
$result,
4653
int $receivedAt,
4754
LoggerInterface $logger
4855
) {
4956
$this->context = $context;
57+
$this->consumer = $consumer;
5058
$this->message = $message;
5159
$this->result = $result;
5260
$this->receivedAt = $receivedAt;
@@ -60,6 +68,11 @@ public function getContext(): Context
6068
return $this->context;
6169
}
6270

71+
public function getConsumer(): Consumer
72+
{
73+
return $this->consumer;
74+
}
75+
6376
public function getMessage(): Message
6477
{
6578
return $this->message;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Extension;
4+
5+
use Enqueue\Consumption\Context\End;
6+
use Enqueue\Consumption\Context\MessageReceived;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
8+
use Enqueue\Consumption\Context\Start;
9+
use Enqueue\Consumption\EndExtensionInterface;
10+
use Enqueue\Consumption\MessageReceivedExtensionInterface;
11+
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
12+
use Enqueue\Consumption\Result;
13+
use Enqueue\Consumption\StartExtensionInterface;
14+
use Enqueue\Util\Stringify;
15+
use Psr\Log\LogLevel;
16+
17+
class LogExtension implements StartExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, EndExtensionInterface
18+
{
19+
public function onStart(Start $context): void
20+
{
21+
$context->getLogger()->debug('Consumption has started');
22+
}
23+
24+
public function onEnd(End $context): void
25+
{
26+
$context->getLogger()->debug('Consumption has ended');
27+
}
28+
29+
public function onMessageReceived(MessageReceived $context): void
30+
{
31+
$message = $context->getMessage();
32+
33+
$context->getLogger()->debug("Received from {queueName}\t{body}", [
34+
'queueName' => $context->getConsumer()->getQueue()->getQueueName(),
35+
'redelivered' => $message->isRedelivered(),
36+
'body' => Stringify::that($message->getBody()),
37+
'properties' => Stringify::that($message->getProperties()),
38+
'headers' => Stringify::that($message->getHeaders()),
39+
]);
40+
}
41+
42+
public function onPostMessageReceived(PostMessageReceived $context): void
43+
{
44+
$message = $context->getMessage();
45+
$queue = $context->getConsumer()->getQueue();
46+
$result = $context->getResult();
47+
48+
$reason = '';
49+
$logMessage = "Processed from {queueName}\t{body}\t{result}";
50+
if ($result instanceof Result && $result->getReason()) {
51+
$reason = $result->getReason();
52+
$logMessage .= ' {reason}';
53+
}
54+
$logContext = [
55+
'result' => str_replace('enqueue.', '', $result),
56+
'reason' => $reason,
57+
'queueName' => $queue->getQueueName(),
58+
'body' => Stringify::that($message->getBody()),
59+
'properties' => Stringify::that($message->getProperties()),
60+
'headers' => Stringify::that($message->getHeaders()),
61+
];
62+
63+
$logLevel = Result::REJECT == ((string) $result) ? LogLevel::ERROR : LogLevel::INFO;
64+
65+
$context->getLogger()->log($logLevel, $logMessage, $logContext);
66+
}
67+
}

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
220220
throw new \LogicException(sprintf('Status is not supported: %s', $result));
221221
}
222222

223-
$postMessageReceived = new PostMessageReceived($this->interopContext, $message, $result, $receivedAt, $this->logger);
223+
$postMessageReceived = new PostMessageReceived($this->interopContext, $consumer, $message, $result, $receivedAt, $this->logger);
224224
$extension->onPostMessageReceived($postMessageReceived);
225225

226226
if ($postMessageReceived->isExecutionInterrupted()) {

pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\ConnectionFactoryFactory;
66
use Enqueue\ConnectionFactoryFactoryInterface;
77
use Enqueue\Consumption\ChainExtension;
8+
use Enqueue\Consumption\Extension\LogExtension;
89
use Enqueue\Consumption\QueueConsumer;
910
use Enqueue\Consumption\QueueConsumerInterface;
1011
use Enqueue\Resources;
@@ -166,8 +167,13 @@ public function buildQueueConsumer(ContainerBuilder $container, array $config):
166167

167168
$container->setParameter($this->format('receive_timeout'), $config['receive_timeout'] ?? 10000);
168169

170+
$logExtensionId = $this->format('log_extension');
171+
$container->register($logExtensionId, LogExtension::class);
172+
169173
$container->register($this->format('consumption_extensions'), ChainExtension::class)
170-
->addArgument([])
174+
->addArgument([
175+
new Reference($logExtensionId),
176+
])
171177
;
172178

173179
$container->register($this->format('queue_consumer'), QueueConsumer::class)

pkg/enqueue/Tests/Client/ConsumptionExtension/FlushSpoolProducerExtensionTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
use Enqueue\Consumption\EndExtensionInterface;
1010
use Enqueue\Consumption\PostMessageReceivedExtensionInterface;
1111
use Enqueue\Test\ClassExtensionTrait;
12+
use Interop\Queue\Consumer;
1213
use Interop\Queue\Context;
1314
use Interop\Queue\Message;
1415
use PHPUnit\Framework\TestCase;
@@ -57,6 +58,7 @@ public function testShouldFlushSpoolProducerOnPostReceived()
5758

5859
$context = new PostMessageReceived(
5960
$this->createInteropContextMock(),
61+
$this->createMock(Consumer::class),
6062
$this->createMock(Message::class),
6163
'aResult',
6264
1,

pkg/enqueue/Tests/Consumption/ChainExtensionTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public function testShouldProxyOnPostReceiveToAllInternalExtensions()
197197
{
198198
$context = new PostMessageReceived(
199199
$this->createInteropContextMock(),
200+
$this->createMock(Consumer::class),
200201
$this->createMock(Message::class),
201202
'aResult',
202203
1,

pkg/enqueue/Tests/Consumption/Extension/LimitConsumedMessagesExtensionTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use Enqueue\Consumption\Context\PostMessageReceived;
66
use Enqueue\Consumption\Context\PreConsume;
77
use Enqueue\Consumption\Extension\LimitConsumedMessagesExtension;
8+
use Interop\Queue\Consumer;
89
use Interop\Queue\Context;
910
use Interop\Queue\Message;
1011
use Interop\Queue\SubscriptionConsumer;
@@ -49,6 +50,7 @@ public function testOnPreConsumeShouldInterruptWhenLimitIsReached()
4950

5051
$postReceivedMessage = new PostMessageReceived(
5152
$this->createInteropContextMock(),
53+
$this->createMock(Consumer::class),
5254
$this->createMock(Message::class),
5355
'aResult',
5456
1,
@@ -135,6 +137,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfMessageLimitExceeded
135137

136138
$postReceivedMessage = new PostMessageReceived(
137139
$this->createInteropContextMock(),
140+
$this->createMock(Consumer::class),
138141
$this->createMock(Message::class),
139142
'aResult',
140143
1,

pkg/enqueue/Tests/Consumption/Extension/LimitConsumerMemoryExtensionTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context\PostMessageReceived;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\Extension\LimitConsumerMemoryExtension;
9+
use Interop\Queue\Consumer;
910
use Interop\Queue\Context;
1011
use Interop\Queue\Message;
1112
use Interop\Queue\SubscriptionConsumer;
@@ -66,6 +67,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfMemoryLimitReached()
6667

6768
$postReceivedMessage = new PostMessageReceived(
6869
$this->createInteropContextMock(),
70+
$this->createMock(Consumer::class),
6971
$this->createMock(Message::class),
7072
'aResult',
7173
1,
@@ -156,6 +158,7 @@ public function testOnPostMessageReceivedShouldNotInterruptExecutionIfMemoryLimi
156158
{
157159
$postReceivedMessage = new PostMessageReceived(
158160
$this->createInteropContextMock(),
161+
$this->createMock(Consumer::class),
159162
$this->createMock(Message::class),
160163
'aResult',
161164
1,

pkg/enqueue/Tests/Consumption/Extension/LimitConsumptionTimeExtensionTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context\PostMessageReceived;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\Extension\LimitConsumptionTimeExtension;
9+
use Interop\Queue\Consumer;
910
use Interop\Queue\Context;
1011
use Interop\Queue\Message;
1112
use Interop\Queue\SubscriptionConsumer;
@@ -68,6 +69,7 @@ public function testOnPostReceivedShouldInterruptExecutionIfConsumptionTimeExcee
6869
{
6970
$postReceivedMessage = new PostMessageReceived(
7071
$this->createInteropContextMock(),
72+
$this->createMock(Consumer::class),
7173
$this->createMock(Message::class),
7274
'aResult',
7375
1,
@@ -133,6 +135,7 @@ public function testOnPostReceivedShouldNotInterruptExecutionIfConsumptionTimeIs
133135
{
134136
$postReceivedMessage = new PostMessageReceived(
135137
$this->createInteropContextMock(),
138+
$this->createMock(Consumer::class),
136139
$this->createMock(Message::class),
137140
'aResult',
138141
1,

0 commit comments

Comments
 (0)