Skip to content

Commit 2ec051d

Browse files
committed
[consumption] post message received
1 parent c2e5d02 commit 2ec051d

23 files changed

+363
-168
lines changed

docs/bundle/consumption_extension.md

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,18 +10,15 @@ namespace AppBundle\Enqueue;
1010

1111
use Enqueue\Consumption\ExtensionInterface;
1212
use Enqueue\Consumption\EmptyExtensionTrait;
13-
use Enqueue\Consumption\Context;
13+
use Enqueue\Consumption\Context\PostMessageReceived;
1414

1515
class CountProcessedMessagesExtension implements ExtensionInterface
1616
{
1717
use EmptyExtensionTrait;
1818

1919
private $processedMessages = 0;
2020

21-
/**
22-
* {@inheritdoc}
23-
*/
24-
public function onPostReceived(Context $context)
21+
public function onPostMessageReceived(PostMessageReceived $context): void
2522
{
2623
$this->processedMessages += 1;
2724
}

pkg/enqueue/Client/ConsumptionExtension/FlushSpoolProducerExtension.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Client\SpoolProducer;
66
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
78
use Enqueue\Consumption\EmptyExtensionTrait;
89
use Enqueue\Consumption\ExtensionInterface;
910

@@ -24,10 +25,7 @@ public function __construct(SpoolProducer $producer)
2425
$this->producer = $producer;
2526
}
2627

27-
/**
28-
* {@inheritdoc}
29-
*/
30-
public function onPostReceived(Context $context)
28+
public function onPostMessageReceived(PostMessageReceived $context): void
3129
{
3230
$this->producer->flush();
3331
}

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\Context\MessageResult;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
78
use Enqueue\Consumption\Context\PreConsume;
89
use Enqueue\Consumption\Context\PreSubscribe;
910
use Enqueue\Consumption\Context\ProcessorException;
@@ -71,10 +72,10 @@ public function onProcessorException(ProcessorException $context): void
7172
}
7273
}
7374

74-
public function onPostReceived(Context $context)
75+
public function onPostMessageReceived(PostMessageReceived $context): void
7576
{
7677
foreach ($this->extensions as $extension) {
77-
$extension->onPostReceived($context);
78+
$extension->onPostMessageReceived($context);
7879
}
7980
}
8081

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Enqueue\Consumption\Result;
6+
use Interop\Queue\Context;
7+
use Interop\Queue\Message;
8+
use Psr\Log\LoggerInterface;
9+
10+
final class PostMessageReceived
11+
{
12+
/**
13+
* @var Context
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var Message
19+
*/
20+
private $message;
21+
22+
/**
23+
* @var LoggerInterface
24+
*/
25+
private $logger;
26+
27+
/**
28+
* @var int
29+
*/
30+
private $receivedAt;
31+
32+
/**
33+
* @var Result|string|object|null
34+
*/
35+
private $result;
36+
37+
/**
38+
* @var bool
39+
*/
40+
private $executionInterrupted;
41+
42+
public function __construct(
43+
Context $context,
44+
Message $message,
45+
$result,
46+
int $receivedAt,
47+
LoggerInterface $logger
48+
) {
49+
$this->context = $context;
50+
$this->message = $message;
51+
$this->result = $result;
52+
$this->receivedAt = $receivedAt;
53+
$this->logger = $logger;
54+
55+
$this->executionInterrupted = false;
56+
}
57+
58+
public function getContext(): Context
59+
{
60+
return $this->context;
61+
}
62+
63+
public function getMessage(): Message
64+
{
65+
return $this->message;
66+
}
67+
68+
public function getLogger(): LoggerInterface
69+
{
70+
return $this->logger;
71+
}
72+
73+
public function getReceivedAt(): int
74+
{
75+
return $this->receivedAt;
76+
}
77+
78+
/**
79+
* @return Result|null|object|string
80+
*/
81+
public function getResult()
82+
{
83+
return $this->result;
84+
}
85+
86+
public function isExecutionInterrupted(): bool
87+
{
88+
return $this->executionInterrupted;
89+
}
90+
91+
public function interruptExecution(): void
92+
{
93+
$this->executionInterrupted = true;
94+
}
95+
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\Context\MessageResult;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
78
use Enqueue\Consumption\Context\PreConsume;
89
use Enqueue\Consumption\Context\PreSubscribe;
910
use Enqueue\Consumption\Context\ProcessorException;
@@ -27,15 +28,15 @@ public function onMessageReceived(MessageReceived $context): void
2728
{
2829
}
2930

30-
public function onResult(MessageResult $context): void
31+
public function onPostMessageReceived(PostMessageReceived $context): void
3132
{
3233
}
3334

34-
public function onProcessorException(ProcessorException $context): void
35+
public function onResult(MessageResult $context): void
3536
{
3637
}
3738

38-
public function onPostReceived(Context $context)
39+
public function onProcessorException(ProcessorException $context): void
3940
{
4041
}
4142

pkg/enqueue/Consumption/Extension/LimitConsumedMessagesExtension.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\PostMessageReceived;
66
use Enqueue\Consumption\Context\PreConsume;
77
use Enqueue\Consumption\EmptyExtensionTrait;
88
use Enqueue\Consumption\ExtensionInterface;
@@ -39,12 +39,12 @@ public function onPreConsume(PreConsume $context): void
3939
}
4040
}
4141

