Skip to content

Commit 754cd77

Browse files
committed
[consumption] Add preSubscribe extension point.
1 parent 52c4cb9 commit 754cd77

File tree

8 files changed

+203
-40
lines changed

8 files changed

+203
-40
lines changed

pkg/enqueue/Consumption/ChainExtension.php

Lines changed: 8 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreSubscribe;
56
use Enqueue\Consumption\Context\Start;
67

78
class ChainExtension implements ExtensionInterface
@@ -31,59 +32,48 @@ public function onStart(Start $context): void
3132
}
3233
}
3334

34-
/**
35-
* @param Context $context
36-
*/
35+
public function preSubscribe(PreSubscribe $context): void
36+
{
37+
foreach ($this->extensions as $extension) {
38+
$extension->preSubscribe($context);
39+
}
40+
}
41+
3742
public function onBeforeReceive(Context $context)
3843
{
3944
foreach ($this->extensions as $extension) {
4045
$extension->onBeforeReceive($context);
4146
}
4247
}
4348

44-
/**
45-
* @param Context $context
46-
*/
4749
public function onPreReceived(Context $context)
4850
{
4951
foreach ($this->extensions as $extension) {
5052
$extension->onPreReceived($context);
5153
}
5254
}
5355

54-
/**
55-
* @param Context $context
56-
*/
5756
public function onResult(Context $context)
5857
{
5958
foreach ($this->extensions as $extension) {
6059
$extension->onResult($context);
6160
}
6261
}
6362

64-
/**
65-
* @param Context $context
66-
*/
6763
public function onPostReceived(Context $context)
6864
{
6965
foreach ($this->extensions as $extension) {
7066
$extension->onPostReceived($context);
7167
}
7268
}
7369

74-
/**
75-
* @param Context $context
76-
*/
7770
public function onIdle(Context $context)
7871
{
7972
foreach ($this->extensions as $extension) {
8073
$extension->onIdle($context);
8174
}
8275
}
8376

