Skip to content

Commit bc571c2

Browse files
committed
Make build processor registyry pass support multi transports.
1 parent 48b169c commit bc571c2

File tree

8 files changed

+176
-100
lines changed

8 files changed

+176
-100
lines changed

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ class EnqueueBundle extends Bundle
2323
public function build(ContainerBuilder $container): void
2424
{
2525
//transport passes
26-
$container->addCompilerPass(new BuildConsumptionExtensionsPass('default'));
27-
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
26+
$container->addCompilerPass(new BuildConsumptionExtensionsPass());
27+
$container->addCompilerPass(new BuildProcessorRegistryPass());
2828

2929
//client passes
3030
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));

pkg/enqueue/Symfony/Client/DependencyInjection/ClientFactory.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
use Enqueue\Consumption\QueueConsumer;
2222
use Enqueue\Rpc\RpcFactory;
2323
use Enqueue\Symfony\ContainerProcessorRegistry;
24-
use Enqueue\Symfony\DependencyInjection\FormatClientNameTrait;
2524
use Interop\Queue\Context;
2625
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
2726
use Symfony\Component\DependencyInjection\ContainerBuilder;

pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php renamed to pkg/enqueue/Symfony/Client/DependencyInjection/FormatClientNameTrait.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<?php
22

3-
namespace Enqueue\Symfony\DependencyInjection;
3+
namespace Enqueue\Symfony\Client\DependencyInjection;
44

55
use Symfony\Component\DependencyInjection\ContainerInterface;
66
use Symfony\Component\DependencyInjection\Reference;

pkg/enqueue/Symfony/DependencyInjection/BuildConsumptionExtensionsPass.php

Lines changed: 40 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -8,55 +8,60 @@
88

