Skip to content

Commit 5faac4f

Browse files
committed
[consumption] Introduce ProcessorException extension point. Split Extension Interface.
1 parent e74b9f1 commit 5faac4f

12 files changed

+290
-39
lines changed

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context\MessageResult;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\Context\PreSubscribe;
9+
use Enqueue\Consumption\Context\ProcessorException;
910
use Enqueue\Consumption\Context\Start;
1011

1112
class ChainExtension implements ExtensionInterface
@@ -63,6 +64,13 @@ public function onResult(MessageResult $context): void
6364
}
6465
}
6566

67+
public function onProcessorException(ProcessorException $context): void
68+
{
69+
foreach ($this->extensions as $extension) {
70+
$extension->onProcessorException($context);
71+
}
72+
}
73+
6674
public function onPostReceived(Context $context)
6775
{
6876
foreach ($this->extensions as $extension) {
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Enqueue\Consumption\Result;
6+
use Interop\Queue\Context;
7+
use Interop\Queue\Message;
8+
use Psr\Log\LoggerInterface;
9+
10+
final class ProcessorException
11+
{
12+
/**
13+
* @var Context
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var Message
19+
*/
20+
private $message;
21+
22+
/**
23+
* @var \Exception
24+
*/
25+
private $exception;
26+
27+
/**
28+
* @var Result|string|object|null
29+
*/
30+
private $result;
31+
32+
/**
33+
* @var int
34+
*/
35+
private $receivedAt;
36+
/**
37+
* @var LoggerInterface
38+
*/
39+
private $logger;
40+
41+
public function __construct(Context $context, Message $message, \Exception $exception, int $receivedAt, LoggerInterface $logger)
42+
{
43+
$this->context = $context;
44+
$this->message = $message;
45+
$this->exception = $exception;
46+
$this->logger = $logger;
47+
$this->receivedAt = $receivedAt;
48+
}
49+
50+
public function getContext(): Context
51+
{
52+
return $this->context;
53+
}
54+
55+
public function getMessage(): Message
56+
{
57+
return $this->message;
58+
}
59+
60+
public function getException(): \Exception
61+
{
62+
return $this->exception;
63+
}
64+
65+
public function getLogger(): LoggerInterface
66+
{
67+
return $this->logger;
68+
}
69+
70+
public function getReceivedAt(): int
71+
{
72+
return $this->receivedAt;
73+
}
74+
75+
public function getResult(): ?Result
76+
{
77+
return $this->result;
78+
}
79+
80+
public function setResult(Result $result): void
81+
{
82+
$this->result = $result;
83+
}
84+
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context\MessageResult;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\Context\PreSubscribe;
9+
use Enqueue\Consumption\Context\ProcessorException;
910
use Enqueue\Consumption\Context\Start;
1011

1112
trait EmptyExtensionTrait
@@ -30,6 +31,10 @@ public function onResult(MessageResult $context): void
3031
{
3132
}
3233

34+
public function onProcessorException(ProcessorException $context): void
35+
{
36+
}
37+
3338
public function onPostReceived(Context $context)
3439
{
3540
}

pkg/enqueue/Consumption/ExtensionInterface.php

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -2,44 +2,8 @@
22

33
namespace Enqueue\Consumption;
44

5-
use Enqueue\Consumption\Context\MessageReceived;
6-
use Enqueue\Consumption\Context\MessageResult;
7-
use Enqueue\Consumption\Context\PreConsume;
8-
use Enqueue\Consumption\Context\PreSubscribe;
9-
use Enqueue\Consumption\Context\Start;
10-
11-
interface ExtensionInterface
5+
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface
126
{
13-
/**
14-
* Executed only once at the very beginning of the QueueConsumer::consume method call.
15-
*/
16-
public function onStart(Start $context): void;
17-
18-
/**
19-
* The method is called for each BoundProcessor before calling SubscriptionConsumer::subscribe method.
20-
*/
21-
public function onPreSubscribe(PreSubscribe $context): void;
22-
23-
/**
24-
* Executed at every new cycle before calling SubscriptionConsumer::consume method.
25-
* The consumption could be interrupted at this step.
26-
*/
27-
public function onPreConsume(PreConsume $context): void;
28-
29-
/**
30-
* Executed as soon as a a message is received, before it is passed to a processor
31-
* The extension may set a result. If the result is set the processor is not called
32-
* The processor could be changed or decorated at this point.
33-
*/
34-
public function onMessageReceived(MessageReceived $context): void;
35-
36-
/**
37-
* Executed when a message is processed by a processor or a result was set in onMessageReceived extension method.
38-
* BEFORE the message status was sent to the broker
39-
* The result could be changed at this point.
40-
*/
41-
public function onResult(MessageResult $context): void;
42-
437
/**
448
* Executed when a message is processed by a processor.
459
* The context contains a status, which could not be changed.
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\MessageReceived;
6+
7+
interface MessageReceivedExtensionInterface
8+
{
9+
/**
10+
* Executed as soon as a a message is received, before it is passed to a processor
11+
* The extension may set a result. If the result is set the processor is not called
12+
* The processor could be changed or decorated at this point.
13+
*/
14+
public function onMessageReceived(MessageReceived $context): void;
15+
}
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\MessageResult;
6+
7+
interface MessageResultExtensionInterface
8+
{
9+
/**
10+
* Executed when a message is processed by a processor or a result was set in onMessageReceived extension method.
11+
* BEFORE the message status was sent to the broker
12+
* The result could be changed at this point.
13+
*/
14+
public function onResult(MessageResult $context): void;
15+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\PreConsume;
6+
7+
interface PreConsumeExtensionInterface
8+
{
9+
/**
10+
* Executed at every new cycle before calling SubscriptionConsumer::consume method.
11+
* The consumption could be interrupted at this step.
12+
*/
13+
public function onPreConsume(PreConsume $context): void;
14+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\PreSubscribe;
6+
7+
interface PreSubscribeExtensionInterface
8+
{
9+
/**
10+
* The method is called for each BoundProcessor before calling SubscriptionConsumer::subscribe method.
11+
*/
12+
public function onPreSubscribe(PreSubscribe $context): void;
13+
}
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\ProcessorException;
6+
7+
interface ProcessorExceptionExtensionInterface
8+
{
9+
/**
10+
* Execute if a processor throws an exception.
11+
* The result could be set, if result is not set the exception is thrown again.
12+
*/
13+
public function onProcessorException(ProcessorException $context): void;
14+
}

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context\MessageResult;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\Context\PreSubscribe;
9+
use Enqueue\Consumption\Context\ProcessorException;
910
use Enqueue\Consumption\Context\Start;
1011
use Enqueue\Consumption\Exception\ConsumptionInterruptedException;
1112
use Enqueue\Consumption\Exception\InvalidArgumentException;
@@ -241,8 +242,19 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
241242
$result = $messageReceived->getResult();
242243
$processor = $messageReceived->getProcessor();
243244
if (null === $result) {
244-
$result = $processor->process($message, $context->getInteropContext());
245-
$context->setResult($result);
245+
try {
246+
$result = $processor->process($message, $context->getInteropContext());
247+
248+
$context->setResult($result);
249+
} catch (\Exception $e) {
250+
$processorException = new ProcessorException($this->interopContext, $message, $e, $receivedAt, $this->logger);
251+
$this->extension->onProcessorException($processorException);
252+
253+
$result = $processorException->getResult();
254+
if (null === $result) {
255+
throw $e;
256+
}
257+
}
246258
}
247259

248260
$messageResult = new MessageResult($this->interopContext, $message, $result, $receivedAt, $this->logger);
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\Start;
6+
7+
interface StartExtensionInterface
8+
{
9+
/**
10+
* Executed only once at the very beginning of the QueueConsumer::consume method call.
11+
*/
12+
public function onStart(Start $context): void;
13+
}

pkg/enqueue/Tests/Consumption/QueueConsumerTest.php

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Enqueue\Consumption\Context\MessageResult;
1111
use Enqueue\Consumption\Context\PreConsume;
1212
use Enqueue\Consumption\Context\PreSubscribe;
13+
use Enqueue\Consumption\Context\ProcessorException;
1314
use Enqueue\Consumption\Context\Start;
1415
use Enqueue\Consumption\Exception\InvalidArgumentException;
1516
use Enqueue\Consumption\ExtensionInterface;
@@ -838,6 +839,99 @@ public function testShouldCallOnResultExtensionMethodWithExpectedContext()
838839
$queueConsumer->consume();
839840
}
840841

842+
public function testShouldCallOnProcessorExceptionExtensionMethodWithExpectedContext()
843+
{
844+
$exception = new \LogicException('Exception exception');
845+
846+
$expectedMessage = $this->createMessageMock();
847+
848+
$subscriptionConsumerMock = new DummySubscriptionConsumer();
849+
$subscriptionConsumerMock->addMessage($expectedMessage, 'foo_queue');
850+
851+
$consumerStub = $this->createConsumerStub('foo_queue');
852+
853+
$contextStub = $this->createContextStub($consumerStub);
854+
855+
$processorMock = $this->createProcessorStub();
856+
$processorMock
857+
->expects($this->once())
858+
->method('process')
859+
->willThrowException($exception)
860+
;
861+
862+
$extension = $this->createExtension();
863+
$extension
864+
->expects($this->never())
865+
->method('onResult')
866+
;
867+
$extension
868+
->expects($this->once())
869+
->method('onProcessorException')
870+
->with($this->isInstanceOf(ProcessorException::class))
871+
->willReturnCallback(function (ProcessorException $context) use ($contextStub, $expectedMessage, $exception) {
872+
$this->assertSame($contextStub, $context->getContext());
873+
$this->assertSame($expectedMessage, $context->getMessage());
874+
$this->assertSame($exception, $context->getException());
875+
$this->assertGreaterThan(1, $context->getReceivedAt());
876+
$this->assertInstanceOf(NullLogger::class, $context->getLogger());
877+
$this->assertNull($context->getResult());
878+
})
879+
;
880+
881+
$chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]);
882+
$queueConsumer = new QueueConsumer($contextStub, $chainExtensions);
883+
$queueConsumer->setFallbackSubscriptionConsumer($subscriptionConsumerMock);
884+
$queueConsumer->bind(new NullQueue('foo_queue'), $processorMock);
885+
886+
$this->expectException(\LogicException::class);
887+
$this->expectExceptionMessage('Exception exception');
888+
$queueConsumer->consume();
889+
}
890+
891+
public function testShouldContinueConsumptionIfResultSetOnProcessorExceptionExtension()
892+
{
893+
$result = Result::ack();
894+
895+
$expectedMessage = $this->createMessageMock();
896+
897+
$subscriptionConsumerMock = new DummySubscriptionConsumer();
898+
$subscriptionConsumerMock->addMessage($expectedMessage, 'foo_queue');
899+
900+
$consumerStub = $this->createConsumerStub('foo_queue');
901+
902+
$contextStub = $this->createContextStub($consumerStub);
903+
904+
$processorMock = $this->createProcessorStub();
905+
$processorMock
906+
->expects($this->once())
907+
->method('process')
908+
->willThrowException(new \LogicException())
909+
;
910+
911+
$extension = $this->createExtension();
912+
$extension
913+
->expects($this->once())
914+
->method('onProcessorException')
915+
->willReturnCallback(function (ProcessorException $context) use ($result) {
916+
$context->setResult($result);
917+
})
918+
;
919+
$extension
920+
->expects($this->once())
921+
->method('onResult')
922+
->willReturnCallback(function (MessageResult $context) use ($result) {
923+
$this->assertSame($result, $context->getResult());
924+
})
925+
;
926+
927+
$chainExtensions = new ChainExtension([$extension, new BreakCycleExtension(1)]);
928+
$queueConsumer = new QueueConsumer($contextStub, $chainExtensions);
929+
$queueConsumer->setFallbackSubscriptionConsumer($subscriptionConsumerMock);
930+
$queueConsumer->bind(new NullQueue('foo_queue'), $processorMock);
931+
932+
$queueConsumer->consume();
933+
}
934+
841935
public function testShouldCallOnPostReceivedExtensionMethodWithExpectedContext()
842936
{
843937
$expectedMessage = $this->createMessageMock();

0 commit comments

Comments
 (0)