84-
/**
85-
* @param Context $context
86-
*/
8777
public function onInterrupted(Context $context)
8878
{
8979
foreach ($this->extensions as $extension) {
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace Enqueue\Consumption\Context;
4+
5+
use Interop\Queue\Consumer;
6+
use Interop\Queue\Context;
7+
use Interop\Queue\Processor;
8+
use Psr\Log\LoggerInterface;
9+
10+
class PreSubscribe
11+
{
12+
/**
13+
* @var Context
14+
*/
15+
private $context;
16+
17+
/**
18+
* @var Processor
19+
*/
20+
private $processor;
21+
22+
/**
23+
* @var Consumer
24+
*/
25+
private $consumer;
26+
27+
/**
28+
* @var LoggerInterface
29+
*/
30+
private $logger;
31+
32+
public function __construct(Context $context, Processor $processor, Consumer $consumer, LoggerInterface $logger)
33+
{
34+
$this->context = $context;
35+
$this->processor = $processor;
36+
$this->consumer = $consumer;
37+
$this->logger = $logger;
38+
}
39+
40+
public function getContext(): Context
41+
{
42+
return $this->context;
43+
}
44+
45+
public function getProcessor(): Processor
46+
{
47+
return $this->processor;
48+
}
49+
50+
public function getConsumer(): Consumer
51+
{
52+
return $this->consumer;
53+
}
54+
55+
public function getLogger(): LoggerInterface
56+
{
57+
return $this->logger;
58+
}
59+
}

pkg/enqueue/Consumption/Context/Start.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public function __construct(Context $context, LoggerInterface $logger, array $pr
5858
$this->executionInterrupted = false;
5959
}
6060

61-
public function getInteropContext(): Context
61+
public function getContext(): Context
6262
{
6363
return $this->context;
6464
}

pkg/enqueue/Consumption/EmptyExtensionTrait.php

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreSubscribe;
56
use Enqueue\Consumption\Context\Start;
67

78
trait EmptyExtensionTrait
@@ -10,44 +11,30 @@ public function onStart(Start $context): void
1011
{
1112
}
1213

13-
/**
14-
* @param Context $context
15-
*/
14+
public function preSubscribe(PreSubscribe $preSubscribe): void
15+
{
16+
}
17+
1618
public function onBeforeReceive(Context $context)
1719
{
1820
}
1921

20-
/**
21-
* @param Context $context
22-
*/
2322
public function onPreReceived(Context $context)
2423
{
2524
}
2625

27-
/**
28-
* @param Context $context
29-
*/
3026
public function onResult(Context $context)
3127
{
3228
}
3329

34-
/**
35-
* @param Context $context
36-
*/
3730
public function onPostReceived(Context $context)
3831
{
3932
}
4033

41-
/**
42-
* @param Context $context
43-
*/
4434
public function onIdle(Context $context)
4535
{
4636
}
4737

48-
/**
49-
* @param Context $context
50-
*/
5138
public function onInterrupted(Context $context)
5239
{
5340
}

pkg/enqueue/Consumption/ExtensionInterface.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreSubscribe;
56
use Enqueue\Consumption\Context\Start;
67

78
interface ExtensionInterface
@@ -12,6 +13,11 @@ interface ExtensionInterface
1213
*/
1314
public function onStart(Start $context): void;
1415

16+
/**
17+
* The method is called for each BoundProcessor before calling SubscriptionConsumer::subscribe method.
18+
*/
19+
public function preSubscribe(PreSubscribe $context): void;
20+
1521
/**
1622
* Executed at every new cycle before we asked a broker for a new message.
1723
* At this stage the context already contains processor, consumer and queue.

pkg/enqueue/Consumption/QueueConsumer.php

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace Enqueue\Consumption;
44

5+
use Enqueue\Consumption\Context\PreSubscribe;
56
use Enqueue\Consumption\Context\Start;
67
use Enqueue\Consumption\Exception\ConsumptionInterruptedException;
78
use Enqueue\Consumption\Exception\InvalidArgumentException;
@@ -168,6 +169,10 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
168169

169170
$this->extension->onStart($start);
170171

172+
// todo
173+
if ($start->isExecutionInterrupted()) {
174+
}
175+
171176
$this->logger = $start->getLogger();
172177
$this->idleTime = $start->getIdleTime();
173178
$this->receiveTimeout = $start->getReceiveTimeout();
@@ -182,7 +187,16 @@ public function consume(ExtensionInterface $runtimeExtension = null): void
182187
foreach ($this->boundProcessors as $boundProcessor) {
183188
$queue = $boundProcessor->getQueue();
184189

185-
$consumers[$queue->getQueueName()] = $this->interopContext->createConsumer($queue);
190+
$preSubscribe = new PreSubscribe(
191+
$this->interopContext,
192+
$boundProcessor->getProcessor(),
193+
$this->interopContext->createConsumer($queue),
194+
$this->logger
195+
);
196+
197+
$this->extension->preSubscribe($preSubscribe);
198+
199+
$consumers[$queue->getQueueName()] = $preSubscribe->getConsumer();
186200
}
187201

188202
// todo remove

pkg/enqueue/Tests/Consumption/ChainExtensionTest.php

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use Enqueue\Consumption\ChainExtension;
66
use Enqueue\Consumption\Context;
7+
use Enqueue\Consumption\Context\PreSubscribe;
78
use Enqueue\Consumption\Context\Start;
89
use Enqueue\Consumption\ExtensionInterface;
910
use Enqueue\Test\ClassExtensionTrait;
@@ -26,7 +27,7 @@ public function testCouldBeConstructedWithExtensionsArray()
2627

2728
public function testShouldProxyOnStartToAllInternalExtensions()
2829
{
29-
$context = new Start($this->createMock(\Interop\Queue\Context::class), $this->createLoggerMock(), [], 0, 0, 0);
30+
$context = new Start($this->createInteropContextMock(), $this->createLoggerMock(), [], 0, 0, 0);
3031

3132
$fooExtension = $this->createExtension();
3233
$fooExtension
@@ -46,6 +47,33 @@ public function testShouldProxyOnStartToAllInternalExtensions()
4647
$extensions->onStart($context);
4748
}
4849

50+
public function testShouldProxyPreSubscribeToAllInternalExtensions()
51+
{
52+
$context = new PreSubscribe(
53+
$this->createInteropContextMock(),
54+
$this->createInteropProcessorMock(),
55+
$this->createInteropConsumerMock(),
56+
$this->createLoggerMock()
57+
);
58+
59+
$fooExtension = $this->createExtension();
60+
$fooExtension
61+
->expects($this->once())
62+
->method('preSubscribe')
63+
->with($this->identicalTo($context))
64+
;
65+
$barExtension = $this->createExtension();
66+
$barExtension
67+
->expects($this->once())
68+
->method('preSubscribe')
69+
->with($this->identicalTo($context))
70+
;
71+
72+
$extensions = new ChainExtension([$fooExtension, $barExtension]);
73+
74+
$extensions->preSubscribe($context);
75+
}
76+
4977
public function testShouldProxyOnBeforeReceiveToAllInternalExtensions()
5078
{
5179
$context = $this->createContextMock();
@@ -186,6 +214,30 @@ protected function createLoggerMock(): LoggerInterface
186214
return $this->createMock(LoggerInterface::class);
187215
}
188216

217+
/**
218+
* @return \PHPUnit_Framework_MockObject_MockObject
219+
*/
220+
protected function createInteropContextMock(): \Interop\Queue\Context
221+
{
222+
return $this->createMock(\Interop\Queue\Context::class);
223+
}
224+
225+
/**
226+
* @return \PHPUnit_Framework_MockObject_MockObject
227+
*/
228+
protected function createInteropConsumerMock(): \Interop\Queue\Consumer
229+
{
230+
return $this->createMock(\Interop\Queue\Consumer::class);
231+
}
232+
233+
/**
234+
* @return \PHPUnit_Framework_MockObject_MockObject
235+
*/
236+
protected function createInteropProcessorMock(): \Interop\Queue\Processor
237+
{
238+
return $this->createMock(\Interop\Queue\Processor::class);
239+
}
240+
189241
/**
190242
* @return \PHPUnit_Framework_MockObject_MockObject|Context
191243
*/

0 commit comments

Comments
 (0)