99
final class BuildConsumptionExtensionsPass implements CompilerPassInterface
1010
{
11-
/**
12-
* @var string
13-
*/
14-
private $name;
11+
use FormatTransportNameTrait;
1512

16-
public function __construct(string $transportName)
17-
{
18-
if (empty($transportName)) {
19-
throw new \InvalidArgumentException('The name could not be empty.');
20-
}
21-
22-
$this->name = $transportName;
23-
}
13+
protected $name;
2414

2515
public function process(ContainerBuilder $container): void
2616
{
27-
$extensionsId = sprintf('enqueue.transport.%s.consumption_extensions', $this->name);
28-
if (false == $container->hasDefinition($extensionsId)) {
29-
return;
17+
if (false == $container->hasParameter('enqueue.transports')) {
18+
throw new \LogicException('The "enqueue.transports" parameter must be set.');
3019
}
3120

32-
$tags = $container->findTaggedServiceIds('enqueue.transport.consumption_extension');
21+
$names = $container->getParameter('enqueue.transports');
3322

34-
$groupByPriority = [];
35-
foreach ($tags as $serviceId => $tagAttributes) {
36-
foreach ($tagAttributes as $tagAttribute) {
37-
$transport = $tagAttribute['transport'] ?? 'default';
23+
foreach ($names as $name) {
24+
$this->name = $name;
3825

39-
if ($transport !== $this->name && 'all' !== $transport) {
40-
continue;
41-
}
26+
$extensionsId = $this->format('consumption_extensions');
27+
if (false == $container->hasDefinition($extensionsId)) {
28+
throw new \LogicException(sprintf('Service "%s" not found', $extensionsId));
29+
}
30+
31+
$tags = $container->findTaggedServiceIds('enqueue.transport.consumption_extension');
32+
33+
$groupByPriority = [];
34+
foreach ($tags as $serviceId => $tagAttributes) {
35+
foreach ($tagAttributes as $tagAttribute) {
36+
$transport = $tagAttribute['transport'] ?? 'default';
4237

43-
$priority = (int) ($tagAttribute['priority'] ?? 0);
38+
if ($transport !== $this->name && 'all' !== $transport) {
39+
continue;
40+
}
4441

45-
$groupByPriority[$priority][] = new Reference($serviceId);
42+
$priority = (int) ($tagAttribute['priority'] ?? 0);
43+
44+
$groupByPriority[$priority][] = new Reference($serviceId);
45+
}
4646
}
47-
}
4847

49-
krsort($groupByPriority, SORT_NUMERIC);
48+
krsort($groupByPriority, SORT_NUMERIC);
5049

51-
$flatExtensions = [];
52-
foreach ($groupByPriority as $extension) {
53-
$flatExtensions = array_merge($flatExtensions, $extension);
50+
$flatExtensions = [];
51+
foreach ($groupByPriority as $extension) {
52+
$flatExtensions = array_merge($flatExtensions, $extension);
53+
}
54+
55+
$extensionsService = $container->getDefinition($extensionsId);
56+
$extensionsService->replaceArgument(0, array_merge(
57+
$extensionsService->getArgument(0),
58+
$flatExtensions
59+
));
5460
}
61+
}
5562

56-
$extensionsService = $container->getDefinition($extensionsId);
57-
$extensionsService->replaceArgument(0, array_merge(
58-
$extensionsService->getArgument(0),
59-
$flatExtensions
60-
));
63+
protected function getName(): string
64+
{
65+
return $this->name;
6166
}
6267
}

pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php

Lines changed: 32 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,44 +9,49 @@
99

1010
final class BuildProcessorRegistryPass implements CompilerPassInterface
1111
{
12-
/**
13-
* @var string
14-
*/
15-
private $name;
12+
use FormatTransportNameTrait;
1613

17-
public function __construct(string $transportName)
18-
{
19-
if (empty($transportName)) {
20-
throw new \InvalidArgumentException('The name could not be empty.');
21-
}
22-
23-
$this->name = $transportName;
24-
}
14+
protected $name;
2515

2616
public function process(ContainerBuilder $container): void
2717
{
28-
$processorRegistryId = sprintf('enqueue.transport.%s.processor_registry', $this->name);
29-
if (false == $container->hasDefinition($processorRegistryId)) {
30-
return;
18+
if (false == $container->hasParameter('enqueue.transports')) {
19+
throw new \LogicException('The "enqueue.transports" parameter must be set.');
3120
}
3221

33-
$tag = 'enqueue.transport.processor';
34-
$map = [];
35-
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
36-
foreach ($tagAttributes as $tagAttribute) {
37-
$transport = $tagAttribute['transport'] ?? 'default';
22+
$names = $container->getParameter('enqueue.transports');
3823

39-
if ($transport !== $this->name && 'all' !== $transport) {
40-
continue;
41-
}
24+
foreach ($names as $name) {
25+
$this->name = $name;
26+
27+
$processorRegistryId = $this->format('processor_registry');
28+
if (false == $container->hasDefinition($processorRegistryId)) {
29+
throw new \LogicException(sprintf('Service "%s" not found', $processorRegistryId));
30+
}
4231

43-
$processor = $tagAttribute['processor'] ?? $serviceId;
32+
$tag = 'enqueue.transport.processor';
33+
$map = [];
34+
foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) {
35+
foreach ($tagAttributes as $tagAttribute) {
36+
$transport = $tagAttribute['transport'] ?? 'default';
4437

45-
$map[$processor] = new Reference($serviceId);
38+
if ($transport !== $this->name && 'all' !== $transport) {
39+
continue;
40+
}
41+
42+
$processor = $tagAttribute['processor'] ?? $serviceId;
43+
44+
$map[$processor] = new Reference($serviceId);
45+
}
4646
}
47+
48+
$registry = $container->getDefinition($processorRegistryId);
49+
$registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId));
4750
}
51+
}
4852

49-
$registry = $container->getDefinition($processorRegistryId);
50-
$registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId));
53+
protected function getName(): string
54+
{
55+
return $this->name;
5156
}
5257
}

pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,25 @@
22

33
namespace Enqueue\Symfony\DependencyInjection;
44

