Skip to content

Commit 6783f04

Browse files
committed
[consumption] onIdle -> onPostConsume
1 parent 91abc46 commit 6783f04

14 files changed

+228
-102
lines changed

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\Context\MessageResult;
7+
use Enqueue\Consumption\Context\PostConsume;
78
use Enqueue\Consumption\Context\PostMessageReceived;
89
use Enqueue\Consumption\Context\PreConsume;
910
use Enqueue\Consumption\Context\PreSubscribe;
@@ -79,10 +80,10 @@ public function onPostMessageReceived(PostMessageReceived $context): void
7980
}
8081
}
8182

82-
public function onIdle(Context $context)
83+
public function onPostConsume(PostConsume $context): void
8384
{
8485
foreach ($this->extensions as $extension) {
85-
$extension->onIdle($context);
86+
$extension->onPostConsume($context);
8687
}
8788
}
8889

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Interop\Queue\Context;
6+
use Interop\Queue\SubscriptionConsumer;
7+
use Psr\Log\LoggerInterface;
8+
9+
final class PostConsume
10+
{
11+
/**
12+
* @var Context
13+
*/
14+
private $context;
15+
16+
/**
17+
* @var SubscriptionConsumer
18+
*/
19+
private $subscriptionConsumer;
20+
21+
/**
22+
* @var int
23+
*/
24+
private $receivedMessagesCount;
25+
26+
/**
27+
* @var int
28+
*/
29+
private $cycle;
30+
31+
/**
32+
* @var int
33+
*/
34+
private $startTime;
35+
36+
/**
37+
* @var LoggerInterface
38+
*/
39+
private $logger;
40+
41+
/**
42+
* @var bool
43+
*/
44+
private $executionInterrupted;
45+
46+
public function __construct(Context $context, SubscriptionConsumer $subscriptionConsumer, int $receivedMessagesCount, int $cycle, int $startTime, LoggerInterface $logger)
47+
{
48+
$this->context = $context;
49+
$this->subscriptionConsumer = $subscriptionConsumer;
50+
$this->receivedMessagesCount = $receivedMessagesCount;
51+
$this->cycle = $cycle;
52+
$this->startTime = $startTime;
53+
$this->logger = $logger;
54+
55+
$this->executionInterrupted = false;
56+
}
57+
58+
public function getContext(): Context
59+
{
60+
return $this->context;
61+
}
62+
63+
public function getSubscriptionConsumer(): SubscriptionConsumer
64+
{
65+
return $this->subscriptionConsumer;
66+
}
67+
68+
public function getReceivedMessagesCount(): int
69+
{
70+
return $this->receivedMessagesCount;
71+
}
72+
73+
public function getCycle(): int
74+
{
75+
return $this->cycle;
76+
}
77+
78+
public function getStartTime(): int
79+
{
80+
return $this->startTime;
81+
}
82+
83+
public function getLogger(): LoggerInterface
84+
{
85+
return $this->logger;
86+
}
87+
88+
public function isExecutionInterrupted(): bool
89+
{
90+
return $this->executionInterrupted;
91+
}
92+
93+
public function interruptExecution(): void
94+
{
95+
$this->executionInterrupted = true;
96+
}
97+
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\Context\MessageResult;
7+
use Enqueue\Consumption\Context\PostConsume;
78
use Enqueue\Consumption\Context\PostMessageReceived;
89
use Enqueue\Consumption\Context\PreConsume;
910
use Enqueue\Consumption\Context\PreSubscribe;
@@ -40,7 +41,7 @@ public function onProcessorException(ProcessorException $context): void
4041
{
4142
}
4243

43-
public function onIdle(Context $context)
44+
public function onPostConsume(PostConsume $context): void
4445
{
4546
}
4647

pkg/enqueue/Consumption/Extension/LimitConsumerMemoryExtension.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\PostConsume;
66
use Enqueue\Consumption\Context\PostMessageReceived;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\EmptyExtensionTrait;
@@ -47,10 +47,10 @@ public function onPostMessageReceived(PostMessageReceived $context): void
4747
}
4848
}
4949

