Skip to content

Commit e74b9f1

Browse files
committed
[consumption] use custom MessageResutl context on onResult.
1 parent 83bd77d commit e74b9f1

15 files changed

+156
-104
lines changed

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrineClearIdentityMapExtensionTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ protected function createContext(): MessageReceived
5353
$this->createMock(Consumer::class),
5454
$this->createMock(Message::class),
5555
$this->createMock(Processor::class),
56+
1,
5657
$this->createMock(LoggerInterface::class)
5758
);
5859
}

pkg/enqueue-bundle/Tests/Unit/Consumption/Extension/DoctrinePingConnectionExtensionTest.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ protected function createContext(): MessageReceived
154154
$this->createMock(Consumer::class),
155155
$this->createMock(Message::class),
156156
$this->createMock(Processor::class),
157+
1,
157158
$this->createMock(LoggerInterface::class)
158159
);
159160
}

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption;
44

55
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\MessageResult;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\Context\PreSubscribe;
89
use Enqueue\Consumption\Context\Start;
@@ -55,7 +56,7 @@ public function onMessageReceived(MessageReceived $context): void
5556
}
5657
}
5758

58-
public function onResult(Context $context)
59+
public function onResult(MessageResult $context): void
5960
{
6061
foreach ($this->extensions as $extension) {
6162
$extension->onResult($context);

pkg/enqueue/Consumption/Context.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -164,9 +164,9 @@ public function getResult()
164164
*/
165165
public function setResult($result)
166166
{
167-
if ($this->result) {
168-
throw new IllegalContextModificationException('The result modification is not allowed');
169-
}
167+
// if ($this->result) {
168+
// throw new IllegalContextModificationException('The result modification is not allowed');
169+
// }
170170

171171
$this->result = $result;
172172
}

pkg/enqueue/Consumption/Context/MessageReceived.php

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,17 +36,29 @@ final class MessageReceived
3636
*/
3737
private $logger;
3838

39+
/**
40+
* @var int
41+
*/
42+
private $receivedAt;
43+
3944
/**
4045
* @var Result|null
4146
*/
4247
private $result;
4348

44-
public function __construct(Context $context, Consumer $consumer, Message $message, Processor $processor, LoggerInterface $logger)
45-
{
49+
public function __construct(
50+
Context $context,
51+
Consumer $consumer,
52+
Message $message,
53+
Processor $processor,
54+
int $receivedAt,
55+
LoggerInterface $logger
56+
) {
4657
$this->context = $context;
4758
$this->consumer = $consumer;
4859
$this->message = $message;
4960
$this->processor = $processor;
61+
$this->receivedAt = $receivedAt;
5062
$this->logger = $logger;
5163
}
5264

@@ -80,6 +92,11 @@ public function getLogger(): LoggerInterface
8092
return $this->logger;
8193
}
8294

95+
public function getReceivedAt(): int
96+
{
97+
return $this->receivedAt;
98+
}
99+
83100
public function getResult(): ?Result
84101
{
85102
return $this->result;
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Enqueue\Consumption\Result;
6+
use Interop\Queue\Context;
7+
use Interop\Queue\Message;
8+
use Psr\Log\LoggerInterface;
9+
10+
final class MessageResult
11+
{
12+
/**
13+
* @var Context
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var Message
19+
*/
20+
private $message;
21+
22+
/**
23+
* @var Result|string|object|null
24+
*/
25+
private $result;
26+
27+
/**
28+
* @var int
29+
*/
30+
private $receivedAt;
31+
32+
/**
33+
* @var LoggerInterface
34+
*/
35+
private $logger;
36+
37+
public function __construct(Context $context, Message $message, $result, int $receivedAt, LoggerInterface $logger)
38+
{
39+
$this->context = $context;
40+
$this->message = $message;
41+
$this->logger = $logger;
42+
$this->result = $result;
43+
$this->receivedAt = $receivedAt;
44+
}
45+
46+
public function getContext(): Context
47+
{
48+
return $this->context;
49+
}
50+
51+
public function getMessage(): Message
52+
{
53+
return $this->message;
54+
}
55+
56+
public function getLogger(): LoggerInterface
57+
{
58+
return $this->logger;
59+
}
60+
61+
public function getReceivedAt(): int
62+
{
63+
return $this->receivedAt;
64+
}
65+
66+
/**
67+
* @return Result|null|object|string
68+
*/
69+
public function getResult()
70+
{
71+
return $this->result;
72+
}
73+
74+
/**
75+
* @param Result|string|object|null $result
76+
*/
77+
public function changeResult($result): void
78+
{
79+
$this->result = $result;
80+
}
81+
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption;
44

55
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\MessageResult;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\Context\PreSubscribe;
89
use Enqueue\Consumption\Context\Start;
@@ -25,7 +26,7 @@ public function onMessageReceived(MessageReceived $context): void
2526
{
2627
}
2728

28-
public function onResult(Context $context)
29+
public function onResult(MessageResult $context): void
2930
{
3031
}
3132

pkg/enqueue/Consumption/ExtensionInterface.php

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption;
44

55
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\MessageResult;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\Context\PreSubscribe;
89
use Enqueue\Consumption\Context\Start;
@@ -33,13 +34,11 @@ public function onPreConsume(PreConsume $context): void;
3334
public function onMessageReceived(MessageReceived $context): void;
3435

3536
/**
36-
* Executed when a message is processed by a processor or a result was set in onPreReceived method.
37-
* BUT before the message status was sent to the broker
38-
* The consumption could be interrupted at this step but it exits after the message is processed.
39-
*
40-
* @param Context $context
37+
* Executed when a message is processed by a processor or a result was set in onMessageReceived extension method.
38+
* BEFORE the message status was sent to the broker
39+
* The result could be changed at this point.
4140
*/
42-
public function onResult(Context $context);
41+
public function onResult(MessageResult $context): void;
4342

4443
/**
4544
* Executed when a message is processed by a processor.

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\Consumption;
44

55
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\MessageResult;
67
use Enqueue\Consumption\Context\PreConsume;
78
use Enqueue\Consumption\Context\PreSubscribe;
89
use Enqueue\Consumption\Context\Start;
@@ -143,13 +144,14 @@ public function bindCallback($queue, callable $processor): QueueConsumerInterfac
143144
public function consume(ExtensionInterface $runtimeExtension = null): void
144145
{
145146
/*
146-
* onStart
147-
* onPreSubscribe
148-
* onPreConsume
149-
* onPostConsume
150-
* onReceived
147+
* onStart +
148+
* onPreSubscribe +
149+
* onPreConsume +
150+
* onMessageReceived +
151151
* onResult
152+
* onProcessorException
152153
* onPostReceived
154+
* onPostConsume
153155
* onEnd
154156
*/
155157

@@ -213,6 +215,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
213215
}
214216

215217
$callback = function (InteropMessage $message, Consumer $consumer) use (&$context) {
218+
$receivedAt = (int) (microtime(true) * 1000);
216219
$queue = $consumer->getQueue();
217220
if (false == array_key_exists($queue->getQueueName(), $this->boundProcessors)) {
218221
throw new \LogicException(sprintf('The processor for the queue "%s" could not be found.', $queue->getQueueName()));
@@ -233,7 +236,7 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
233236
$context->setProcessor($processor);
234237
$context->setInteropMessage($message);
235238

236-
$messageReceived = new MessageReceived($this->interopContext, $consumer, $message, $processor, $this->logger);
239+
$messageReceived = new MessageReceived($this->interopContext, $consumer, $message, $processor, $receivedAt, $this->logger);
237240
$this->extension->onMessageReceived($messageReceived);
238241
$result = $messageReceived->getResult();
239242
$processor = $messageReceived->getProcessor();
@@ -242,7 +245,12 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
242245
$context->setResult($result);
243246
}
244247

245-
$this->extension->onResult($context);
248+
$messageResult = new MessageResult($this->interopContext, $message, $result, $receivedAt, $this->logger);
249+
$this->extension->onResult($messageResult);
250+
$result = $messageResult->getResult();
251+
252+
//todo
253+
$context->setResult($result);
246254

247255
switch ($result) {
248256
case Result::ACK:

pkg/enqueue/Tests/Client/ConsumptionExtension/DelayRedeliveredMessageExtensionTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ public function testShouldSendDelayedMessageAndRejectOriginalMessage()
6969
$this->createConsumerStub($queue),
7070
$originMessage,
7171
$this->createProcessorMock(),
72+
1,
7273
$logger
7374
);
7475

@@ -103,6 +104,7 @@ public function testShouldDoNothingIfMessageIsNotRedelivered()
103104
$this->createConsumerStub(null),
104105
$message,
105106
$this->createProcessorMock(),
107+
1,
106108
new NullLogger()
107109
);
108110

@@ -128,6 +130,7 @@ public function testShouldDoNothingIfMessageIsRedeliveredButResultWasAlreadySetO
128130
$this->createConsumerStub(null),
129131
$message,
130132
$this->createProcessorMock(),
133+
1,
131134
new NullLogger()
132135
);
133136
$messageReceived->setResult(Result::ack());

pkg/enqueue/Tests/Client/ConsumptionExtension/ExclusiveCommandExtensionTest.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ public function testShouldDoNothingIfMessageHasTopicPropertySetOnPreReceive()
5353
$this->createConsumerStub(null),
5454
$message,
5555
$this->createProcessorMock(),
56+
1,
5657
new NullLogger()
5758
);
5859

@@ -77,6 +78,7 @@ public function testShouldDoNothingIfMessageHasCommandPropertySetOnPreReceive()
7778
$this->createConsumerStub(null),
7879
$message,
7980
$this->createProcessorMock(),
81+
1,
8082
new NullLogger()
8183
);
8284