5+
use Symfony\Component\DependencyInjection\ContainerInterface;
6+
use Symfony\Component\DependencyInjection\Reference;
7+
58
trait FormatTransportNameTrait
69
{
710
abstract protected function getName(): string;
811

12+
private function reference(string $serviceName, $invalidBehavior = ContainerInterface::EXCEPTION_ON_INVALID_REFERENCE): Reference
13+
{
14+
return new Reference($this->format($serviceName), $invalidBehavior);
15+
}
16+
17+
private function parameter(string $serviceName): string
18+
{
19+
$fullName = $this->format($serviceName, false);
20+
21+
return "%$fullName%";
22+
}
23+
924
private function format(string $serviceName, $parameter = false): string
1025
{
1126
$pattern = 'enqueue.transport.%s.'.$serviceName;

pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php

Lines changed: 27 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,29 @@ public function testShouldBeFinal()
2525
$this->assertClassFinal(BuildConsumptionExtensionsPass::class);
2626
}
2727

28-
public function testCouldBeConstructedWithName()
28+
public function testCouldBeConstructedWithoutArguments()
2929
{
30-
$pass = new BuildConsumptionExtensionsPass('aName');
31-
32-
$this->assertAttributeSame('aName', 'name', $pass);
30+
new BuildConsumptionExtensionsPass();
3331
}
3432

35-
public function testThrowIfNameEmptyOnConstruct()
33+
public function testThrowIfEnqueueTransportsParameterNotSet()
3634
{
37-
$this->expectException(\InvalidArgumentException::class);
38-
$this->expectExceptionMessage('The name could not be empty.');
39-
new BuildConsumptionExtensionsPass('');
35+
$pass = new BuildConsumptionExtensionsPass();
36+
37+
$this->expectException(\LogicException::class);
38+
$this->expectExceptionMessage('The "enqueue.transports" parameter must be set.');
39+
$pass->process(new ContainerBuilder());
4040
}
4141

42-
public function testShouldDoNothingIfExtensionsServiceIsNotRegistered()
42+
public function testThrowsIfNoConsumptionExtensionsServiceFoundForConfiguredTransport()
4343
{
4444
$container = new ContainerBuilder();
45+
$container->setParameter('enqueue.transports', ['foo', 'bar']);
4546

46-
//guard
47-
$this->assertFalse($container->hasDefinition('enqueue.transport.aName.consumption_extensions'));
47+
$pass = new BuildConsumptionExtensionsPass();
4848

49-
$pass = new BuildConsumptionExtensionsPass('aName');
49+
$this->expectException(\LogicException::class);
50+
$this->expectExceptionMessage('Service "enqueue.transport.foo.consumption_extensions" not found');
5051
$pass->process($container);
5152
}
5253

@@ -56,6 +57,7 @@ public function testShouldRegisterTransportExtension()
5657
$extensions->addArgument([]);
5758

5859
$container = new ContainerBuilder();
60+
$container->setParameter('enqueue.transports', ['aName']);
5961
$container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions);
6062

6163
$container->register('aFooExtension', ExtensionInterface::class)
@@ -65,7 +67,7 @@ public function testShouldRegisterTransportExtension()
6567
->addTag('enqueue.transport.consumption_extension', ['transport' => 'aName'])
6668
;
6769

68-
$pass = new BuildConsumptionExtensionsPass('aName');
70+
$pass = new BuildConsumptionExtensionsPass();
6971
$pass->process($container);
7072

7173
$this->assertInternalType('array', $extensions->getArgument(0));
@@ -81,6 +83,7 @@ public function testShouldIgnoreOtherTransportExtensions()
8183
$extensions->addArgument([]);
8284

8385
$container = new ContainerBuilder();
86+
$container->setParameter('enqueue.transports', ['aName']);
8487
$container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions);
8588

8689
$container->register('aFooExtension', ExtensionInterface::class)
@@ -90,7 +93,7 @@ public function testShouldIgnoreOtherTransportExtensions()
9093
->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName'])
9194
;
9295

93-
$pass = new BuildConsumptionExtensionsPass('aName');
96+
$pass = new BuildConsumptionExtensionsPass();
9497
$pass->process($container);
9598