50-
public function onIdle(Context $context)
50+
public function onPostConsume(PostConsume $context): void
5151
{
5252
if ($this->shouldBeStopped($context->getLogger())) {
53-
$context->setExecutionInterrupted(true);
53+
$context->interruptExecution();
5454
}
5555
}
5656

pkg/enqueue/Consumption/Extension/LimitConsumptionTimeExtension.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
5+
use Enqueue\Consumption\Context\PostConsume;
66
use Enqueue\Consumption\Context\PostMessageReceived;
77
use Enqueue\Consumption\Context\PreConsume;
88
use Enqueue\Consumption\EmptyExtensionTrait;
@@ -33,10 +33,10 @@ public function onPreConsume(PreConsume $context): void
3333
}
3434
}
3535

36-
public function onIdle(Context $context)
36+
public function onPostConsume(PostConsume $context): void
3737
{
3838
if ($this->shouldBeStopped($context->getLogger())) {
39-
$context->setExecutionInterrupted(true);
39+
$context->interruptExecution();
4040
}
4141
}
4242

pkg/enqueue/Consumption/Extension/SignalExtension.php

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22

33
namespace Enqueue\Consumption\Extension;
44

5-
use Enqueue\Consumption\Context;
65
use Enqueue\Consumption\Context\MessageReceived;
6+
use Enqueue\Consumption\Context\PostConsume;
77
use Enqueue\Consumption\Context\PostMessageReceived;
88
use Enqueue\Consumption\Context\PreConsume;
99
use Enqueue\Consumption\Context\Start;
@@ -62,10 +62,10 @@ public function onPostMessageReceived(PostMessageReceived $context): void
6262
}
6363
}
6464

65-
public function onIdle(Context $context)
65+
public function onPostConsume(PostConsume $context): void
6666
{
6767
if ($this->shouldBeStopped($context->getLogger())) {
68-
$context->setExecutionInterrupted(true);
68+
$context->interruptExecution();
6969
}
7070
}
7171

pkg/enqueue/Consumption/ExtensionInterface.php

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,8 @@
22

33
namespace Enqueue\Consumption;
44

5-
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface
5+
interface ExtensionInterface extends StartExtensionInterface, PreSubscribeExtensionInterface, PreConsumeExtensionInterface, MessageReceivedExtensionInterface, PostMessageReceivedExtensionInterface, MessageResultExtensionInterface, ProcessorExceptionExtensionInterface, PostConsumeExtensionInterface
66
{
7-
/**
8-
* Called each time at the end of the cycle if nothing was done.
9-
*
10-
* @param Context $context
11-
*/
12-
public function onIdle(Context $context);
13-
147
/**
158
* Called when the consumption was interrupted by an extension or exception
169
* In case of exception it will be present in the context.
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption;
4+
5+
use Enqueue\Consumption\Context\PostConsume;
6+
7+
interface PostConsumeExtensionInterface
8+
{
9+
/**
10+
* The method is called after SubscriptionConsumer::consume method exits.
11+
* The consumption could be interrupted at this point.
12+
*/
13+
public function onPostConsume(PostConsume $context): void;
14+
}

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\Context\MessageReceived;
66
use Enqueue\Consumption\Context\MessageResult;
7+
use Enqueue\Consumption\Context\PostConsume;
78
use Enqueue\Consumption\Context\PostMessageReceived;
89
use Enqueue\Consumption\Context\PreConsume;
910
use Enqueue\Consumption\Context\PreSubscribe;
@@ -196,7 +197,11 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
196197
$subscriptionConsumer = $this->fallbackSubscriptionConsumer;
197198
}
198199