42-
public function onPostReceived(Context $context)
42+
public function onPostMessageReceived(PostMessageReceived $context): void
4343
{
4444
++$this->messageConsumed;
4545

4646
if ($this->shouldBeStopped($context->getLogger())) {
47-
$context->setExecutionInterrupted(true);
47+
$context->interruptExecution();
4848
}
4949
}
5050

pkg/enqueue/Consumption/Extension/LimitConsumerMemoryExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\PostMessageReceived;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\EmptyExtensionTrait;
89
use Enqueue\Consumption\ExtensionInterface;
@@ -39,10 +40,10 @@ public function onPreConsume(PreConsume $context): void
3940
}
4041
}
4142

42-
public function onPostReceived(Context $context)
43+
public function onPostMessageReceived(PostMessageReceived $context): void
4344
{
4445
if ($this->shouldBeStopped($context->getLogger())) {
45-
$context->setExecutionInterrupted(true);
46+
$context->interruptExecution();
4647
}
4748
}
4849

pkg/enqueue/Consumption/Extension/LimitConsumptionTimeExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption\Extension;
44

55
use Enqueue\Consumption\Context;
6+
use Enqueue\Consumption\Context\PostMessageReceived;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\EmptyExtensionTrait;
89
use Enqueue\Consumption\ExtensionInterface;
@@ -39,10 +40,10 @@ public function onIdle(Context $context)
3940
}
4041
}
4142

42-
public function onPostReceived(Context $context)
43+
public function onPostMessageReceived(PostMessageReceived $context): void
4344
{
4445
if ($this->shouldBeStopped($context->getLogger())) {
45-
$context->setExecutionInterrupted(true);
46+
$context->interruptExecution();
4647
}
4748
}
4849

pkg/enqueue/Consumption/Extension/LoggerExtension.php

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\PostMessageReceived;
66
use Enqueue\Consumption\Context\Start;
77
use Enqueue\Consumption\EmptyExtensionTrait;
88
use Enqueue\Consumption\ExtensionInterface;
@@ -42,10 +42,7 @@ public function onStart(Start $context): void
4242
}
4343
}
4444