@@ -107,6 +109,7 @@ public function testShouldDoNothingIfMessageHasProcessorPropertySetOnPreReceive(
107109
$this->createConsumerStub(null),
108110
$message,
109111
$this->createProcessorMock(),
112+
1,
110113
new NullLogger()
111114
);
112115

@@ -137,6 +140,7 @@ public function testShouldDoNothingIfCurrentQueueHasNoExclusiveProcessor()
137140
$this->createConsumerStub($queue),
138141
$message,
139142
$this->createProcessorMock(),
143+
1,
140144
new NullLogger()
141145
);
142146

@@ -159,6 +163,7 @@ public function testShouldSetCommandPropertiesIfCurrentQueueHasExclusiveCommandP
159163
$this->createConsumerStub($queue),
160164
$message,
161165
$this->createProcessorMock(),
166+
1,
162167
new NullLogger()
163168
);
164169

@@ -203,6 +208,7 @@ public function testShouldDoNothingIfAnotherQueue()
203208
$this->createConsumerStub($queue),
204209
$message,
205210
$this->createProcessorMock(),
211+
1,
206212
new NullLogger()
207213
);
208214

pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ public function testShouldSetRouterProcessorPropertyIfNotSetAndOnRouterQueue()
5555
$this->createConsumerStub(new NullQueue('test.router-queue')),
5656
$message,
5757
$this->createProcessorMock(),
58+
1,
5859
new NullLogger()
5960
);
6061

@@ -91,6 +92,7 @@ public function testShouldNotSetRouterProcessorPropertyIfNotSetAndNotOnRouterQue
9192
$this->createConsumerStub(new NullQueue('test.another-queue')),
9293
$message,
9394
$this->createProcessorMock(),
95+
1,
9496
new NullLogger()
9597
);
9698

@@ -116,6 +118,7 @@ public function testShouldNotSetAnyPropertyIfProcessorNamePropertyAlreadySet()
116118
$this->createConsumerStub(null),
117119
$message,
118120
$this->createProcessorMock(),
121+
1,
119122
new NullLogger()
120123
);
121124

0 commit comments

Comments
 (0)