199-
$callback = function (InteropMessage $message, Consumer $consumer) use (&$context) {
200+
$receivedMessagesCount = 0;
201+
202+
$callback = function (InteropMessage $message, Consumer $consumer) use (&$context, &$receivedMessagesCount) {
203+
++$receivedMessagesCount;
204+
200205
$receivedAt = (int) (microtime(true) * 1000);
201206
$queue = $consumer->getQueue();
202207
if (false == array_key_exists($queue->getQueueName(), $this->boundProcessors)) {
@@ -291,6 +296,8 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
291296
$cycle = 1;
292297
while (true) {
293298
try {
299+
$receivedMessagesCount = 0;
300+
294301
$preConsume = new PreConsume($this->interopContext, $subscriptionConsumer, $this->logger, $cycle, $this->receiveTimeout, $startTime);
295302
$this->extension->onPreConsume($preConsume);
296303

@@ -300,9 +307,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
300307

301308
$subscriptionConsumer->consume($this->receiveTimeout);
302309

303-
$this->extension->onIdle($context);
310+
$postConsume = new PostConsume($this->interopContext, $subscriptionConsumer, $receivedMessagesCount, $cycle, $startTime, $this->logger);
311+
$this->extension->onPostConsume($postConsume);
304312

305-
if ($context->isExecutionInterrupted()) {
313+
if ($postConsume->isExecutionInterrupted()) {
306314
throw new ConsumptionInterruptedException();
307315
}
308316
} catch (ConsumptionInterruptedException $e) {

pkg/enqueue/Tests/Consumption/ChainExtensionTest.php

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use Enqueue\Consumption\Context;
77
use Enqueue\Consumption\Context\MessageReceived;
88
use Enqueue\Consumption\Context\MessageResult;
9+
use Enqueue\Consumption\Context\PostConsume;
910
use Enqueue\Consumption\Context\PostMessageReceived;
1011
use Enqueue\Consumption\Context\PreConsume;
1112
use Enqueue\Consumption\Context\PreSubscribe;
@@ -36,7 +37,7 @@ public function testCouldBeConstructedWithExtensionsArray()
3637

3738
public function testShouldProxyOnStartToAllInternalExtensions()
3839
{
39-
$context = new Start($this->createInteropContextMock(), $this->createLoggerMock(), [], 0, 0, 0);
40+
$context = new Start($this->createInteropContextMock(), $this->createLoggerMock(), [], 0, 0);
4041

4142
$fooExtension = $this->createExtension();
4243
$fooExtension
@@ -196,26 +197,33 @@ public function testShouldProxyOnPostReceiveToAllInternalExtensions()
196197
$extensions->onPostMessageReceived($context);
197198
}
198199

199-
public function testShouldProxyOnIdleToAllInternalExtensions()
200+
public function testShouldProxyOnPostConsumeToAllInternalExtensions()
200201
{
201-
$context = $this->createContextMock();
202+
$postConsume = new PostConsume(
203+
$this->createInteropContextMock(),
204+
$this->createSubscriptionConsumerMock(),
205+
1,
206+
1,
207+
1,
208+
new NullLogger()
209+
);
202210

203211
$fooExtension = $this->createExtension();
204212
$fooExtension
205213
->expects($this->once())
206-
->method('onIdle')
207-
->with($this->identicalTo($context))
214+
->method('onPostConsume')
215+
->with($this->identicalTo($postConsume))
208216
;
209217
$barExtension = $this->createExtension();
210218
$barExtension
211219
->expects($this->once())
212-
->method('onIdle')
213-
->with($this->identicalTo($context))
220+
->method('onPostConsume')
221+
->with($this->identicalTo($postConsume))
214222
;
215223

216224
$extensions = new ChainExtension([$fooExtension, $barExtension]);
217225

218-
$extensions->onIdle($context);
226+
$extensions->onPostConsume($postConsume);
219227
}
220228

221229
public function testShouldProxyOnInterruptedToAllInternalExtensions()

0 commit comments

Comments
 (0)