Skip to content

Commit 5c96dee

Browse files
committed
Multiple transport support. Container based approach.
1 parent b609c7a commit 5c96dee

20 files changed

+383
-455
lines changed

pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
use Enqueue\Client\CommandSubscriberInterface;
88
use Enqueue\Client\TopicSubscriberInterface;
99
use Enqueue\Client\TraceableProducer;
10-
use Enqueue\Consumption\QueueConsumer;
1110
use Enqueue\JobQueue\Job;
1211
use Enqueue\Symfony\DependencyInjection\ClientFactory;
1312
use Enqueue\Symfony\DependencyInjection\TransportFactory;
@@ -32,8 +31,7 @@ public function load(array $configs, ContainerBuilder $container): void
3231
$this->setupAutowiringForProcessors($container);
3332

3433
$transportFactory = (new TransportFactory('default'));
35-
$transportFactory->createConnectionFactory($container, $config['transport']);
36-
$transportFactory->createContext($container, $config['transport']);
34+
$transportFactory->build($container, $config['transport']);
3735

3836
if (isset($config['client'])) {
3937
$loader->load('client.yml');
@@ -80,7 +78,7 @@ public function load(array $configs, ContainerBuilder $container): void
8078
}
8179

8280
// todo configure queue consumer
83-
$container->getDefinition(QueueConsumer::class)
81+
$container->getDefinition('enqueue.transport.default.queue_consumer')
8482
->replaceArgument(2, $config['consumption']['idle_timeout'])
8583
->replaceArgument(3, $config['consumption']['receive_timeout'])
8684
;

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass;
88
use Enqueue\Symfony\Client\DependencyInjection\AnalyzeRouteCollectionPass;
99
use Enqueue\Symfony\Client\DependencyInjection\BuildClientExtensionsPass;
10-
use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass;
11-
use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass;
12-
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass;
13-
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass;
14-
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass;
10+
use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass as BuildClientCommandSubscriberRoutesPass;
11+
use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass as BuildClientConsumptionExtensionsPass;
12+
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass as BuildClientProcessorRegistryPass;
13+
use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass as BuildClientProcessorRoutesPass;
14+
use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass;
15+
use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass;
16+
use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass;
1517
use Symfony\Component\DependencyInjection\Compiler\PassConfig;
1618
use Symfony\Component\DependencyInjection\ContainerBuilder;
1719
use Symfony\Component\HttpKernel\Bundle\Bundle;
@@ -20,13 +22,18 @@ class EnqueueBundle extends Bundle
2022
{
2123
public function build(ContainerBuilder $container): void
2224
{
25+
//transport passes
2326
$container->addCompilerPass(new BuildConsumptionExtensionsPass('default'));
27+
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
28+
29+
//client passes
30+
$container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default'));
2431
$container->addCompilerPass(new BuildClientExtensionsPass('default'));
25-
$container->addCompilerPass(new BuildTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
26-
$container->addCompilerPass(new BuildCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
27-
$container->addCompilerPass(new BuildProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
32+
$container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
33+
$container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
34+
$container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
2835
$container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30);
29-
$container->addCompilerPass(new BuildProcessorRegistryPass('default'));
36+
$container->addCompilerPass(new BuildClientProcessorRegistryPass('default'));
3037

3138
if (class_exists(AsyncEventDispatcherExtension::class)) {
3239
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
Lines changed: 8 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,55 +1,16 @@
1-
parameters:
2-
enqueue.queue_consumer.default_idle_time: 0
3-
enqueue.queue_consumer.default_receive_timeout: 10000
4-
51
services:
6-
enqueue.consumption.extensions:
7-
class: 'Enqueue\Consumption\ChainExtension'
8-
public: false
2+
enqueue.locator:
3+
class: 'Symfony\Component\DependencyInjection\ServiceLocator'
94
arguments:
105
- []
6+
tags: ['container.service_locator']
117

12-
Enqueue\Consumption\QueueConsumer:
13-
class: 'Enqueue\Consumption\QueueConsumer'
14-
public: true
15-
arguments:
16-
- '@enqueue.transport.default.context'
17-
- '@enqueue.consumption.extensions'
18-
- '%enqueue.queue_consumer.default_idle_time%'
19-
- '%enqueue.queue_consumer.default_receive_timeout%'
20-
21-
# Deprecated. To be removed in 0.10.
22-
enqueue.consumption.queue_consumer:
23-
public: true
24-
alias: 'Enqueue\Consumption\QueueConsumer'
25-
26-
Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand:
27-
class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
8+
enqueue.consume_command:
9+
class: 'Enqueue\Symfony\Consumption\ConfigurableConsumeCommand'
2810
public: true
2911
arguments:
30-
- '@Enqueue\Consumption\QueueConsumer'
12+
- '@enqueue.locator'
13+
- 'enqueue.transport.%s.queue_consumer'
14+
- 'enqueue.transport.%s.processor_registry'
3115
tags:
3216
- { name: 'console.command' }
33-
34-
# Deprecated. To be removed in 0.10.
35-
enqueue.command.consume_messages:
36-
public: true
37-
alias: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand'
38-
39-
enqueue.transport.rpc_factory:
40-
class: 'Enqueue\Rpc\RpcFactory'
41-
public: false
42-
arguments:
43-
- '@enqueue.transport.default.context'
44-
45-
Enqueue\Rpc\RpcClient:
46-
class: 'Enqueue\Rpc\RpcClient'
47-
public: true
48-
arguments:
49-
- '@enqueue.transport.default.context'
50-
- '@enqueue.transport.rpc_factory'
51-
52-
# Deprecated. To be removed in 0.10.
53-
enqueue.transport.rpc_client:
54-
public: true
55-
alias: 'Enqueue\Rpc\RpcClient'

pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,16 @@ services:
2323
alias: 'enqueue.transport.default.context'
2424
public: true
2525

26-
test_enqueue.client.default.consume_messages_command:
27-
alias: 'enqueue.client.default.consume_messages_command'
26+
test_enqueue.consume_command:
27+
alias: 'enqueue.consume_command'
2828
public: true
2929

3030
test.message.processor:
3131
class: 'Enqueue\Bundle\Tests\Functional\TestProcessor'
3232
public: true
3333
tags:
3434
- { name: 'enqueue.topic_subscriber', client: 'default' }
35+
- { name: 'enqueue.transport.processor', transport: 'default' }
3536

3637
test.message.command_processor:
3738
class: 'Enqueue\Bundle\Tests\Functional\TestCommandProcessor'

pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php

Lines changed: 32 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -213,39 +213,38 @@ public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueCo
213213
$this->assertEquals($expectedBody, $processor->message->getBody());
214214
}
215215

216-
// /**
217-
// * @dataProvider provideEnqueueConfigs
218-
// */
219-
// public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $enqueueConfig)
220-
// {
221-
// $this->customSetUp($enqueueConfig);
222-
//
223-
// if ($this->getTestQueue() instanceof StompDestination) {
224-
// $this->markTestSkipped('The test fails with the exception Stomp\Exception\ErrorFrameException: Error "precondition_failed". '.
225-
// 'It happens because of the destination options are different from the one used while creating the dest. Nothing to do about it'
226-
// );
227-
// }
228-
//
229-
// $expectedBody = __METHOD__.time();
230-
//
231-
// $command = static::$container->get('test_enqueue.client.default.consume_messages_command');
232-
// $command->setContainer(static::$container);
233-
// $processor = static::$container->get('test.message.processor');
234-
//
235-
// $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, $expectedBody);
236-
//
237-
// $tester = new CommandTester($command);
238-
// $tester->execute([
239-
// '--message-limit' => 1,
240-
// '--time-limit' => '+2sec',
241-
// '--receive-timeout' => 1000,
242-
// '--queue' => [$this->getTestQueue()->getQueueName()],
243-
// 'processor-service' => 'test.message.processor',
244-
// ]);
245-
//
246-
// $this->assertInstanceOf(Message::class, $processor->message);
247-
// $this->assertEquals($expectedBody, $processor->message->getBody());
248-
// }
216+
/**
217+
* @dataProvider provideEnqueueConfigs
218+
*/
219+
public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $enqueueConfig)
220+
{
221+
$this->customSetUp($enqueueConfig);
222+
223+
if ($this->getTestQueue() instanceof StompDestination) {
224+
$this->markTestSkipped('The test fails with the exception Stomp\Exception\ErrorFrameException: Error "precondition_failed". '.
225+
'It happens because of the destination options are different from the one used while creating the dest. Nothing to do about it'
226+
);
227+
}
228+
229+
$expectedBody = __METHOD__.time();
230+
231+
$command = static::$container->get('test_enqueue.consume_command');
232+
$processor = static::$container->get('test.message.processor');
233+
234+
$this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, $expectedBody);
235+
236+
$tester = new CommandTester($command);
237+
$tester->execute([
238+
'--message-limit' => 1,
239+
'--time-limit' => '+2sec',
240+
'--receive-timeout' => 1000,
241+
'processor' => 'test.message.processor',
242+
'queues' => [$this->getTestQueue()->getQueueName()],
243+
]);
244+
245+
$this->assertInstanceOf(Message::class, $processor->message);
246+
$this->assertEquals($expectedBody, $processor->message->getBody());
247+
}
249248

250249
/**
251250
* @return string

pkg/enqueue/Consumption/ArrayQueueConsumerRegistry.php

Lines changed: 0 additions & 36 deletions
This file was deleted.

pkg/enqueue/Consumption/QueueConsumerRegistryInterface.php

Lines changed: 0 additions & 10 deletions
This file was deleted.

pkg/enqueue/Container/Container.php

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
namespace Enqueue\Container;
4+
5+
use Psr\Container\ContainerInterface;
6+
7+
class Container implements ContainerInterface
8+
{
9+
/**
10+
* @var array
11+
*/
12+
private $services;
13+
14+
public function __construct(array $services)
15+
{
16+
$this->services = $services;
17+
}
18+
19+
public function get($id)
20+
{
21+
if (false == $this->has($id)) {
22+
throw new NotFoundException(sprintf('The service "%s" not found.', $id));
23+
}
24+
25+
return $this->services[$id];
26+
}
27+
28+
public function has($id)
29+
{
30+
return array_key_exists($id, $this->services);
31+
}
32+
}
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
<?php
2+
3+
namespace Enqueue\Container;
4+
5+
use Psr\Container\NotFoundExceptionInterface;
6+
7+
class NotFoundException extends \InvalidArgumentException implements NotFoundExceptionInterface
8+
{
9+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
<?php
2+
3+
namespace Enqueue\Symfony\Consumption;
4+
5+
use Enqueue\Consumption\Extension\LoggerExtension;
6+
use Psr\Log\NullLogger;
7+
use Symfony\Component\Console\Input\InputInterface;
8+
use Symfony\Component\Console\Input\InputOption;
9+
use Symfony\Component\Console\Logger\ConsoleLogger;
10+
use Symfony\Component\Console\Output\OutputInterface;
11+
12+
trait ChooseLoggerCommandTrait
13+
{
14+
protected function configureLoggerExtension(): void
15+
{
16+
$this
17+
->addOption('logger', null, InputOption::VALUE_OPTIONAL, 'A logger to be used. Could be "default", "null", "stdout".', 'default')
18+
;
19+
}
20+
21+
protected function getLoggerExtension(InputInterface $input, OutputInterface $output): ?LoggerExtension
22+
{
23+
$logger = $input->getOption('logger');
24+
switch ($logger) {
25+
case 'null':
26+
return new LoggerExtension(new NullLogger());
27+
case 'stdout':
28+
return new LoggerExtension(new ConsoleLogger($output));
29+
case 'default':
30+
return null;
31+
default:
32+
throw new \LogicException(sprintf('The logger "%s" is not supported', $logger));
33+
}
34+
}
35+
}

0 commit comments

Comments
 (0)