45-
/**
46-
* {@inheritdoc}
47-
*/
48-
public function onPostReceived(Context $context)
45+
public function onPostMessageReceived(PostMessageReceived $context): void
4946
{
5047
if (false == $context->getResult() instanceof Result) {
5148
return;
@@ -58,13 +55,13 @@ public function onPostReceived(Context $context)
5855
case Result::REJECT:
5956
case Result::REQUEUE:
6057
if ($result->getReason()) {
61-
$this->logger->error($result->getReason(), $this->messageToLogContext($context->getInteropMessage()));
58+
$this->logger->error($result->getReason(), $this->messageToLogContext($context->getMessage()));
6259
}
6360

6461
break;
6562
case Result::ACK:
6663
if ($result->getReason()) {
67-
$this->logger->info($result->getReason(), $this->messageToLogContext($context->getInteropMessage()));
64+
$this->logger->info($result->getReason(), $this->messageToLogContext($context->getMessage()));
6865
}
6966

7067
break;

pkg/enqueue/Consumption/Extension/ReplyExtension.php

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\PostMessageReceived;
66
use Enqueue\Consumption\EmptyExtensionTrait;
77
use Enqueue\Consumption\ExtensionInterface;
88
use Enqueue\Consumption\Result;
@@ -11,12 +11,9 @@ class ReplyExtension implements ExtensionInterface
1111
{
1212
use EmptyExtensionTrait;
1313

14-
/**
15-
* {@inheritdoc}
16-
*/
17-
public function onPostReceived(Context $context)
14+
public function onPostMessageReceived(PostMessageReceived $context): void
1815
{
19-
$replyTo = $context->getInteropMessage()->getReplyTo();
16+
$replyTo = $context->getMessage()->getReplyTo();
2017
if (false == $replyTo) {
2118
return;
2219
}
@@ -31,13 +28,13 @@ public function onPostReceived(Context $context)
3128
return;
3229
}
3330

34-
$correlationId = $context->getInteropMessage()->getCorrelationId();
31+
$correlationId = $context->getMessage()->getCorrelationId();
3532
$replyMessage = clone $result->getReply();
3633
$replyMessage->setCorrelationId($correlationId);
3734

38-
$replyQueue = $context->getInteropContext()->createQueue($replyTo);
35+
$replyQueue = $context->getContext()->createQueue($replyTo);
3936

4037
$context->getLogger()->debug(sprintf('[ReplyExtension] Send reply to "%s"', $replyTo));
41-
$context->getInteropContext()->createProducer()->send($replyQueue, $replyMessage);
38+
$context->getContext()->createProducer()->send($replyQueue, $replyMessage);
4239
}
4340
}

pkg/enqueue/Consumption/Extension/SignalExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context;
66
use Enqueue\Consumption\Context\MessageReceived;
7+
use Enqueue\Consumption\Context\PostMessageReceived;
78
use Enqueue\Consumption\Context\PreConsume;
89
use Enqueue\Consumption\Context\Start;
910
use Enqueue\Consumption\EmptyExtensionTrait;
@@ -54,10 +55,10 @@ public function onMessageReceived(MessageReceived $context): void
5455
{
5556
}
5657

57-
public function onPostReceived(Context $context)
58+
public function onPostMessageReceived(PostMessageReceived $context): void
5859
{
5960
if ($this->shouldBeStopped($context->getLogger())) {
60-
$context->setExecutionInterrupted(true);
61+
$context->interruptExecution();
6162
}
6263
}
6364

pkg/enqueue/Consumption/ExtensionInterface.php

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,8 @@
22

33
namespace Enqueue\Consumption;
44

5-
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface
5+
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface
66
{
7-
/**
8-
* Executed when a message is processed by a processor.
9-
* The context contains a status, which could not be changed.
10-
* The consumption could be interrupted at this step but it exits after the message is processed.
11-
*
12-
* @param Context $context
13-
*/
14-
public function onPostReceived(Context $context);
15-
167
/**
178
* Called each time at the end of the cycle if nothing was done.
189
*
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\PostMessageReceived;
6+
7+
interface PostMessageReceivedExtensionInterface
8+
{
9+
/**
10+
* Executed at the very end of consumption callback. The message has already been acknowledged.
11+
* The message result could not be changed.
12+
* The consumption could be interrupted at this point.
13+
*/
14+
public function onPostMessageReceived(PostMessageReceived $context): void;
15+
}

0 commit comments

Comments
 (0)