9699
$this->assertInternalType('array', $extensions->getArgument(0));
@@ -105,6 +108,7 @@ public function testShouldAddExtensionIfTransportAll()
105108
$extensions->addArgument([]);
106109

107110
$container = new ContainerBuilder();
111+
$container->setParameter('enqueue.transports', ['aName']);
108112
$container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions);
109113

110114
$container->register('aFooExtension', ExtensionInterface::class)
@@ -114,7 +118,7 @@ public function testShouldAddExtensionIfTransportAll()
114118
->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName'])
115119
;
116120

117-
$pass = new BuildConsumptionExtensionsPass('aName');
121+
$pass = new BuildConsumptionExtensionsPass();
118122
$pass->process($container);
119123

120124
$this->assertInternalType('array', $extensions->getArgument(0));
@@ -129,6 +133,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport()
129133
$extensions->addArgument([]);
130134

131135
$container = new ContainerBuilder();
136+
$container->setParameter('enqueue.transports', ['default']);
132137
$container->setDefinition('enqueue.transport.default.consumption_extensions', $extensions);
133138

134139
$container->register('aFooExtension', ExtensionInterface::class)
@@ -138,7 +143,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport()
138143
->addTag('enqueue.transport.consumption_extension')
139144
;
140145

141-
$pass = new BuildConsumptionExtensionsPass('default');
146+
$pass = new BuildConsumptionExtensionsPass();
142147
$pass->process($container);
143148

144149
$this->assertInternalType('array', $extensions->getArgument(0));
@@ -151,6 +156,7 @@ public function testShouldTreatTagsWithoutTransportAsDefaultTransport()
151156
public function testShouldOrderExtensionsByPriority()
152157
{
153158
$container = new ContainerBuilder();
159+
$container->setParameter('enqueue.transports', ['default']);
154160

155161
$extensions = new Definition();
156162
$extensions->addArgument([]);
@@ -168,7 +174,7 @@ public function testShouldOrderExtensionsByPriority()
168174
$extension->addTag('enqueue.transport.consumption_extension', ['priority' => 2]);
169175
$container->setDefinition('baz_extension', $extension);
170176

171-
$pass = new BuildConsumptionExtensionsPass('default');
177+
$pass = new BuildConsumptionExtensionsPass();
172178
$pass->process($container);
173179

174180
$orderedExtensions = $extensions->getArgument(0);
@@ -182,6 +188,7 @@ public function testShouldOrderExtensionsByPriority()
182188
public function testShouldAssumePriorityZeroIfPriorityIsNotSet()
183189
{
184190
$container = new ContainerBuilder();
191+
$container->setParameter('enqueue.transports', ['default']);
185192

186193
$extensions = new Definition();
187194
$extensions->addArgument([]);
@@ -199,7 +206,7 @@ public function testShouldAssumePriorityZeroIfPriorityIsNotSet()
199206
$extension->addTag('enqueue.transport.consumption_extension', ['priority' => -1]);
200207
$container->setDefinition('baz_extension', $extension);
201208

202-
$pass = new BuildConsumptionExtensionsPass('default');
209+
$pass = new BuildConsumptionExtensionsPass();
203210
$pass->process($container);
204211

205212
$orderedExtensions = $extensions->getArgument(0);
@@ -219,6 +226,7 @@ public function testShouldMergeWithAddedPreviously()
219226
]);
220227

221228
$container = new ContainerBuilder();
229+
$container->setParameter('enqueue.transports', ['aName']);
222230
$container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions);
223231

224232
$container->register('aFooExtension', ExtensionInterface::class)
@@ -228,7 +236,7 @@ public function testShouldMergeWithAddedPreviously()
228236
->addTag('enqueue.transport.consumption_extension')
229237
;
230238

231-
$pass = new BuildConsumptionExtensionsPass('aName');
239+
$pass = new BuildConsumptionExtensionsPass();
232240
$pass->process($container);
233241

234242
$this->assertInternalType('array', $extensions->getArgument(0));

0 commit comments

Comments
 (0)