diff --git a/composer.json b/composer.json index 3c9af16f7..266a9e326 100644 --- a/composer.json +++ b/composer.json @@ -18,6 +18,7 @@ "doctrine/dbal": "~2.5", "ramsey/uuid": "^2|^3.5", "psr/log": "^1", + "psr/container": "^1", "symfony/event-dispatcher": "4.0.*", "makasim/temp-file": "^0.2", "google/cloud-pubsub": "^0.6.1|^1.0", diff --git a/pkg/enqueue-bundle/DependencyInjection/Configuration.php b/pkg/enqueue-bundle/DependencyInjection/Configuration.php index df0b3f6d7..153adadc5 100644 --- a/pkg/enqueue-bundle/DependencyInjection/Configuration.php +++ b/pkg/enqueue-bundle/DependencyInjection/Configuration.php @@ -26,8 +26,13 @@ public function getConfigTreeBuilder(): TreeBuilder return ['transport' => ['dsn' => 'null:']]; }); + $transportFactory = new TransportFactory('default'); + $transportNode = $rootNode->children()->arrayNode('transport'); - (new TransportFactory('default'))->addConfiguration($transportNode); + $transportFactory->addTransportConfiguration($transportNode); + + $consumptionNode = $rootNode->children()->arrayNode('consumption'); + $transportFactory->addQueueConsumerConfiguration($consumptionNode); $rootNode->children() ->arrayNode('client')->children() @@ -40,18 +45,6 @@ public function getConfigTreeBuilder(): TreeBuilder ->scalarNode('default_processor_queue')->defaultValue('default')->cannotBeEmpty()->end() ->integerNode('redelivered_delay_time')->min(0)->defaultValue(0)->end() ->end()->end() - ->arrayNode('consumption')->addDefaultsIfNotSet()->children() - ->integerNode('idle_timeout') - ->min(0) - ->defaultValue(0) - ->info('the time in milliseconds queue consumer waits if no message received') - ->end() - ->integerNode('receive_timeout') - ->min(0) - ->defaultValue(100) - ->info('the time in milliseconds queue consumer waits for a message (100 ms by default)') - ->end() - ->end()->end() ->booleanNode('job')->defaultFalse()->end() ->arrayNode('async_events') ->addDefaultsIfNotSet() diff --git a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php index ab9dd70ce..4329487e9 100644 --- a/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php +++ b/pkg/enqueue-bundle/DependencyInjection/EnqueueExtension.php @@ -7,7 +7,6 @@ use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Client\TopicSubscriberInterface; use Enqueue\Client\TraceableProducer; -use Enqueue\Consumption\QueueConsumer; use Enqueue\JobQueue\Job; use Enqueue\Symfony\DependencyInjection\ClientFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; @@ -29,13 +28,15 @@ public function load(array $configs, ContainerBuilder $container): void $loader = new YamlFileLoader($container, new FileLocator(__DIR__.'/../Resources/config')); $loader->load('services.yml'); - $this->setupAutowiringForProcessors($container); - $transportFactory = (new TransportFactory('default')); - $transportFactory->createConnectionFactory($container, $config['transport']); - $transportFactory->createContext($container, $config['transport']); + $transportFactory->buildConnectionFactory($container, $config['transport']); + $transportFactory->buildContext($container, []); + $transportFactory->buildQueueConsumer($container, $config['consumption']); + $transportFactory->buildRpcClient($container, []); if (isset($config['client'])) { + $this->setupAutowiringForProcessors($container); + $loader->load('client.yml'); $loader->load('extensions/flush_spool_producer_extension.yml'); $loader->load('extensions/exclusive_command_extension.yml'); @@ -77,17 +78,20 @@ public function load(array $configs, ContainerBuilder $container): void ->replaceArgument(1, $config['client']['redelivered_delay_time']) ; } - } - // todo configure queue consumer - $container->getDefinition(QueueConsumer::class) - ->replaceArgument(2, $config['consumption']['idle_timeout']) - ->replaceArgument(3, $config['consumption']['receive_timeout']) - ; + $locatorId = 'enqueue.locator'; + if ($container->hasDefinition($locatorId)) { + $locator = $container->getDefinition($locatorId); + $locator->replaceArgument(0, array_replace($locator->getArgument(0), [ + 'enqueue.client.default.queue_consumer' => new Reference('enqueue.client.default.queue_consumer'), + 'enqueue.client.default.driver' => new Reference('enqueue.client.default.driver'), + 'enqueue.client.default.delegate_processor' => new Reference('enqueue.client.default.delegate_processor'), + 'enqueue.client.default.producer' => new Reference('enqueue.client.default.producer'), + ])); + } - if ($container->hasDefinition('enqueue.client.default.queue_consumer')) { $container->getDefinition('enqueue.client.default.queue_consumer') - ->replaceArgument(2, $config['consumption']['idle_timeout']) + ->replaceArgument(2, $config['consumption']['idle_time']) ->replaceArgument(3, $config['consumption']['receive_timeout']) ; } diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index 0fb5cd15d..b1b2ad85d 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -7,11 +7,13 @@ use Enqueue\AsyncEventDispatcher\DependencyInjection\AsyncTransformersPass; use Enqueue\Symfony\Client\DependencyInjection\AnalyzeRouteCollectionPass; use Enqueue\Symfony\Client\DependencyInjection\BuildClientExtensionsPass; -use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass; -use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass; -use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass; -use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass; -use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass; +use Enqueue\Symfony\Client\DependencyInjection\BuildCommandSubscriberRoutesPass as BuildClientCommandSubscriberRoutesPass; +use Enqueue\Symfony\Client\DependencyInjection\BuildConsumptionExtensionsPass as BuildClientConsumptionExtensionsPass; +use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRegistryPass as BuildClientProcessorRegistryPass; +use Enqueue\Symfony\Client\DependencyInjection\BuildProcessorRoutesPass as BuildClientProcessorRoutesPass; +use Enqueue\Symfony\Client\DependencyInjection\BuildTopicSubscriberRoutesPass as BuildClientTopicSubscriberRoutesPass; +use Enqueue\Symfony\DependencyInjection\BuildConsumptionExtensionsPass; +use Enqueue\Symfony\DependencyInjection\BuildProcessorRegistryPass; use Symfony\Component\DependencyInjection\Compiler\PassConfig; use Symfony\Component\DependencyInjection\ContainerBuilder; use Symfony\Component\HttpKernel\Bundle\Bundle; @@ -20,13 +22,18 @@ class EnqueueBundle extends Bundle { public function build(ContainerBuilder $container): void { + //transport passes $container->addCompilerPass(new BuildConsumptionExtensionsPass('default')); + $container->addCompilerPass(new BuildProcessorRegistryPass('default')); + + //client passes + $container->addCompilerPass(new BuildClientConsumptionExtensionsPass('default')); $container->addCompilerPass(new BuildClientExtensionsPass('default')); - $container->addCompilerPass(new BuildTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); - $container->addCompilerPass(new BuildCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); - $container->addCompilerPass(new BuildProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new BuildClientTopicSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new BuildClientCommandSubscriberRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); + $container->addCompilerPass(new BuildClientProcessorRoutesPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AnalyzeRouteCollectionPass('default'), PassConfig::TYPE_BEFORE_OPTIMIZATION, 30); - $container->addCompilerPass(new BuildProcessorRegistryPass('default')); + $container->addCompilerPass(new BuildClientProcessorRegistryPass('default')); if (class_exists(AsyncEventDispatcherExtension::class)) { $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); diff --git a/pkg/enqueue-bundle/Resources/config/client.yml b/pkg/enqueue-bundle/Resources/config/client.yml index 0a6d3541b..2d929ef17 100644 --- a/pkg/enqueue-bundle/Resources/config/client.yml +++ b/pkg/enqueue-bundle/Resources/config/client.yml @@ -1,5 +1,6 @@ services: - enqueue.client.default.context: +services: + enqueue.client.default.context: class: 'Interop\Queue\Context' factory: ['@enqueue.client.default.driver', 'getContext'] @@ -48,7 +49,7 @@ services: - '@enqueue.client.default.driver' enqueue.client.default.processor_registry: - class: 'Enqueue\Symfony\Client\ContainerProcessorRegistry' + class: 'Enqueue\Symfony\ContainerProcessorRegistry' enqueue.client.default.delegate_processor: class: 'Enqueue\Client\DelegateProcessor' @@ -75,37 +76,6 @@ services: arguments: - [] - enqueue.client.default.consume_messages_command: - class: 'Enqueue\Symfony\Client\ConsumeMessagesCommand' - arguments: - - '@enqueue.client.default.queue_consumer' - - '@enqueue.client.default.delegate_processor' - - '@enqueue.client.default.driver' - tags: - - { name: 'console.command' } - - enqueue.client.default.produce_message_command: - class: 'Enqueue\Symfony\Client\ProduceMessageCommand' - arguments: - - '@enqueue.client.default.producer' - tags: - - { name: 'console.command' } - - enqueue.client.default.setup_broker_command: - class: 'Enqueue\Symfony\Client\SetupBrokerCommand' - arguments: - - '@enqueue.client.default.driver' - tags: - - { name: 'console.command' } - - enqueue.client.default.routes_command: - class: 'Enqueue\Symfony\Client\RoutesCommand' - arguments: - - '@enqueue.client.default.config' - - '@enqueue.client.default.route_collection' - tags: - - { name: 'console.command' } - # todo enqueue.profiler.message_queue_collector: class: 'Enqueue\Bundle\Profiler\MessageQueueCollector' diff --git a/pkg/enqueue-bundle/Resources/config/services.yml b/pkg/enqueue-bundle/Resources/config/services.yml index 9b350c9b9..d63a4e89f 100644 --- a/pkg/enqueue-bundle/Resources/config/services.yml +++ b/pkg/enqueue-bundle/Resources/config/services.yml @@ -1,55 +1,49 @@ -parameters: - enqueue.queue_consumer.default_idle_time: 0 - enqueue.queue_consumer.default_receive_timeout: 10000 - services: - enqueue.consumption.extensions: - class: 'Enqueue\Consumption\ChainExtension' - public: false + enqueue.locator: + class: 'Symfony\Component\DependencyInjection\ServiceLocator' arguments: - [] + tags: ['container.service_locator'] - Enqueue\Consumption\QueueConsumer: - class: 'Enqueue\Consumption\QueueConsumer' - public: true + enqueue.transport.consume_command: + class: 'Enqueue\Symfony\Consumption\ConfigurableConsumeCommand' arguments: - - '@enqueue.transport.default.context' - - '@enqueue.consumption.extensions' - - '%enqueue.queue_consumer.default_idle_time%' - - '%enqueue.queue_consumer.default_receive_timeout%' - - # Deprecated. To be removed in 0.10. - enqueue.consumption.queue_consumer: - public: true - alias: 'Enqueue\Consumption\QueueConsumer' + - '@enqueue.locator' + - 'enqueue.transport.%s.queue_consumer' + - 'enqueue.transport.%s.processor_registry' + tags: + - { name: 'console.command' } - Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand: - class: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand' - public: true + enqueue.client.consume_command: + class: 'Enqueue\Symfony\Client\ConsumeCommand' arguments: - - '@Enqueue\Consumption\QueueConsumer' + - '@enqueue.locator' + - 'enqueue.client.%s.queue_consumer' + - 'enqueue.client.%s.driver' + - 'enqueue.client.%s.delegate_processor' tags: - { name: 'console.command' } - # Deprecated. To be removed in 0.10. - enqueue.command.consume_messages: - public: true - alias: 'Enqueue\Symfony\Consumption\ContainerAwareConsumeMessagesCommand' - - enqueue.transport.rpc_factory: - class: 'Enqueue\Rpc\RpcFactory' - public: false + enqueue.client.produce_command: + class: 'Enqueue\Symfony\Client\ProduceCommand' arguments: - - '@enqueue.transport.default.context' + - '@enqueue.locator' + - 'enqueue.client.%s.producer' + tags: + - { name: 'console.command' } - Enqueue\Rpc\RpcClient: - class: 'Enqueue\Rpc\RpcClient' - public: true + enqueue.client.setup_broker_command: + class: 'Enqueue\Symfony\Client\SetupBrokerCommand' arguments: - - '@enqueue.transport.default.context' - - '@enqueue.transport.rpc_factory' + - '@enqueue.locator' + - 'enqueue.client.%s.driver' + tags: + - { name: 'console.command' } - # Deprecated. To be removed in 0.10. - enqueue.transport.rpc_client: - public: true - alias: 'Enqueue\Rpc\RpcClient' + enqueue.client.routes_command: + class: 'Enqueue\Symfony\Client\RoutesCommand' + arguments: + - '@enqueue.locator' + - 'enqueue.client.%s.driver' + tags: + - { name: 'console.command' } diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml index ae28909ab..ce1ba9458 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/config.yml @@ -34,6 +34,18 @@ services: alias: 'enqueue.client.default.traceable_producer' public: true + test_enqueue.transport.default.queue_consumer: + alias: 'enqueue.transport.default.queue_consumer' + public: true + + test_enqueue.client.default.queue_consumer: + alias: 'enqueue.client.default.queue_consumer' + public: true + + test_enqueue.transport.default.rpc_client: + alias: 'enqueue.transport.default.rpc_client' + public: true + test_enqueue.client.default.producer: alias: 'enqueue.client.default.producer' public: true @@ -50,12 +62,12 @@ services: alias: 'enqueue.transport.default.context' public: true - test_enqueue.client.default.consume_messages_command: - alias: 'enqueue.client.default.consume_messages_command' + test_enqueue.client.consume_command: + alias: 'enqueue.client.consume_command' public: true - test.enqueue.client.default.routes_command: - alias: 'enqueue.client.default.routes_command' + test.enqueue.client.routes_command: + alias: 'enqueue.client.routes_command' public: true test_async_listener: diff --git a/pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml b/pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml index 0e59a9528..4768ef8c5 100644 --- a/pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml +++ b/pkg/enqueue-bundle/Tests/Functional/App/config/custom-config.yml @@ -23,8 +23,20 @@ services: alias: 'enqueue.transport.default.context' public: true - test_enqueue.client.default.consume_messages_command: - alias: 'enqueue.client.default.consume_messages_command' + test_enqueue.transport.consume_command: + alias: 'enqueue.transport.consume_command' + public: true + + test_enqueue.client.consume_command: + alias: 'enqueue.client.consume_command' + public: true + + test_enqueue.client.produce_command: + alias: 'enqueue.client.produce_command' + public: true + + test_enqueue.client.setup_broker_command: + alias: 'enqueue.client.setup_broker_command' public: true test.message.processor: @@ -32,6 +44,7 @@ services: public: true tags: - { name: 'enqueue.topic_subscriber', client: 'default' } + - { name: 'enqueue.transport.processor', transport: 'default' } test.message.command_processor: class: 'Enqueue\Bundle\Tests\Functional\TestCommandProcessor' diff --git a/pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php b/pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php index 1c0b16a56..e05d1532d 100644 --- a/pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/QueueConsumerTest.php @@ -11,8 +11,10 @@ class QueueConsumerTest extends WebTestCase { public function testCouldBeGetFromContainerAsService() { - $queueConsumer = static::$container->get(QueueConsumer::class); + $queueConsumer = static::$container->get('test_enqueue.client.default.queue_consumer'); + $this->assertInstanceOf(QueueConsumer::class, $queueConsumer); + $queueConsumer = static::$container->get('test_enqueue.transport.default.queue_consumer'); $this->assertInstanceOf(QueueConsumer::class, $queueConsumer); } } diff --git a/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php b/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php index c8fd7b7a2..afefe6482 100644 --- a/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/RoutesCommandTest.php @@ -12,7 +12,7 @@ class RoutesCommandTest extends WebTestCase { public function testCouldBeGetFromContainerAsService() { - $command = static::$container->get('test.enqueue.client.default.routes_command'); + $command = static::$container->get('test.enqueue.client.routes_command'); $this->assertInstanceOf(RoutesCommand::class, $command); } @@ -20,7 +20,7 @@ public function testCouldBeGetFromContainerAsService() public function testShouldDisplayRegisteredTopics() { /** @var RoutesCommand $command */ - $command = static::$container->get('test.enqueue.client.default.routes_command'); + $command = static::$container->get('test.enqueue.client.routes_command'); $tester = new CommandTester($command); $tester->execute([]); @@ -36,7 +36,7 @@ public function testShouldDisplayRegisteredTopics() public function testShouldDisplayCommands() { /** @var RoutesCommand $command */ - $command = static::$container->get('test.enqueue.client.default.routes_command'); + $command = static::$container->get('test.enqueue.client.routes_command'); $tester = new CommandTester($command); $tester->execute([]); diff --git a/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php b/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php index 1b60661a4..3d99bcc72 100644 --- a/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/RpcClientTest.php @@ -11,8 +11,8 @@ class RpcClientTest extends WebTestCase { public function testTransportRpcClientCouldBeGetFromContainerAsService() { - $connection = static::$container->get(RpcClient::class); + $rpcClient = static::$container->get('test_enqueue.transport.default.rpc_client'); - $this->assertInstanceOf(RpcClient::class, $connection); + $this->assertInstanceOf(RpcClient::class, $rpcClient); } } diff --git a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php index e9a576484..d9f7e16d2 100644 --- a/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php +++ b/pkg/enqueue-bundle/Tests/Functional/UseCasesTest.php @@ -125,7 +125,7 @@ public function provideEnqueueConfigs() /** * @dataProvider provideEnqueueConfigs */ - public function testProducerSendsMessage(array $enqueueConfig) + public function testProducerSendsEventMessage(array $enqueueConfig) { $this->customSetUp($enqueueConfig); @@ -163,14 +163,77 @@ public function testProducerSendsCommandMessage(array $enqueueConfig) $this->assertSame($expectedBody, $message->getBody()); } - /** - * @dataProvider provideEnqueueConfigs - */ - public function testClientConsumeCommandMessagesFromExplicitlySetQueue(array $enqueueConfig) + public function testProducerSendsEventMessageViaProduceCommand() { - $this->customSetUp($enqueueConfig); + $this->customSetUp([ + 'transport' => getenv('AMQP_DSN'), + ]); + + $expectedBody = __METHOD__.time(); + + $command = static::$container->get('test_enqueue.client.produce_command'); + $tester = new CommandTester($command); + $tester->execute([ + 'message' => $expectedBody, + '--topic' => TestProcessor::TOPIC, + '--client' => 'default', + ]); + + $consumer = $this->getContext()->createConsumer($this->getTestQueue()); + + $message = $consumer->receive(100); + $this->assertInstanceOf(Message::class, $message); + $consumer->acknowledge($message); + + $this->assertSame($expectedBody, $message->getBody()); + } + + public function testProducerSendsCommandMessageViaProduceCommand() + { + $this->customSetUp([ + 'transport' => getenv('AMQP_DSN'), + ]); + + $expectedBody = __METHOD__.time(); + + $command = static::$container->get('test_enqueue.client.produce_command'); + $tester = new CommandTester($command); + $tester->execute([ + 'message' => $expectedBody, + '--command' => TestCommandProcessor::COMMAND, + '--client' => 'default', + ]); + + $consumer = $this->getContext()->createConsumer($this->getTestQueue()); + + $message = $consumer->receive(100); + $this->assertInstanceOf(Message::class, $message); + $consumer->acknowledge($message); + + $this->assertInstanceOf(Message::class, $message); + $this->assertSame($expectedBody, $message->getBody()); + } + + public function testShouldSetupBroker() + { + $this->customSetUp([ + 'transport' => 'file://'.sys_get_temp_dir(), + ]); + + $command = static::$container->get('test_enqueue.client.setup_broker_command'); + $tester = new CommandTester($command); + $tester->execute([]); + + $this->assertSame("Broker set up\n", $tester->getDisplay()); + } + + public function testClientConsumeCommandMessagesFromExplicitlySetQueue() + { + $this->customSetUp([ + 'transport' => getenv('AMQP_DSN'), + ]); - $command = static::$container->get('test_enqueue.client.default.consume_messages_command'); + $command = static::$container->get('test_enqueue.client.consume_command'); $processor = static::$container->get('test.message.command_processor'); $expectedBody = __METHOD__.time(); @@ -180,6 +243,7 @@ public function testClientConsumeCommandMessagesFromExplicitlySetQueue(array $en $tester = new CommandTester($command); $tester->execute([ '--message-limit' => 2, + '--receive-timeout' => 100, '--time-limit' => 'now + 2 seconds', 'client-queue-names' => ['test'], ]); @@ -188,16 +252,15 @@ public function testClientConsumeCommandMessagesFromExplicitlySetQueue(array $en $this->assertEquals($expectedBody, $processor->message->getBody()); } - /** - * @dataProvider provideEnqueueConfigs - */ - public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueConfig) + public function testClientConsumeMessagesFromExplicitlySetQueue() { - $this->customSetUp($enqueueConfig); + $this->customSetUp([ + 'transport' => getenv('AMQP_DSN'), + ]); $expectedBody = __METHOD__.time(); - $command = static::$container->get('test_enqueue.client.default.consume_messages_command'); + $command = static::$container->get('test_enqueue.client.consume_command'); $processor = static::$container->get('test.message.processor'); $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, $expectedBody); @@ -205,6 +268,7 @@ public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueCo $tester = new CommandTester($command); $tester->execute([ '--message-limit' => 2, + '--receive-timeout' => 100, '--time-limit' => 'now + 2 seconds', 'client-queue-names' => ['test'], ]); @@ -213,39 +277,37 @@ public function testClientConsumeMessagesFromExplicitlySetQueue(array $enqueueCo $this->assertEquals($expectedBody, $processor->message->getBody()); } -// /** -// * @dataProvider provideEnqueueConfigs -// */ -// public function testTransportConsumeMessagesCommandShouldConsumeMessage(array $enqueueConfig) -// { -// $this->customSetUp($enqueueConfig); -// -// if ($this->getTestQueue() instanceof StompDestination) { -// $this->markTestSkipped('The test fails with the exception Stomp\Exception\ErrorFrameException: Error "precondition_failed". '. -// 'It happens because of the destination options are different from the one used while creating the dest. Nothing to do about it' -// ); -// } -// -// $expectedBody = __METHOD__.time(); -// -// $command = static::$container->get('test_enqueue.client.default.consume_messages_command'); -// $command->setContainer(static::$container); -// $processor = static::$container->get('test.message.processor'); -// -// $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, $expectedBody); -// -// $tester = new CommandTester($command); -// $tester->execute([ -// '--message-limit' => 1, -// '--time-limit' => '+2sec', -// '--receive-timeout' => 1000, -// '--queue' => [$this->getTestQueue()->getQueueName()], -// 'processor-service' => 'test.message.processor', -// ]); -// -// $this->assertInstanceOf(Message::class, $processor->message); -// $this->assertEquals($expectedBody, $processor->message->getBody()); -// } + public function testTransportConsumeMessagesCommandShouldConsumeMessage() + { + $this->customSetUp([ + 'transport' => getenv('AMQP_DSN'), + ]); + + if ($this->getTestQueue() instanceof StompDestination) { + $this->markTestSkipped('The test fails with the exception Stomp\Exception\ErrorFrameException: Error "precondition_failed". '. + 'It happens because of the destination options are different from the one used while creating the dest. Nothing to do about it' + ); + } + + $expectedBody = __METHOD__.time(); + + $command = static::$container->get('test_enqueue.transport.consume_command'); + $processor = static::$container->get('test.message.processor'); + + $this->getMessageProducer()->sendEvent(TestProcessor::TOPIC, $expectedBody); + + $tester = new CommandTester($command); + $tester->execute([ + '--message-limit' => 1, + '--time-limit' => '+2sec', + '--receive-timeout' => 1000, + 'processor' => 'test.message.processor', + 'queues' => [$this->getTestQueue()->getQueueName()], + ]); + + $this->assertInstanceOf(Message::class, $processor->message); + $this->assertEquals($expectedBody, $processor->message->getBody()); + } /** * @return string diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php index 5c4a20b73..f8aab5dd1 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/ConfigurationTest.php @@ -39,8 +39,8 @@ public function testShouldUseDefaultConfigurationIfNothingIsConfiguredAtAll() $this->assertEquals([ 'transport' => ['dsn' => 'null:'], 'consumption' => [ - 'idle_timeout' => 0, - 'receive_timeout' => 100, + 'idle_time' => 0, + 'receive_timeout' => 10000, ], 'job' => false, 'async_events' => ['enabled' => false], @@ -66,8 +66,8 @@ public function testShouldUseDefaultTransportIfIfTransportIsConfiguredAtAll() $this->assertEquals([ 'transport' => ['dsn' => 'null:'], 'consumption' => [ - 'idle_timeout' => 0, - 'receive_timeout' => 100, + 'idle_time' => 0, + 'receive_timeout' => 10000, ], 'job' => false, 'async_events' => ['enabled' => false], @@ -383,8 +383,8 @@ public function testShouldSetDefaultConfigurationForConsumption() $this->assertArraySubset([ 'consumption' => [ - 'idle_timeout' => 0, - 'receive_timeout' => 100, + 'idle_time' => 0, + 'receive_timeout' => 10000, ], ], $config); } @@ -397,14 +397,14 @@ public function testShouldAllowConfigureConsumption() $config = $processor->processConfiguration($configuration, [[ 'transport' => [], 'consumption' => [ - 'idle_timeout' => 123, + 'idle_time' => 123, 'receive_timeout' => 456, ], ]]); $this->assertArraySubset([ 'consumption' => [ - 'idle_timeout' => 123, + 'idle_time' => 123, 'receive_timeout' => 456, ], ], $config); diff --git a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php index b779e50dc..3efacf7bd 100644 --- a/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php +++ b/pkg/enqueue-bundle/Tests/Unit/DependencyInjection/EnqueueExtensionTest.php @@ -9,7 +9,6 @@ use Enqueue\Client\ProducerInterface; use Enqueue\Client\TopicSubscriberInterface; use Enqueue\Client\TraceableProducer; -use Enqueue\Consumption\QueueConsumer; use Enqueue\JobQueue\JobRunner; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; @@ -431,14 +430,17 @@ public function testShouldConfigureQueueConsumer() 'transport' => [ ], 'consumption' => [ - 'idle_timeout' => 123, + 'idle_time' => 123, 'receive_timeout' => 456, ], ]], $container); - $def = $container->getDefinition(QueueConsumer::class); - $this->assertSame(123, $def->getArgument(2)); - $this->assertSame(456, $def->getArgument(3)); + $def = $container->getDefinition('enqueue.transport.default.queue_consumer'); + $this->assertSame('%enqueue.transport.default.idle_time%', $def->getArgument(2)); + $this->assertSame('%enqueue.transport.default.receive_timeout%', $def->getArgument(3)); + + $this->assertSame(123, $container->getParameter('enqueue.transport.default.idle_time')); + $this->assertSame(456, $container->getParameter('enqueue.transport.default.receive_timeout')); $def = $container->getDefinition('enqueue.client.default.queue_consumer'); $this->assertSame(123, $def->getArgument(2)); diff --git a/pkg/enqueue/Client/ArrayProcessorRegistry.php b/pkg/enqueue/ArrayProcessorRegistry.php similarity index 79% rename from pkg/enqueue/Client/ArrayProcessorRegistry.php rename to pkg/enqueue/ArrayProcessorRegistry.php index 5dede785c..592908c51 100644 --- a/pkg/enqueue/Client/ArrayProcessorRegistry.php +++ b/pkg/enqueue/ArrayProcessorRegistry.php @@ -1,6 +1,6 @@ processors = $processors; + $this->processors = []; + array_walk($processors, function (Processor $processor, string $key) { + $this->processors[$key] = $processor; + }); } public function add(string $name, Processor $processor): void diff --git a/pkg/enqueue/Client/DelegateProcessor.php b/pkg/enqueue/Client/DelegateProcessor.php index 705ff1266..18985b454 100644 --- a/pkg/enqueue/Client/DelegateProcessor.php +++ b/pkg/enqueue/Client/DelegateProcessor.php @@ -2,6 +2,7 @@ namespace Enqueue\Client; +use Enqueue\ProcessorRegistryInterface; use Interop\Queue\Context; use Interop\Queue\Message as InteropMessage; use Interop\Queue\Processor; diff --git a/pkg/enqueue/Container/Container.php b/pkg/enqueue/Container/Container.php new file mode 100644 index 000000000..255def33b --- /dev/null +++ b/pkg/enqueue/Container/Container.php @@ -0,0 +1,32 @@ +services = $services; + } + + public function get($id) + { + if (false == $this->has($id)) { + throw new NotFoundException(sprintf('The service "%s" not found.', $id)); + } + + return $this->services[$id]; + } + + public function has($id) + { + return array_key_exists($id, $this->services); + } +} diff --git a/pkg/enqueue/Container/NotFoundException.php b/pkg/enqueue/Container/NotFoundException.php new file mode 100644 index 000000000..fcc3386e6 --- /dev/null +++ b/pkg/enqueue/Container/NotFoundException.php @@ -0,0 +1,9 @@ +consumer = $consumer; - $this->processor = $processor; - $this->driver = $driver; + $this->container = $container; + $this->queueConsumerIdPattern = $queueConsumerIdPattern; + $this->driverIdPattern = $driverIdPattern; + $this->processorIdPattern = $processorIdPatter; } protected function configure(): void @@ -57,6 +68,7 @@ protected function configure(): void $this->configureLimitsExtensions(); $this->configureSetupBrokerExtension(); $this->configureQueueConsumerOptions(); + $this->configureLoggerExtension(); $this ->setAliases(['enq:c']) @@ -65,19 +77,31 @@ protected function configure(): void 'It select an appropriate message processor based on a message headers') ->addArgument('client-queue-names', InputArgument::IS_ARRAY, 'Queues to consume messages from') ->addOption('skip', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Queues to skip consumption of messages from', []) + ->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default') ; } protected function execute(InputInterface $input, OutputInterface $output): ?int { - $this->setQueueConsumerOptions($this->consumer, $input); + $client = $input->getOption('client'); + + try { + $consumer = $this->getQueueConsumer($client); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Client "%s" is not supported.', $client), null, $e); + } + + $driver = $this->getDriver($client); + $processor = $this->getProcessor($client); + + $this->setQueueConsumerOptions($consumer, $input); $clientQueueNames = $input->getArgument('client-queue-names'); if (empty($clientQueueNames)) { - $clientQueueNames[$this->driver->getConfig()->getDefaultQueue()] = true; - $clientQueueNames[$this->driver->getConfig()->getRouterQueue()] = true; + $clientQueueNames[$driver->getConfig()->getDefaultQueue()] = true; + $clientQueueNames[$driver->getConfig()->getRouterQueue()] = true; - foreach ($this->driver->getRouteCollection()->all() as $route) { + foreach ($driver->getRouteCollection()->all() as $route) { if ($route->getQueue()) { $clientQueueNames[$route->getQueue()] = true; } @@ -91,30 +115,45 @@ protected function execute(InputInterface $input, OutputInterface $output): ?int } foreach ($clientQueueNames as $clientQueueName) { - $queue = $this->driver->createQueue($clientQueueName); - $this->consumer->bind($queue, $this->processor); + $queue = $driver->createQueue($clientQueueName); + $consumer->bind($queue, $processor); } - $this->consumer->consume($this->getRuntimeExtensions($input, $output)); + $consumer->consume($this->getRuntimeExtensions($input, $output)); return null; } - /** - * @param InputInterface $input - * @param OutputInterface $output - * - * @return ChainExtension - */ protected function getRuntimeExtensions(InputInterface $input, OutputInterface $output): ExtensionInterface { $extensions = [new LoggerExtension(new ConsoleLogger($output))]; $extensions = array_merge($extensions, $this->getLimitsExtensions($input, $output)); - if ($setupBrokerExtension = $this->getSetupBrokerExtension($input, $this->driver)) { + $driver = $this->getDriver($input->getOption('client')); + + if ($setupBrokerExtension = $this->getSetupBrokerExtension($input, $driver)) { $extensions[] = $setupBrokerExtension; } + if ($loggerExtension = $this->getLoggerExtension($input, $output)) { + array_unshift($extensions, $loggerExtension); + } + return new ChainExtension($extensions); } + + private function getDriver(string $name): DriverInterface + { + return $this->container->get(sprintf($this->driverIdPattern, $name)); + } + + private function getQueueConsumer(string $name): QueueConsumerInterface + { + return $this->container->get(sprintf($this->queueConsumerIdPattern, $name)); + } + + private function getProcessor(string $name): Processor + { + return $this->container->get(sprintf($this->processorIdPattern, $name)); + } } diff --git a/pkg/enqueue/Symfony/Client/ProduceCommand.php b/pkg/enqueue/Symfony/Client/ProduceCommand.php new file mode 100644 index 000000000..302b344c7 --- /dev/null +++ b/pkg/enqueue/Symfony/Client/ProduceCommand.php @@ -0,0 +1,83 @@ +container = $container; + $this->producerIdPattern = $producerIdPattern; + } + + protected function configure(): void + { + $this + ->setDescription('Sends an event to the topic') + ->addArgument('message', InputArgument::REQUIRED, 'A message') + ->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default') + ->addOption('topic', null, InputOption::VALUE_OPTIONAL, 'The topic to send a message to') + ->addOption('command', null, InputOption::VALUE_OPTIONAL, 'The command to send a message to') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): ?int + { + $topic = $input->getOption('topic'); + $command = $input->getOption('command'); + $message = $input->getArgument('message'); + $client = $input->getOption('client'); + + if ($topic && $command) { + throw new \LogicException('Either topic or command option should be set, both are set.'); + } + + try { + $producer = $this->getProducer($client); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Client "%s" is not supported.', $client), null, $e); + } + + if ($topic) { + $producer->sendEvent($topic, $message); + + $output->writeln('An event is sent'); + } elseif ($command) { + $producer->sendCommand($command, $message); + + $output->writeln('A command is sent'); + } else { + throw new \LogicException('Either topic or command option should be set, none is set.'); + } + + return null; + } + + private function getProducer(string $client): ProducerInterface + { + return $this->container->get(sprintf($this->producerIdPattern, $client)); + } +} diff --git a/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php b/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php deleted file mode 100644 index 5e83903ca..000000000 --- a/pkg/enqueue/Symfony/Client/ProduceMessageCommand.php +++ /dev/null @@ -1,55 +0,0 @@ -producer = $producer; - } - - /** - * {@inheritdoc} - */ - protected function configure() - { - $this - ->setAliases(['enq:p']) - ->setDescription('A command to send a message to topic') - ->addArgument('topic', InputArgument::REQUIRED, 'A topic to send message to') - ->addArgument('message', InputArgument::REQUIRED, 'A message to send') - ; - } - - /** - * {@inheritdoc} - */ - protected function execute(InputInterface $input, OutputInterface $output) - { - $this->producer->sendEvent( - $input->getArgument('topic'), - $input->getArgument('message') - ); - - $output->writeln('Message is sent'); - } -} diff --git a/pkg/enqueue/Symfony/Client/RoutesCommand.php b/pkg/enqueue/Symfony/Client/RoutesCommand.php index 132fd1be1..605d24482 100644 --- a/pkg/enqueue/Symfony/Client/RoutesCommand.php +++ b/pkg/enqueue/Symfony/Client/RoutesCommand.php @@ -2,9 +2,10 @@ namespace Enqueue\Symfony\Client; -use Enqueue\Client\Config; +use Enqueue\Client\DriverInterface; use Enqueue\Client\Route; -use Enqueue\Client\RouteCollection; +use Psr\Container\ContainerInterface; +use Psr\Container\NotFoundExceptionInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Helper\Table; use Symfony\Component\Console\Helper\TableSeparator; @@ -17,21 +18,26 @@ final class RoutesCommand extends Command protected static $defaultName = 'enqueue:routes'; /** - * @var Config + * @var ContainerInterface */ - private $config; + private $container; /** - * @var RouteCollection + * @var string */ - private $routeCollection; + private $driverIdPatter; - public function __construct(Config $config, RouteCollection $routeCollection) + /** + * @var DriverInterface + */ + private $driver; + + public function __construct(ContainerInterface $container, string $driverIdPatter = 'enqueue.client.%s.driver') { parent::__construct(static::$defaultName); - $this->config = $config; - $this->routeCollection = $routeCollection; + $this->container = $container; + $this->driverIdPatter = $driverIdPatter; } protected function configure(): void @@ -39,12 +45,22 @@ protected function configure(): void $this ->setAliases(['debug:enqueue:routes']) ->setDescription('A command lists all registered routes.') - ->addOption('show-route-options', null, InputOption::VALUE_NONE, 'Adds ability to hide options.'); + ->addOption('show-route-options', null, InputOption::VALUE_NONE, 'Adds ability to hide options.') + ->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default') + ; + + $this->driver = null; } protected function execute(InputInterface $input, OutputInterface $output): ?int { - $routes = $this->routeCollection->all(); + try { + $this->driver = $this->getDriver($input->getOption('client')); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Client "%s" is not supported.', $input->getOption('client')), null, $e); + } + + $routes = $this->driver->getRouteCollection()->all(); $output->writeln(sprintf('Found %s routes', count($routes))); $output->writeln(''); @@ -73,7 +89,7 @@ protected function execute(InputInterface $input, OutputInterface $output): ?int ]); } - foreach ($this->routeCollection->all() as $route) { + foreach ($routes as $route) { if ($route->isTopic()) { continue; } @@ -119,7 +135,7 @@ private function formatProcessor(Route $route): string private function formatQueue(Route $route): string { - $queue = $route->getQueue() ?: $this->config->getDefaultQueue(); + $queue = $route->getQueue() ?: $this->driver->getConfig()->getDefaultQueue(); return $route->isPrefixQueue() ? $queue.' (prefixed)' : $queue.' (as is)'; } @@ -128,4 +144,9 @@ private function formatOptions(Route $route): string { return var_export($route->getOptions(), true); } + + private function getDriver(string $client): DriverInterface + { + return $this->container->get(sprintf($this->driverIdPatter, $client)); + } } diff --git a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php index 9bd8ee67e..4a4f322ec 100644 --- a/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php +++ b/pkg/enqueue/Symfony/Client/SetupBrokerCommand.php @@ -3,8 +3,11 @@ namespace Enqueue\Symfony\Client; use Enqueue\Client\DriverInterface; +use Psr\Container\ContainerInterface; +use Psr\Container\NotFoundExceptionInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Logger\ConsoleLogger; use Symfony\Component\Console\Output\OutputInterface; @@ -13,38 +16,49 @@ class SetupBrokerCommand extends Command protected static $defaultName = 'enqueue:setup-broker'; /** - * @var DriverInterface + * @var ContainerInterface */ - private $driver; + private $container; /** - * @param DriverInterface $driver + * @var string */ - public function __construct(DriverInterface $driver) + private $driverIdPattern; + + public function __construct(ContainerInterface $container, string $driverIdPattern = 'enqueue.client.%s.driver') { parent::__construct(static::$defaultName); - $this->driver = $driver; + $this->container = $container; + $this->driverIdPattern = $driverIdPattern; } - /** - * {@inheritdoc} - */ - protected function configure() + protected function configure(): void { $this ->setAliases(['enq:sb']) - ->setDescription('Creates all required queues') + ->setDescription('Setup broker. Configure the broker, creates queues, topics and so on.') + ->addOption('client', 'c', InputOption::VALUE_OPTIONAL, 'The client to consume messages from.', 'default') ; } - /** - * {@inheritdoc} - */ - protected function execute(InputInterface $input, OutputInterface $output) + protected function execute(InputInterface $input, OutputInterface $output): ?int { - $output->writeln('Setup Broker'); + $client = $input->getOption('client'); - $this->driver->setupBroker(new ConsoleLogger($output)); + try { + $this->getDriver($client)->setupBroker(new ConsoleLogger($output)); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Client "%s" is not supported.', $client), null, $e); + } + + $output->writeln('Broker set up'); + + return null; + } + + private function getDriver(string $client): DriverInterface + { + return $this->container->get(sprintf($this->driverIdPattern, $client)); } } diff --git a/pkg/enqueue/Symfony/Consumption/ChooseLoggerCommandTrait.php b/pkg/enqueue/Symfony/Consumption/ChooseLoggerCommandTrait.php new file mode 100644 index 000000000..c229c14b0 --- /dev/null +++ b/pkg/enqueue/Symfony/Consumption/ChooseLoggerCommandTrait.php @@ -0,0 +1,35 @@ +addOption('logger', null, InputOption::VALUE_OPTIONAL, 'A logger to be used. Could be "default", "null", "stdout".', 'default') + ; + } + + protected function getLoggerExtension(InputInterface $input, OutputInterface $output): ?LoggerExtension + { + $logger = $input->getOption('logger'); + switch ($logger) { + case 'null': + return new LoggerExtension(new NullLogger()); + case 'stdout': + return new LoggerExtension(new ConsoleLogger($output)); + case 'default': + return null; + default: + throw new \LogicException(sprintf('The logger "%s" is not supported', $logger)); + } + } +} diff --git a/pkg/enqueue/Symfony/Consumption/ConfigurableConsumeCommand.php b/pkg/enqueue/Symfony/Consumption/ConfigurableConsumeCommand.php new file mode 100644 index 000000000..68ae6dae2 --- /dev/null +++ b/pkg/enqueue/Symfony/Consumption/ConfigurableConsumeCommand.php @@ -0,0 +1,118 @@ +container = $container; + $this->queueConsumerIdPattern = $queueConsumerIdPattern; + $this->processorRegistryIdPattern = $processorRegistryIdPattern; + } + + protected function configure(): void + { + $this->configureLimitsExtensions(); + $this->configureQueueConsumerOptions(); + $this->configureLoggerExtension(); + + $this + ->setDescription('A worker that consumes message from a broker. '. + 'To use this broker you have to explicitly set a queue to consume from '. + 'and a message processor service') + ->addArgument('processor', InputArgument::REQUIRED, 'A message processor.') + ->addArgument('queues', InputArgument::OPTIONAL | InputArgument::IS_ARRAY, 'A queue to consume from', []) + ->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', 'default') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): ?int + { + $transport = $input->getOption('transport'); + + try { + $consumer = $this->getQueueConsumer($transport); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Transport "%s" is not supported.', $transport), null, $e); + } + + $this->setQueueConsumerOptions($consumer, $input); + + $processor = $this->getProcessorRegistry($transport)->get($input->getArgument('processor')); + + $queues = $input->getArgument('queues'); + if (empty($queues) && $processor instanceof QueueSubscriberInterface) { + $queues = $processor::getSubscribedQueues(); + } + + if (empty($queues)) { + throw new \LogicException(sprintf( + 'The queue is not provided. The processor must implement "%s" interface and it must return not empty array of queues or a queue set using as a second argument.', + QueueSubscriberInterface::class + )); + } + + $extensions = $this->getLimitsExtensions($input, $output); + + if ($loggerExtension = $this->getLoggerExtension($input, $output)) { + array_unshift($extensions, $loggerExtension); + } + + foreach ($queues as $queue) { + $consumer->bind($queue, $processor); + } + + $consumer->consume(new ChainExtension($extensions)); + + return null; + } + + private function getQueueConsumer(string $name): QueueConsumerInterface + { + return $this->container->get(sprintf($this->queueConsumerIdPattern, $name)); + } + + private function getProcessorRegistry(string $name): ProcessorRegistryInterface + { + return $this->container->get(sprintf($this->processorRegistryIdPattern, $name)); + } +} diff --git a/pkg/enqueue/Symfony/Consumption/ConsumeCommand.php b/pkg/enqueue/Symfony/Consumption/ConsumeCommand.php new file mode 100644 index 000000000..632a7de17 --- /dev/null +++ b/pkg/enqueue/Symfony/Consumption/ConsumeCommand.php @@ -0,0 +1,86 @@ + QueueConsumerInterface]. + * + * @param QueueConsumerInterface[] + */ + public function __construct(ContainerInterface $container, string $queueConsumerIdPattern = 'enqueue.transport.%s.queue_consumer') + { + parent::__construct(static::$defaultName); + + $this->container = $container; + $this->queueConsumerIdPattern = $queueConsumerIdPattern; + } + + protected function configure(): void + { + $this->configureLimitsExtensions(); + $this->configureQueueConsumerOptions(); + $this->configureLoggerExtension(); + + $this + ->addOption('transport', 't', InputOption::VALUE_OPTIONAL, 'The transport to consume messages from.', 'default') + ->setDescription('A worker that consumes message from a broker. '. + 'To use this broker you have to configure queue consumer before adding to the command') + ; + } + + protected function execute(InputInterface $input, OutputInterface $output): ?int + { + $transport = $input->getOption('transport'); + + try { + // QueueConsumer must be pre configured outside of the command! + $consumer = $this->getQueueConsumer($transport); + } catch (NotFoundExceptionInterface $e) { + throw new \LogicException(sprintf('Transport "%s" is not supported.', $transport), null, $e); + } + + $this->setQueueConsumerOptions($consumer, $input); + + $extensions = $this->getLimitsExtensions($input, $output); + + if ($loggerExtension = $this->getLoggerExtension($input, $output)) { + array_unshift($extensions, $loggerExtension); + } + + $consumer->consume(new ChainExtension($extensions)); + + return null; + } + + private function getQueueConsumer(string $name): QueueConsumerInterface + { + return $this->container->get(sprintf($this->queueConsumerIdPattern, $name)); + } +} diff --git a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php deleted file mode 100644 index 06d88a9ad..000000000 --- a/pkg/enqueue/Symfony/Consumption/ConsumeMessagesCommand.php +++ /dev/null @@ -1,66 +0,0 @@ -consumer = $consumer; - } - - /** - * {@inheritdoc} - */ - protected function configure() - { - $this->configureLimitsExtensions(); - $this->configureQueueConsumerOptions(); - - $this - ->setDescription('A worker that consumes message from a broker. '. - 'To use this broker you have to configure queue consumer before adding to the command') - ; - } - - /** - * {@inheritdoc} - */ - protected function execute(InputInterface $input, OutputInterface $output) - { - $this->setQueueConsumerOptions($this->consumer, $input); - - $extensions = $this->getLimitsExtensions($input, $output); - array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output))); - - $runtimeExtensions = new ChainExtension($extensions); - - $this->consumer->consume($runtimeExtensions); - } -} diff --git a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php deleted file mode 100644 index e7607f6d6..000000000 --- a/pkg/enqueue/Symfony/Consumption/ContainerAwareConsumeMessagesCommand.php +++ /dev/null @@ -1,101 +0,0 @@ -consumer = $consumer; - } - - /** - * {@inheritdoc} - */ - protected function configure() - { - $this->configureLimitsExtensions(); - $this->configureQueueConsumerOptions(); - - $this - ->setDescription('A worker that consumes message from a broker. '. - 'To use this broker you have to explicitly set a queue to consume from '. - 'and a message processor service') - ->addArgument('processor-service', InputArgument::REQUIRED, 'A message processor service') - ->addOption('queue', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Queues to consume from', []) - ; - } - - /** - * {@inheritdoc} - */ - protected function execute(InputInterface $input, OutputInterface $output) - { - $this->setQueueConsumerOptions($this->consumer, $input); - - /** @var Processor $processor */ - $processor = $this->container->get($input->getArgument('processor-service')); - if (false == $processor instanceof Processor) { - throw new \LogicException(sprintf( - 'Invalid message processor service given. It must be an instance of %s but %s', - Processor::class, - get_class($processor) - )); - } - - $queues = $input->getOption('queue'); - if (empty($queues) && $processor instanceof QueueSubscriberInterface) { - $queues = $processor::getSubscribedQueues(); - } - - if (empty($queues)) { - throw new \LogicException(sprintf( - 'The queues are not provided. The processor must implement "%s" interface and it must return not empty array of queues or queues set using --queue option.', - QueueSubscriberInterface::class - )); - } - - $extensions = $this->getLimitsExtensions($input, $output); - array_unshift($extensions, new LoggerExtension(new ConsoleLogger($output))); - - $runtimeExtensions = new ChainExtension($extensions); - - foreach ($queues as $queue) { - $this->consumer->bind($queue, $processor); - } - - $this->consumer->consume($runtimeExtensions); - } -} diff --git a/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php index 00f75f0ea..c20e129b1 100644 --- a/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php +++ b/pkg/enqueue/Symfony/Consumption/QueueConsumerOptionsCommandTrait.php @@ -14,7 +14,7 @@ trait QueueConsumerOptionsCommandTrait protected function configureQueueConsumerOptions() { $this - ->addOption('idle-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer idle if no message has been received.') + ->addOption('idle-time', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer idle if no message has been received.') ->addOption('receive-timeout', null, InputOption::VALUE_REQUIRED, 'The time in milliseconds queue consumer waits for a message.') ; } @@ -25,7 +25,7 @@ protected function configureQueueConsumerOptions() */ protected function setQueueConsumerOptions(QueueConsumerInterface $consumer, InputInterface $input) { - if (null !== $idleTimeout = $input->getOption('idle-timeout')) { + if (null !== $idleTimeout = $input->getOption('idle-time')) { $consumer->setIdleTimeout((float) $idleTimeout); } diff --git a/pkg/enqueue/Symfony/Client/ContainerProcessorRegistry.php b/pkg/enqueue/Symfony/ContainerProcessorRegistry.php similarity index 88% rename from pkg/enqueue/Symfony/Client/ContainerProcessorRegistry.php rename to pkg/enqueue/Symfony/ContainerProcessorRegistry.php index 4031e423e..b259d2379 100644 --- a/pkg/enqueue/Symfony/Client/ContainerProcessorRegistry.php +++ b/pkg/enqueue/Symfony/ContainerProcessorRegistry.php @@ -1,8 +1,8 @@ name = $transportName; + } + + public function process(ContainerBuilder $container): void + { + $extensionsId = sprintf('enqueue.transport.%s.consumption_extensions', $this->name); + if (false == $container->hasDefinition($extensionsId)) { + return; + } + + $tags = $container->findTaggedServiceIds('enqueue.transport.consumption_extension'); + + $groupByPriority = []; + foreach ($tags as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $transport = $tagAttribute['transport'] ?? 'default'; + + if ($transport !== $this->name && 'all' !== $transport) { + continue; + } + + $priority = (int) ($tagAttribute['priority'] ?? 0); + + $groupByPriority[$priority][] = new Reference($serviceId); + } + } + + krsort($groupByPriority, SORT_NUMERIC); + + $flatExtensions = []; + foreach ($groupByPriority as $extension) { + $flatExtensions = array_merge($flatExtensions, $extension); + } + + $extensionsService = $container->getDefinition($extensionsId); + $extensionsService->replaceArgument(0, array_merge( + $extensionsService->getArgument(0), + $flatExtensions + )); + } +} diff --git a/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php b/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php new file mode 100644 index 000000000..8493dc7f9 --- /dev/null +++ b/pkg/enqueue/Symfony/DependencyInjection/BuildProcessorRegistryPass.php @@ -0,0 +1,52 @@ +name = $transportName; + } + + public function process(ContainerBuilder $container): void + { + $processorRegistryId = sprintf('enqueue.transport.%s.processor_registry', $this->name); + if (false == $container->hasDefinition($processorRegistryId)) { + return; + } + + $tag = 'enqueue.transport.processor'; + $map = []; + foreach ($container->findTaggedServiceIds($tag) as $serviceId => $tagAttributes) { + foreach ($tagAttributes as $tagAttribute) { + $transport = $tagAttribute['transport'] ?? 'default'; + + if ($transport !== $this->name && 'all' !== $transport) { + continue; + } + + $processor = $tagAttribute['processor'] ?? $serviceId; + + $map[$processor] = new Reference($serviceId); + } + } + + $registry = $container->getDefinition($processorRegistryId); + $registry->setArgument(0, ServiceLocatorTagPass::register($container, $map, $processorRegistryId)); + } +} diff --git a/pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php b/pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php new file mode 100644 index 000000000..8065fc3c4 --- /dev/null +++ b/pkg/enqueue/Symfony/DependencyInjection/FormatClientNameTrait.php @@ -0,0 +1,17 @@ +getName()); + + return $parameter ? "%$fullName%" : $fullName; + } +} diff --git a/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php new file mode 100644 index 000000000..17a41d0b9 --- /dev/null +++ b/pkg/enqueue/Symfony/DependencyInjection/FormatTransportNameTrait.php @@ -0,0 +1,17 @@ +getName()); + + return $parameter ? "%$fullName%" : $fullName; + } +} diff --git a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php index 7a80eea41..12cc5faa2 100644 --- a/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php +++ b/pkg/enqueue/Symfony/DependencyInjection/TransportFactory.php @@ -4,7 +4,13 @@ use Enqueue\ConnectionFactoryFactory; use Enqueue\ConnectionFactoryFactoryInterface; +use Enqueue\Consumption\ChainExtension; +use Enqueue\Consumption\QueueConsumer; +use Enqueue\Consumption\QueueConsumerInterface; use Enqueue\Resources; +use Enqueue\Rpc\RpcClient; +use Enqueue\Rpc\RpcFactory; +use Enqueue\Symfony\ContainerProcessorRegistry; use Interop\Queue\ConnectionFactory; use Interop\Queue\Context; use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; @@ -16,6 +22,8 @@ */ final class TransportFactory { + use FormatTransportNameTrait; + /** * @var string */ @@ -30,7 +38,7 @@ public function __construct(string $name) $this->name = $name; } - public function addConfiguration(ArrayNodeDefinition $builder): void + public function addTransportConfiguration(ArrayNodeDefinition $builder): void { $knownSchemes = array_keys(Resources::getKnownSchemes()); $availableSchemes = array_keys(Resources::getAvailableSchemes()); @@ -86,11 +94,33 @@ public function addConfiguration(ArrayNodeDefinition $builder): void ; } - public function createConnectionFactory(ContainerBuilder $container, array $config): string + public function addQueueConsumerConfiguration(ArrayNodeDefinition $builder): void + { + $builder + ->addDefaultsIfNotSet()->children() + ->integerNode('idle_time') + ->min(0) + ->defaultValue(0) + ->info('the time in milliseconds queue consumer waits if no message received') + ->end() + ->integerNode('receive_timeout') + ->min(0) + ->defaultValue(10000) + ->info('the time in milliseconds queue consumer waits for a message (100 ms by default)') + ->end() + ; + } + + public function getName(): string + { + return $this->name; + } + + public function buildConnectionFactory(ContainerBuilder $container, array $config): void { - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); - $factoryFactoryId = sprintf('enqueue.transport.%s.connection_factory_factory', $this->getName()); + $factoryId = $this->format('connection_factory'); + $factoryFactoryId = $this->format('connection_factory_factory'); $container->register($factoryFactoryId, $config['factory_class'] ?? ConnectionFactoryFactory::class); $factoryFactoryService = new Reference( @@ -113,23 +143,85 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf ; } - return $factoryId; + if ('default' === $this->name) { + $container->setAlias(ConnectionFactory::class, $this->format('connection_factory')); + } } - public function createContext(ContainerBuilder $container, array $config): string + public function buildContext(ContainerBuilder $container, array $config): void { - $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); - $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $factoryId = $this->format('connection_factory'); + $this->assertServiceExists($container, $factoryId); + + $contextId = $this->format('context'); $container->register($contextId, Context::class) ->setFactory([new Reference($factoryId), 'createContext']) ; - return $contextId; + if ('default' === $this->name) { + $container->setAlias(Context::class, $this->format('context')); + } } - public function getName(): string + public function buildQueueConsumer(ContainerBuilder $container, array $config): void { - return $this->name; + $contextId = $this->format('context'); + $this->assertServiceExists($container, $contextId); + + $container->setParameter($this->format('idle_time'), $config['idle_time'] ?? 0); + $container->setParameter($this->format('receive_timeout'), $config['receive_timeout'] ?? 10000); + + $container->register($this->format('consumption_extensions'), ChainExtension::class) + ->addArgument([]) + ; + + $container->register($this->format('queue_consumer'), QueueConsumer::class) + ->addArgument(new Reference($contextId)) + ->addArgument(new Reference($this->format('consumption_extensions'))) + ->addArgument($this->format('idle_time', true)) + ->addArgument($this->format('receive_timeout', true)) + ; + + $container->register($this->format('processor_registry'), ContainerProcessorRegistry::class); + + $locatorId = 'enqueue.locator'; + if ($container->hasDefinition($locatorId)) { + $locator = $container->getDefinition($locatorId); + $locator->replaceArgument(0, array_replace($locator->getArgument(0), [ + $this->format('queue_consumer') => new Reference($this->format('queue_consumer')), + $this->format('processor_registry') => new Reference($this->format('processor_registry')), + ])); + } + + if ('default' === $this->name) { + $container->setAlias(QueueConsumerInterface::class, $this->format('queue_consumer')); + } + } + + public function buildRpcClient(ContainerBuilder $container, array $config): void + { + $contextId = $this->format('context'); + $this->assertServiceExists($container, $contextId); + + $container->register($this->format('rpc_factory'), RpcFactory::class) + ->addArgument(new Reference($contextId)) + ; + + $container->register($this->format('rpc_client'), RpcClient::class) + ->addArgument(new Reference($contextId)) + ->addArgument(new Reference($this->format('rpc_factory'))) + ; + + if ('default' === $this->name) { + $container->setAlias(RpcClient::class, $this->format('rpc_client')); + } + } + + private function assertServiceExists(ContainerBuilder $container, string $serviceId): void + { + if (false == $container->hasDefinition($serviceId)) { + throw new \InvalidArgumentException(sprintf('The service "%s" does not exist.', $serviceId)); + } } } diff --git a/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php b/pkg/enqueue/Tests/ArrayProcessorRegistryTest.php similarity index 92% rename from pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php rename to pkg/enqueue/Tests/ArrayProcessorRegistryTest.php index f49ec8ff8..50c802582 100644 --- a/pkg/enqueue/Tests/Client/ArrayProcessorRegistryTest.php +++ b/pkg/enqueue/Tests/ArrayProcessorRegistryTest.php @@ -1,9 +1,9 @@ setExpectedException( - \LogicException::class, - 'Got message without required parameter: "enqueue.processor"' - ); + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Got message without required parameter: "enqueue.processor"'); $processor = new DelegateProcessor($this->createProcessorRegistryMock()); $processor->process(new NullMessage(), $this->createContextMock()); diff --git a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ConsumeCommandTest.php similarity index 65% rename from pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php rename to pkg/enqueue/Tests/Symfony/Client/ConsumeCommandTest.php index a33905a8e..54e15edbd 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ConsumeCommandTest.php @@ -9,73 +9,49 @@ use Enqueue\Client\RouteCollection; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\QueueConsumerInterface; +use Enqueue\Container\Container; use Enqueue\Null\NullQueue; -use Enqueue\Symfony\Client\ConsumeMessagesCommand; -use Interop\Queue\Context; +use Enqueue\Symfony\Client\ConsumeCommand; use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; use Symfony\Component\Console\Tester\CommandTester; -class ConsumeMessagesCommandTest extends TestCase +class ConsumeCommandTest extends TestCase { public function testCouldBeConstructedWithRequiredAttributes() { - new ConsumeMessagesCommand( - $this->createQueueConsumerMock(), - $this->createDelegateProcessorMock(), - $this->createDriverStub() - ); + new ConsumeCommand($this->createMock(ContainerInterface::class)); } public function testShouldHaveCommandName() { - $command = new ConsumeMessagesCommand( - $this->createQueueConsumerMock(), - $this->createDelegateProcessorMock(), - $this->createDriverStub() - ); + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); $this->assertEquals('enqueue:consume', $command->getName()); } - public function testShouldHaveCommandAliases() - { - $command = new ConsumeMessagesCommand( - $this->createQueueConsumerMock(), - $this->createDelegateProcessorMock(), - $this->createDriverStub() - ); - - $this->assertEquals(['enq:c'], $command->getAliases()); - } - public function testShouldHaveExpectedOptions() { - $command = new ConsumeMessagesCommand( - $this->createQueueConsumerMock(), - $this->createDelegateProcessorMock(), - $this->createDriverStub() - ); + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); $options = $command->getDefinition()->getOptions(); - $this->assertCount(8, $options); + $this->assertCount(10, $options); $this->assertArrayHasKey('memory-limit', $options); $this->assertArrayHasKey('message-limit', $options); $this->assertArrayHasKey('time-limit', $options); - $this->assertArrayHasKey('setup-broker', $options); - $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('idle-time', $options); $this->assertArrayHasKey('receive-timeout', $options); - $this->assertArrayHasKey('skip', $options); $this->assertArrayHasKey('niceness', $options); + $this->assertArrayHasKey('client', $options); + $this->assertArrayHasKey('logger', $options); + $this->assertArrayHasKey('skip', $options); + $this->assertArrayHasKey('setup-broker', $options); } - public function testShouldHaveExpectedArguments() + public function testShouldHaveExpectedAttributes() { - $command = new ConsumeMessagesCommand( - $this->createQueueConsumerMock(), - $this->createDelegateProcessorMock(), - $this->createDriverStub() - ); + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); $arguments = $command->getDefinition()->getArguments(); @@ -91,12 +67,6 @@ public function testShouldBindDefaultQueueOnly() $processor = $this->createDelegateProcessorMock(); - $context = $this->createContextMock(); - $context - ->expects($this->never()) - ->method('close') - ; - $consumer = $this->createQueueConsumerMock(); $consumer ->expects($this->once()) @@ -117,28 +87,125 @@ public function testShouldBindDefaultQueueOnly() ->willReturn($queue) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); $tester->execute([]); } - public function testShouldBindDefaultQueueIfRouteDoesNotDefineQueue() + public function testShouldUseRequestedClient() { + $defaultProcessor = $this->createDelegateProcessorMock(); + + $defaultConsumer = $this->createQueueConsumerMock(); + $defaultConsumer + ->expects($this->never()) + ->method('bind') + ; + $defaultConsumer + ->expects($this->never()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $defaultDriver = $this->createDriverStub(new RouteCollection([])); + $defaultDriver + ->expects($this->never()) + ->method('createQueue') + ; + $queue = new NullQueue(''); - $routeCollection = new RouteCollection([ - new Route('topic', Route::TOPIC, 'processor'), + $routeCollection = new RouteCollection([]); + + $fooProcessor = $this->createDelegateProcessorMock(); + + $fooConsumer = $this->createQueueConsumerMock(); + $fooConsumer + ->expects($this->once()) + ->method('bind') + ->with($this->identicalTo($queue), $this->identicalTo($fooProcessor)) + ; + $fooConsumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $fooDriver = $this->createDriverStub($routeCollection); + $fooDriver + ->expects($this->once()) + ->method('createQueue') + ->with('default') + ->willReturn($queue) + ; + + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $defaultConsumer, + 'enqueue.client.default.driver' => $defaultDriver, + 'enqueue.client.default.delegate_processor' => $defaultProcessor, + 'enqueue.client.foo.queue_consumer' => $fooConsumer, + 'enqueue.client.foo.driver' => $fooDriver, + 'enqueue.client.foo.delegate_processor' => $fooProcessor, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + '--client' => 'foo', ]); + } + + public function testThrowIfNotDefinedClientRequested() + { + $routeCollection = new RouteCollection([]); $processor = $this->createDelegateProcessorMock(); - $context = $this->createContextMock(); - $context + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->never()) + ->method('bind') + ; + $consumer ->expects($this->never()) - ->method('close') + ->method('consume') ; + $driver = $this->createDriverStub($routeCollection); + $driver + ->expects($this->never()) + ->method('createQueue') + ; + + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Client "not-defined" is not supported.'); + $tester->execute([ + '--client' => 'not-defined', + ]); + } + + public function testShouldBindDefaultQueueIfRouteUseDifferentQueue() + { + $queue = new NullQueue(''); + + $routeCollection = new RouteCollection([ + new Route('topic', Route::TOPIC, 'processor'), + ]); + + $processor = $this->createDelegateProcessorMock(); + $consumer = $this->createQueueConsumerMock(); $consumer ->expects($this->once()) @@ -159,7 +226,11 @@ public function testShouldBindDefaultQueueIfRouteDoesNotDefineQueue() ->willReturn($queue) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); $tester->execute([]); @@ -207,7 +278,11 @@ public function testShouldBindCustomExecuteConsumptionAndUseCustomClientDestinat ->with($this->isInstanceOf(ChainExtension::class)) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); $tester->execute([]); @@ -243,7 +318,11 @@ public function testShouldBindUserProvidedQueues() ->with($this->isInstanceOf(ChainExtension::class)) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); $tester->execute([ @@ -295,12 +374,14 @@ public function testShouldBindQueuesOnlyOnce() ->with($this->isInstanceOf(ChainExtension::class)) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); - $tester->execute([ -// 'client-queue-names' => ['non-default-queue'], - ]); + $tester->execute([]); } public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName() @@ -309,12 +390,6 @@ public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName( $processor = $this->createDelegateProcessorMock(); - $context = $this->createContextMock(); - $context - ->expects($this->never()) - ->method('close') - ; - $consumer = $this->createQueueConsumerMock(); $consumer ->expects($this->exactly(3)) @@ -352,7 +427,11 @@ public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName( ->willReturn($queue) ; - $command = new ConsumeMessagesCommand($consumer, $processor, $driver); + $command = new ConsumeCommand(new Container([ + 'enqueue.client.default.queue_consumer' => $consumer, + 'enqueue.client.default.driver' => $driver, + 'enqueue.client.default.delegate_processor' => $processor, + ])); $tester = new CommandTester($command); $tester->execute([ @@ -360,14 +439,6 @@ public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName( ]); } - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Context - */ - private function createContextMock() - { - return $this->createMock(Context::class); - } - /** * @return \PHPUnit_Framework_MockObject_MockObject|DelegateProcessor */ diff --git a/pkg/enqueue/Tests/Symfony/Client/ProduceCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ProduceCommandTest.php new file mode 100644 index 000000000..51756b48a --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Client/ProduceCommandTest.php @@ -0,0 +1,253 @@ +createMock(ContainerInterface::class)); + } + + public function testShouldHaveCommandName() + { + $command = new ProduceCommand($this->createMock(ContainerInterface::class)); + + $this->assertEquals('enqueue:produce', $command->getName()); + } + + public function testShouldHaveExpectedOptions() + { + $command = new ProduceCommand($this->createMock(ContainerInterface::class)); + + $options = $command->getDefinition()->getOptions(); + $this->assertCount(3, $options); + $this->assertArrayHasKey('client', $options); + $this->assertArrayHasKey('topic', $options); + $this->assertArrayHasKey('command', $options); + } + + public function testShouldHaveExpectedAttributes() + { + $command = new ProduceCommand($this->createMock(ContainerInterface::class)); + + $arguments = $command->getDefinition()->getArguments(); + $this->assertCount(1, $arguments); + + $this->assertArrayHasKey('message', $arguments); + } + + public function testThrowIfNeitherTopicNorCommandOptionsAreSet() + { + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->never()) + ->method('sendEvent') + ; + $producerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $producerMock, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either topic or command option should be set, none is set.'); + $tester->execute([ + 'message' => 'theMessage', + ]); + } + + public function testThrowIfBothTopicAndCommandOptionsAreSet() + { + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->never()) + ->method('sendEvent') + ; + $producerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $producerMock, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Either topic or command option should be set, both are set.'); + $tester->execute([ + 'message' => 'theMessage', + '--topic' => 'theTopic', + '--command' => 'theCommand', + ]); + } + + public function testShouldSendEventToDefaultTransport() + { + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->once()) + ->method('sendEvent') + ->with('theTopic', 'theMessage') + ; + $producerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $producerMock, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'message' => 'theMessage', + '--topic' => 'theTopic', + ]); + } + + public function testShouldSendCommandToDefaultTransport() + { + $producerMock = $this->createProducerMock(); + $producerMock + ->expects($this->once()) + ->method('sendCommand') + ->with('theCommand', 'theMessage') + ; + $producerMock + ->expects($this->never()) + ->method('sendEvent') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $producerMock, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'message' => 'theMessage', + '--command' => 'theCommand', + ]); + } + + public function testShouldSendEventToFooTransport() + { + $defaultProducerMock = $this->createProducerMock(); + $defaultProducerMock + ->expects($this->never()) + ->method('sendEvent') + ; + $defaultProducerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $fooProducerMock = $this->createProducerMock(); + $fooProducerMock + ->expects($this->once()) + ->method('sendEvent') + ->with('theTopic', 'theMessage') + ; + $fooProducerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $defaultProducerMock, + 'enqueue.client.foo.producer' => $fooProducerMock, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'message' => 'theMessage', + '--topic' => 'theTopic', + '--client' => 'foo', + ]); + } + + public function testShouldSendCommandToFooTransport() + { + $defaultProducerMock = $this->createProducerMock(); + $defaultProducerMock + ->expects($this->never()) + ->method('sendEvent') + ; + $defaultProducerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $fooProducerMock = $this->createProducerMock(); + $fooProducerMock + ->expects($this->once()) + ->method('sendCommand') + ->with('theCommand', 'theMessage') + ; + $fooProducerMock + ->expects($this->never()) + ->method('sendEvent') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $defaultProducerMock, + 'enqueue.client.foo.producer' => $fooProducerMock, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'message' => 'theMessage', + '--command' => 'theCommand', + '--client' => 'foo', + ]); + } + + public function testThrowIfClientNotFound() + { + $defaultProducerMock = $this->createProducerMock(); + $defaultProducerMock + ->expects($this->never()) + ->method('sendEvent') + ; + $defaultProducerMock + ->expects($this->never()) + ->method('sendCommand') + ; + + $command = new ProduceCommand(new Container([ + 'enqueue.client.default.producer' => $defaultProducerMock, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Client "bar" is not supported.'); + $tester->execute([ + 'message' => 'theMessage', + '--command' => 'theCommand', + '--client' => 'bar', + ]); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface + */ + private function createProducerMock() + { + return $this->createMock(ProducerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php deleted file mode 100644 index 2cd80952e..000000000 --- a/pkg/enqueue/Tests/Symfony/Client/ProduceMessageCommandTest.php +++ /dev/null @@ -1,75 +0,0 @@ -createProducerMock()); - } - - public function testShouldHaveCommandName() - { - $command = new ProduceMessageCommand($this->createProducerMock()); - - $this->assertEquals('enqueue:produce', $command->getName()); - } - - public function testShouldHaveCommandAliases() - { - $command = new ProduceMessageCommand($this->createProducerMock()); - - $this->assertEquals(['enq:p'], $command->getAliases()); - } - - public function testShouldHaveExpectedOptions() - { - $command = new ProduceMessageCommand($this->createProducerMock()); - - $options = $command->getDefinition()->getOptions(); - $this->assertCount(0, $options); - } - - public function testShouldHaveExpectedAttributes() - { - $command = new ProduceMessageCommand($this->createProducerMock()); - - $arguments = $command->getDefinition()->getArguments(); - $this->assertCount(2, $arguments); - - $this->assertArrayHasKey('topic', $arguments); - $this->assertArrayHasKey('message', $arguments); - } - - public function testShouldExecuteConsumptionAndUseDefaultQueueName() - { - $producerMock = $this->createProducerMock(); - $producerMock - ->expects($this->once()) - ->method('sendEvent') - ->with('theTopic', 'theMessage') - ; - - $command = new ProduceMessageCommand($producerMock); - - $tester = new CommandTester($command); - $tester->execute([ - 'topic' => 'theTopic', - 'message' => 'theMessage', - ]); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|ProducerInterface - */ - private function createProducerMock() - { - return $this->createMock(ProducerInterface::class); - } -} diff --git a/pkg/enqueue/Tests/Symfony/Client/RoutesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/RoutesCommandTest.php index 6100b64e2..4f518848d 100644 --- a/pkg/enqueue/Tests/Symfony/Client/RoutesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/RoutesCommandTest.php @@ -3,11 +3,14 @@ namespace Enqueue\Tests\Symfony\Client; use Enqueue\Client\Config; +use Enqueue\Client\DriverInterface; use Enqueue\Client\Route; use Enqueue\Client\RouteCollection; +use Enqueue\Container\Container; use Enqueue\Symfony\Client\RoutesCommand; use Enqueue\Test\ClassExtensionTrait; use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Tester\CommandTester; @@ -27,36 +30,37 @@ public function testShouldBeFinal() public function testCouldBeConstructedWithConfigAndRouteCollectionAsArguments() { - new RoutesCommand(Config::create(), new RouteCollection([])); + new RoutesCommand($this->createMock(ContainerInterface::class)); } public function testShouldHaveCommandName() { - $command = new RoutesCommand(Config::create(), new RouteCollection([])); + $command = new RoutesCommand($this->createMock(ContainerInterface::class)); $this->assertEquals('enqueue:routes', $command->getName()); } public function testShouldHaveCommandAliases() { - $command = new RoutesCommand(Config::create(), new RouteCollection([])); + $command = new RoutesCommand($this->createMock(ContainerInterface::class)); $this->assertEquals(['debug:enqueue:routes'], $command->getAliases()); } public function testShouldHaveExpectedOptions() { - $command = new RoutesCommand(Config::create(), new RouteCollection([])); + $command = new RoutesCommand($this->createMock(ContainerInterface::class)); $options = $command->getDefinition()->getOptions(); - $this->assertCount(1, $options); + $this->assertCount(2, $options); $this->assertArrayHasKey('show-route-options', $options); + $this->assertArrayHasKey('client', $options); } public function testShouldHaveExpectedAttributes() { - $command = new RoutesCommand(Config::create(), new RouteCollection([])); + $command = new RoutesCommand($this->createMock(ContainerInterface::class)); $arguments = $command->getDefinition()->getArguments(); $this->assertCount(0, $arguments); @@ -66,7 +70,9 @@ public function testShouldOutputEmptyRouteCollection() { $routeCollection = new RouteCollection([]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -81,6 +87,64 @@ public function testShouldOutputEmptyRouteCollection() $this->assertCommandOutput($expectedOutput, $tester); } + public function testShouldUseFooDriver() + { + $routeCollection = new RouteCollection([ + new Route('fooTopic', Route::TOPIC, 'processor'), + ]); + + $defaultDriverMock = $this->createMock(DriverInterface::class); + $defaultDriverMock + ->expects($this->never()) + ->method('getConfig') + ; + + $defaultDriverMock + ->expects($this->never()) + ->method('getRouteCollection') + ; + + $fooDriverMock = $this->createDriverStub(Config::create(), $routeCollection); + + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $defaultDriverMock, + 'enqueue.client.foo.driver' => $fooDriverMock, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + '--client' => 'foo', + ]); + + $this->assertContains('Found 1 routes', $tester->getDisplay()); + } + + public function testThrowIfClientNotFound() + { + $defaultDriverMock = $this->createMock(DriverInterface::class); + $defaultDriverMock + ->expects($this->never()) + ->method('getConfig') + ; + + $defaultDriverMock + ->expects($this->never()) + ->method('getRouteCollection') + ; + + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $defaultDriverMock, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Client "foo" is not supported.'); + $tester->execute([ + '--client' => 'foo', + ]); + } + public function testShouldOutputTopicRouteInfo() { $routeCollection = new RouteCollection([ @@ -88,7 +152,9 @@ public function testShouldOutputTopicRouteInfo() new Route('barTopic', Route::TOPIC, 'processor'), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -116,7 +182,9 @@ public function testShouldOutputCommandRouteInfo() new Route('barCommand', Route::COMMAND, 'processor', ['foo' => 'fooVal', 'bar' => 'barVal']), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -144,7 +212,9 @@ public function testShouldCorrectlyOutputPrefixedCustomQueue() new Route('barTopic', Route::TOPIC, 'processor', ['queue' => 'bar']), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -173,7 +243,9 @@ public function testShouldCorrectlyOutputNotPrefixedCustomQueue() new Route('barTopic', Route::TOPIC, 'processor', ['queue' => 'bar', 'prefix_queue' => false]), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -201,7 +273,9 @@ public function testShouldCorrectlyOutputExternalRoute() new Route('barTopic', Route::TOPIC, 'processor', ['external' => true]), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -229,7 +303,9 @@ public function testShouldOutputRouteOptions() new Route('barTopic', Route::TOPIC, 'processor', ['bar' => 'barVal']), ]); - $command = new RoutesCommand(Config::create(), $routeCollection); + $command = new RoutesCommand(new Container([ + 'enqueue.client.default.driver' => $this->createDriverStub(Config::create(), $routeCollection), + ])); $tester = new CommandTester($command); @@ -254,6 +330,27 @@ public function testShouldOutputRouteOptions() $this->assertCommandOutput($expectedOutput, $tester); } + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createDriverStub(Config $config, RouteCollection $routeCollection): DriverInterface + { + $driverMock = $this->createMock(DriverInterface::class); + $driverMock + ->expects($this->any()) + ->method('getConfig') + ->willReturn($config) + ; + + $driverMock + ->expects($this->any()) + ->method('getRouteCollection') + ->willReturn($routeCollection) + ; + + return $driverMock; + } + private function assertCommandOutput(string $expected, CommandTester $tester): void { $this->assertSame(0, $tester->getStatusCode()); diff --git a/pkg/enqueue/Tests/Symfony/Client/SetupBrokerCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/SetupBrokerCommandTest.php index 740d6d1c2..65ee3063f 100644 --- a/pkg/enqueue/Tests/Symfony/Client/SetupBrokerCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/SetupBrokerCommandTest.php @@ -3,32 +3,53 @@ namespace Enqueue\Tests\Symfony\Client; use Enqueue\Client\DriverInterface; +use Enqueue\Container\Container; use Enqueue\Symfony\Client\SetupBrokerCommand; use PHPUnit\Framework\TestCase; +use Psr\Container\ContainerInterface; use Symfony\Component\Console\Tester\CommandTester; class SetupBrokerCommandTest extends TestCase { - public function testCouldBeConstructedWithRequiredAttributes() + public function testCouldBeConstructedWithContainerAsFirstArgument() { - new \Enqueue\Symfony\Client\SetupBrokerCommand($this->createClientDriverMock()); + new SetupBrokerCommand($this->createMock(ContainerInterface::class)); } public function testShouldHaveCommandName() { - $command = new SetupBrokerCommand($this->createClientDriverMock()); + $command = new SetupBrokerCommand($this->createMock(ContainerInterface::class)); $this->assertEquals('enqueue:setup-broker', $command->getName()); } public function testShouldHaveCommandAliases() { - $command = new SetupBrokerCommand($this->createClientDriverMock()); + $command = new SetupBrokerCommand($this->createMock(ContainerInterface::class)); $this->assertEquals(['enq:sb'], $command->getAliases()); } - public function testShouldCreateQueues() + public function testShouldHaveExpectedOptions() + { + $command = new SetupBrokerCommand($this->createMock(ContainerInterface::class)); + + $options = $command->getDefinition()->getOptions(); + + $this->assertCount(1, $options); + $this->assertArrayHasKey('client', $options); + } + + public function testShouldHaveExpectedAttributes() + { + $command = new SetupBrokerCommand($this->createMock(ContainerInterface::class)); + + $arguments = $command->getDefinition()->getArguments(); + + $this->assertCount(0, $arguments); + } + + public function testShouldCallDriverSetupBrokerMethod() { $driver = $this->createClientDriverMock(); $driver @@ -36,12 +57,62 @@ public function testShouldCreateQueues() ->method('setupBroker') ; - $command = new SetupBrokerCommand($driver); + $command = new SetupBrokerCommand(new Container([ + 'enqueue.client.default.driver' => $driver, + ])); $tester = new CommandTester($command); $tester->execute([]); - $this->assertContains('Setup Broker', $tester->getDisplay()); + $this->assertContains('Broker set up', $tester->getDisplay()); + } + + public function testShouldCallRequestedClientDriverSetupBrokerMethod() + { + $defaultDriver = $this->createClientDriverMock(); + $defaultDriver + ->expects($this->never()) + ->method('setupBroker') + ; + + $fooDriver = $this->createClientDriverMock(); + $fooDriver + ->expects($this->once()) + ->method('setupBroker') + ; + + $command = new SetupBrokerCommand(new Container([ + 'enqueue.client.default.driver' => $defaultDriver, + 'enqueue.client.foo.driver' => $fooDriver, + ])); + + $tester = new CommandTester($command); + $tester->execute([ + '--client' => 'foo', + ]); + + $this->assertContains('Broker set up', $tester->getDisplay()); + } + + public function testShouldThrowIfClientNotFound() + { + $defaultDriver = $this->createClientDriverMock(); + $defaultDriver + ->expects($this->never()) + ->method('setupBroker') + ; + + $command = new SetupBrokerCommand(new Container([ + 'enqueue.client.default.driver' => $defaultDriver, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Client "foo" is not supported.'); + $tester->execute([ + '--client' => 'foo', + ]); } /** diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ConfigurableConsumeCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ConfigurableConsumeCommandTest.php new file mode 100644 index 000000000..ef50ad228 --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Consumption/ConfigurableConsumeCommandTest.php @@ -0,0 +1,286 @@ +createMock(ContainerInterface::class)); + } + + public function testShouldHaveCommandName() + { + $command = new ConfigurableConsumeCommand($this->createMock(ContainerInterface::class)); + + $this->assertEquals('enqueue:transport:consume', $command->getName()); + } + + public function testShouldHaveExpectedOptions() + { + $command = new ConfigurableConsumeCommand($this->createMock(ContainerInterface::class)); + + $options = $command->getDefinition()->getOptions(); + + $this->assertCount(8, $options); + $this->assertArrayHasKey('memory-limit', $options); + $this->assertArrayHasKey('message-limit', $options); + $this->assertArrayHasKey('time-limit', $options); + $this->assertArrayHasKey('idle-time', $options); + $this->assertArrayHasKey('receive-timeout', $options); + $this->assertArrayHasKey('niceness', $options); + $this->assertArrayHasKey('transport', $options); + $this->assertArrayHasKey('logger', $options); + } + + public function testShouldHaveExpectedAttributes() + { + $command = new ConfigurableConsumeCommand($this->createMock(ContainerInterface::class)); + + $arguments = $command->getDefinition()->getArguments(); + + $this->assertCount(2, $arguments); + $this->assertArrayHasKey('processor', $arguments); + $this->assertArrayHasKey('queues', $arguments); + } + + public function testThrowIfNeitherQueueOptionNorProcessorImplementsQueueSubscriberInterface() + { + $processor = $this->createProcessor(); + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->never()) + ->method('bind') + ; + $consumer + ->expects($this->never()) + ->method('consume') + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + 'enqueue.transport.default.processor_registry' => new ArrayProcessorRegistry(['aProcessor' => $processor]), + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The queue is not provided. The processor must implement "Enqueue\Consumption\QueueSubscriberInterface" interface and it must return not empty array of queues or a queue set using as a second argument.'); + $tester->execute([ + 'processor' => 'aProcessor', + ]); + } + + public function testShouldExecuteConsumptionWithExplicitlySetQueue() + { + $processor = $this->createProcessor(); + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->once()) + ->method('bind') + ->with('queue-name', $this->identicalTo($processor)) + ; + $consumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + 'enqueue.transport.default.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'processor' => 'processor-service', + 'queues' => ['queue-name'], + ]); + } + + public function testThrowIfTransportNotDefined() + { + $processor = $this->createProcessor(); + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->never()) + ->method('bind') + ; + $consumer + ->expects($this->never()) + ->method('consume') + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + 'enqueue.transport.default.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Transport "not-defined" is not supported.'); + $tester->execute([ + 'processor' => 'processor-service', + 'queues' => ['queue-name'], + '--transport' => 'not-defined', + ]); + } + + public function testShouldExecuteConsumptionWithSeveralCustomQueues() + { + $processor = $this->createProcessor(); + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->at(0)) + ->method('bind') + ->with('queue-name', $this->identicalTo($processor)) + ; + $consumer + ->expects($this->at(1)) + ->method('bind') + ->with('another-queue-name', $this->identicalTo($processor)) + ; + $consumer + ->expects($this->at(2)) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + 'enqueue.transport.default.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'processor' => 'processor-service', + 'queues' => ['queue-name', 'another-queue-name'], + ]); + } + + public function testShouldExecuteConsumptionWhenProcessorImplementsQueueSubscriberInterface() + { + $processor = new class() implements Processor, QueueSubscriberInterface { + public function process(InteropMessage $message, Context $context) + { + } + + public static function getSubscribedQueues() + { + return ['fooSubscribedQueues', 'barSubscribedQueues']; + } + }; + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->at(0)) + ->method('bind') + ->with('fooSubscribedQueues', $this->identicalTo($processor)) + ; + $consumer + ->expects($this->at(1)) + ->method('bind') + ->with('barSubscribedQueues', $this->identicalTo($processor)) + ; + $consumer + ->expects($this->at(2)) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + 'enqueue.transport.default.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'processor' => 'processor-service', + ]); + } + + public function testShouldExecuteConsumptionWithCustomTransportExplicitlySetQueue() + { + $processor = $this->createProcessor(); + + $fooConsumer = $this->createQueueConsumerMock(); + $fooConsumer + ->expects($this->never()) + ->method('bind') + ; + $fooConsumer + ->expects($this->never()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $barConsumer = $this->createQueueConsumerMock(); + $barConsumer + ->expects($this->once()) + ->method('bind') + ->with('queue-name', $this->identicalTo($processor)) + ; + $barConsumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConfigurableConsumeCommand(new Container([ + 'enqueue.transport.foo.queue_consumer' => $fooConsumer, + 'enqueue.transport.foo.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + 'enqueue.transport.bar.queue_consumer' => $barConsumer, + 'enqueue.transport.bar.processor_registry' => new ArrayProcessorRegistry(['processor-service' => $processor]), + ])); + + $tester = new CommandTester($command); + $tester->execute([ + 'processor' => 'processor-service', + 'queues' => ['queue-name'], + '--transport' => 'bar', + ]); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|InteropQueue + */ + protected function createQueueMock() + { + return $this->createMock(InteropQueue::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|Processor + */ + protected function createProcessor() + { + return $this->createMock(Processor::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface + */ + protected function createQueueConsumerMock() + { + return $this->createMock(QueueConsumerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeCommandTest.php new file mode 100644 index 000000000..70bc57251 --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeCommandTest.php @@ -0,0 +1,120 @@ +createMock(ContainerInterface::class)); + } + + public function testShouldHaveCommandName() + { + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); + + $this->assertEquals('enqueue:transport:consume', $command->getName()); + } + + public function testShouldHaveExpectedOptions() + { + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); + + $options = $command->getDefinition()->getOptions(); + + $this->assertCount(8, $options); + $this->assertArrayHasKey('memory-limit', $options); + $this->assertArrayHasKey('message-limit', $options); + $this->assertArrayHasKey('time-limit', $options); + $this->assertArrayHasKey('idle-time', $options); + $this->assertArrayHasKey('receive-timeout', $options); + $this->assertArrayHasKey('niceness', $options); + $this->assertArrayHasKey('transport', $options); + $this->assertArrayHasKey('logger', $options); + } + + public function testShouldHaveExpectedAttributes() + { + $command = new ConsumeCommand($this->createMock(ContainerInterface::class)); + + $arguments = $command->getDefinition()->getArguments(); + + $this->assertCount(0, $arguments); + } + + public function testShouldExecuteDefaultConsumption() + { + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $consumer, + ])); + + $tester = new CommandTester($command); + $tester->execute([]); + } + + public function testShouldExecuteCustomConsumption() + { + $defaultConsumer = $this->createQueueConsumerMock(); + $defaultConsumer + ->expects($this->never()) + ->method('consume') + ; + + $customConsumer = $this->createQueueConsumerMock(); + $customConsumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $command = new ConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $defaultConsumer, + 'enqueue.transport.custom.queue_consumer' => $customConsumer, + ])); + + $tester = new CommandTester($command); + $tester->execute(['--transport' => 'custom']); + } + + public function testThrowIfNotDefinedTransportRequested() + { + $defaultConsumer = $this->createQueueConsumerMock(); + $defaultConsumer + ->expects($this->never()) + ->method('consume') + ; + + $command = new ConsumeCommand(new Container([ + 'enqueue.transport.default.queue_consumer' => $defaultConsumer, + ])); + + $tester = new CommandTester($command); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Transport "not-defined" is not supported.'); + $tester->execute(['--transport' => 'not-defined']); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface + */ + private function createQueueConsumerMock() + { + return $this->createMock(QueueConsumerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php deleted file mode 100644 index 2e47a05b5..000000000 --- a/pkg/enqueue/Tests/Symfony/Consumption/ConsumeMessagesCommandTest.php +++ /dev/null @@ -1,86 +0,0 @@ -createQueueConsumerMock()); - } - - public function testShouldHaveCommandName() - { - $command = new ConsumeMessagesCommand($this->createQueueConsumerMock()); - - $this->assertEquals('enqueue:transport:consume', $command->getName()); - } - - public function testShouldHaveExpectedOptions() - { - $command = new ConsumeMessagesCommand($this->createQueueConsumerMock()); - - $options = $command->getDefinition()->getOptions(); - - $this->assertCount(6, $options); - $this->assertArrayHasKey('memory-limit', $options); - $this->assertArrayHasKey('message-limit', $options); - $this->assertArrayHasKey('time-limit', $options); - $this->assertArrayHasKey('idle-timeout', $options); - $this->assertArrayHasKey('receive-timeout', $options); - $this->assertArrayHasKey('niceness', $options); - } - - public function testShouldHaveExpectedAttributes() - { - $command = new ConsumeMessagesCommand($this->createQueueConsumerMock()); - - $arguments = $command->getDefinition()->getArguments(); - - $this->assertCount(0, $arguments); - } - - public function testShouldExecuteConsumption() - { - $context = $this->createContextMock(); - $context - ->expects($this->never()) - ->method('close') - ; - - $consumer = $this->createQueueConsumerMock(); - $consumer - ->expects($this->once()) - ->method('consume') - ->with($this->isInstanceOf(ChainExtension::class)) - ; - - $command = new ConsumeMessagesCommand($consumer); - - $tester = new CommandTester($command); - $tester->execute([]); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Context - */ - private function createContextMock() - { - return $this->createMock(Context::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface - */ - private function createQueueConsumerMock() - { - return $this->createMock(QueueConsumerInterface::class); - } -} diff --git a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php deleted file mode 100644 index 2c04b0761..000000000 --- a/pkg/enqueue/Tests/Symfony/Consumption/ContainerAwareConsumeMessagesCommandTest.php +++ /dev/null @@ -1,208 +0,0 @@ -createQueueConsumerMock()); - } - - public function testShouldHaveCommandName() - { - $command = new ContainerAwareConsumeMessagesCommand($this->createQueueConsumerMock()); - - $this->assertEquals('enqueue:transport:consume', $command->getName()); - } - - public function testShouldHaveExpectedOptions() - { - $command = new ContainerAwareConsumeMessagesCommand($this->createQueueConsumerMock()); - - $options = $command->getDefinition()->getOptions(); - - $this->assertCount(7, $options); - $this->assertArrayHasKey('memory-limit', $options); - $this->assertArrayHasKey('message-limit', $options); - $this->assertArrayHasKey('time-limit', $options); - $this->assertArrayHasKey('queue', $options); - $this->assertArrayHasKey('idle-timeout', $options); - $this->assertArrayHasKey('receive-timeout', $options); - $this->assertArrayHasKey('niceness', $options); - } - - public function testShouldHaveExpectedAttributes() - { - $command = new ContainerAwareConsumeMessagesCommand($this->createQueueConsumerMock()); - - $arguments = $command->getDefinition()->getArguments(); - - $this->assertCount(1, $arguments); - $this->assertArrayHasKey('processor-service', $arguments); - } - - public function testShouldThrowExceptionIfProcessorInstanceHasWrongClass() - { - $container = new Container(); - $container->set('processor-service', new \stdClass()); - - $command = new ContainerAwareConsumeMessagesCommand($this->createQueueConsumerMock()); - $command->setContainer($container); - - $tester = new CommandTester($command); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('Invalid message processor service given. It must be an instance of Interop\Queue\Processor but stdClass'); - $tester->execute([ - 'processor-service' => 'processor-service', - '--queue' => ['queue-name'], - ]); - } - - public function testThrowIfNeitherQueueOptionNorProcessorImplementsQueueSubscriberInterface() - { - $processor = $this->createProcessor(); - - $consumer = $this->createQueueConsumerMock(); - $consumer - ->expects($this->never()) - ->method('bind') - ; - $consumer - ->expects($this->never()) - ->method('consume') - ; - - $container = new Container(); - $container->set('processor-service', $processor); - - $command = new ContainerAwareConsumeMessagesCommand($consumer); - $command->setContainer($container); - - $tester = new CommandTester($command); - - $this->expectException(\LogicException::class); - $this->expectExceptionMessage('The queues are not provided. The processor must implement "Enqueue\Consumption\QueueSubscriberInterface" interface and it must return not empty array of queues or queues set using --queue option.'); - $tester->execute([ - 'processor-service' => 'processor-service', - ]); - } - - public function testShouldExecuteConsumptionWithExplicitlySetQueueViaQueueOption() - { - $processor = $this->createProcessor(); - - $context = $this->createContextMock(); - $context - ->expects($this->never()) - ->method('close') - ; - - $consumer = $this->createQueueConsumerMock(); - $consumer - ->expects($this->once()) - ->method('bind') - ->with('queue-name', $this->identicalTo($processor)) - ; - $consumer - ->expects($this->once()) - ->method('consume') - ->with($this->isInstanceOf(ChainExtension::class)) - ; - - $container = new Container(); - $container->set('processor-service', $processor); - - $command = new ContainerAwareConsumeMessagesCommand($consumer); - $command->setContainer($container); - - $tester = new CommandTester($command); - $tester->execute([ - 'processor-service' => 'processor-service', - '--queue' => ['queue-name'], - ]); - } - - public function testShouldExecuteConsumptionWhenProcessorImplementsQueueSubscriberInterface() - { - $processor = new QueueSubscriberProcessor(); - - $context = $this->createContextMock(); - $context - ->expects($this->never()) - ->method('close') - ; - - $consumer = $this->createQueueConsumerMock(); - $consumer - ->expects($this->at(0)) - ->method('bind') - ->with('fooSubscribedQueues', $this->identicalTo($processor)) - ; - $consumer - ->expects($this->at(1)) - ->method('bind') - ->with('barSubscribedQueues', $this->identicalTo($processor)) - ; - $consumer - ->expects($this->at(2)) - ->method('consume') - ->with($this->isInstanceOf(ChainExtension::class)) - ; - - $container = new Container(); - $container->set('processor-service', $processor); - - $command = new ContainerAwareConsumeMessagesCommand($consumer); - $command->setContainer($container); - - $tester = new CommandTester($command); - $tester->execute([ - 'processor-service' => 'processor-service', - ]); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Context - */ - protected function createContextMock() - { - return $this->createMock(Context::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|InteropQueue - */ - protected function createQueueMock() - { - return $this->createMock(InteropQueue::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|Processor - */ - protected function createProcessor() - { - return $this->createMock(Processor::class); - } - - /** - * @return \PHPUnit_Framework_MockObject_MockObject|QueueConsumerInterface - */ - protected function createQueueConsumerMock() - { - return $this->createMock(QueueConsumerInterface::class); - } -} diff --git a/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php index f26324c1d..74791725d 100644 --- a/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php +++ b/pkg/enqueue/Tests/Symfony/Consumption/QueueConsumerOptionsCommandTraitTest.php @@ -16,7 +16,7 @@ public function testShouldAddExtensionsOptions() $options = $trait->getDefinition()->getOptions(); $this->assertCount(2, $options); - $this->assertArrayHasKey('idle-timeout', $options); + $this->assertArrayHasKey('idle-time', $options); $this->assertArrayHasKey('receive-timeout', $options); } @@ -38,7 +38,7 @@ public function testShouldSetQueueConsumerOptions() $tester = new CommandTester($trait); $tester->execute([ - '--idle-timeout' => '123.1', + '--idle-time' => '123.1', '--receive-timeout' => '456.1', ]); } diff --git a/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php b/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php new file mode 100644 index 000000000..fe84c2e20 --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/ContainerProcessorRegistryTest.php @@ -0,0 +1,108 @@ +assertClassImplements(ProcessorRegistryInterface::class, ContainerProcessorRegistry::class); + } + + public function testShouldBeFinal() + { + $this->assertClassFinal(ContainerProcessorRegistry::class); + } + + public function testCouldBeConstructedWithContainerAsFirstArgument() + { + new ContainerProcessorRegistry($this->createContainerMock()); + } + + public function testShouldAllowGetProcessor() + { + $processorMock = $this->createProcessorMock(); + + $containerMock = $this->createContainerMock(); + $containerMock + ->expects($this->once()) + ->method('has') + ->with('processor-name') + ->willReturn(true) + ; + $containerMock + ->expects($this->once()) + ->method('get') + ->with('processor-name') + ->willReturn($processorMock) + ; + + $registry = new ContainerProcessorRegistry($containerMock); + $this->assertSame($processorMock, $registry->get('processor-name')); + } + + public function testThrowErrorIfServiceDoesNotImplementProcessorReturnType() + { + $containerMock = $this->createContainerMock(); + $containerMock + ->expects($this->once()) + ->method('has') + ->with('processor-name') + ->willReturn(true) + ; + $containerMock + ->expects($this->once()) + ->method('get') + ->with('processor-name') + ->willReturn(new \stdClass()) + ; + + $registry = new ContainerProcessorRegistry($containerMock); + + $this->expectException(\TypeError::class); + $this->expectExceptionMessage('Return value of Enqueue\Symfony\ContainerProcessorRegistry::get() must implement interface Interop\Queue\PsrProcessor, instance of stdClass returned'); + $registry->get('processor-name'); + } + + public function testShouldThrowExceptionIfProcessorIsNotSet() + { + $containerMock = $this->createContainerMock(); + $containerMock + ->expects($this->once()) + ->method('has') + ->with('processor-name') + ->willReturn(false) + ; + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Service locator does not have a processor with name "processor-name".'); + + $registry = new ContainerProcessorRegistry($containerMock); + $registry->get('processor-name'); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createProcessorMock(): Processor + { + return $this->createMock(Processor::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject + */ + private function createContainerMock(): ContainerInterface + { + return $this->createMock(ContainerInterface::class); + } +} diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php new file mode 100644 index 000000000..6a7e83fac --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildConsumptionExtensionsPassTest.php @@ -0,0 +1,240 @@ +assertClassImplements(CompilerPassInterface::class, BuildConsumptionExtensionsPass::class); + } + + public function testShouldBeFinal() + { + $this->assertClassFinal(BuildConsumptionExtensionsPass::class); + } + + public function testCouldBeConstructedWithName() + { + $pass = new BuildConsumptionExtensionsPass('aName'); + + $this->assertAttributeSame('aName', 'name', $pass); + } + + public function testThrowIfNameEmptyOnConstruct() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The name could not be empty.'); + new BuildConsumptionExtensionsPass(''); + } + + public function testShouldDoNothingIfExtensionsServiceIsNotRegistered() + { + $container = new ContainerBuilder(); + + //guard + $this->assertFalse($container->hasDefinition('enqueue.transport.aName.consumption_extensions')); + + $pass = new BuildConsumptionExtensionsPass('aName'); + $pass->process($container); + } + + public function testShouldRegisterTransportExtension() + { + $extensions = new Definition(); + $extensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'aName']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'aName']) + ; + + $pass = new BuildConsumptionExtensionsPass('aName'); + $pass->process($container); + + $this->assertInternalType('array', $extensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + new Reference('aBarExtension'), + ], $extensions->getArgument(0)); + } + + public function testShouldIgnoreOtherTransportExtensions() + { + $extensions = new Definition(); + $extensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'aName']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName']) + ; + + $pass = new BuildConsumptionExtensionsPass('aName'); + $pass->process($container); + + $this->assertInternalType('array', $extensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + ], $extensions->getArgument(0)); + } + + public function testShouldAddExtensionIfTransportAll() + { + $extensions = new Definition(); + $extensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'all']) + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension', ['transport' => 'anotherName']) + ; + + $pass = new BuildConsumptionExtensionsPass('aName'); + $pass->process($container); + + $this->assertInternalType('array', $extensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + ], $extensions->getArgument(0)); + } + + public function testShouldTreatTagsWithoutTransportAsDefaultTransport() + { + $extensions = new Definition(); + $extensions->addArgument([]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.default.consumption_extensions', $extensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension') + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension') + ; + + $pass = new BuildConsumptionExtensionsPass('default'); + $pass->process($container); + + $this->assertInternalType('array', $extensions->getArgument(0)); + $this->assertEquals([ + new Reference('aFooExtension'), + new Reference('aBarExtension'), + ], $extensions->getArgument(0)); + } + + public function testShouldOrderExtensionsByPriority() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.transport.default.consumption_extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension', ['priority' => 6]); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension', ['priority' => -5]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension', ['priority' => 2]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildConsumptionExtensionsPass('default'); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertCount(3, $orderedExtensions); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[2]); + } + + public function testShouldAssumePriorityZeroIfPriorityIsNotSet() + { + $container = new ContainerBuilder(); + + $extensions = new Definition(); + $extensions->addArgument([]); + $container->setDefinition('enqueue.transport.default.consumption_extensions', $extensions); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension'); + $container->setDefinition('foo_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension', ['priority' => 1]); + $container->setDefinition('bar_extension', $extension); + + $extension = new Definition(); + $extension->addTag('enqueue.transport.consumption_extension', ['priority' => -1]); + $container->setDefinition('baz_extension', $extension); + + $pass = new BuildConsumptionExtensionsPass('default'); + $pass->process($container); + + $orderedExtensions = $extensions->getArgument(0); + + $this->assertCount(3, $orderedExtensions); + $this->assertEquals(new Reference('bar_extension'), $orderedExtensions[0]); + $this->assertEquals(new Reference('foo_extension'), $orderedExtensions[1]); + $this->assertEquals(new Reference('baz_extension'), $orderedExtensions[2]); + } + + public function testShouldMergeWithAddedPreviously() + { + $extensions = new Definition(); + $extensions->addArgument([ + 'aBarExtension' => 'aBarServiceIdAddedPreviously', + 'aOloloExtension' => 'aOloloServiceIdAddedPreviously', + ]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.aName.consumption_extensions', $extensions); + + $container->register('aFooExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension') + ; + $container->register('aBarExtension', ExtensionInterface::class) + ->addTag('enqueue.transport.consumption_extension') + ; + + $pass = new BuildConsumptionExtensionsPass('aName'); + $pass->process($container); + + $this->assertInternalType('array', $extensions->getArgument(0)); + $this->assertEquals([ + 'aBarExtension' => 'aBarServiceIdAddedPreviously', + 'aOloloExtension' => 'aOloloServiceIdAddedPreviously', + ], $extensions->getArgument(0)); + } +} diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php new file mode 100644 index 000000000..039024f7a --- /dev/null +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/BuildProcessorRegistryPassTest.php @@ -0,0 +1,169 @@ +assertClassImplements(CompilerPassInterface::class, BuildProcessorRegistryPass::class); + } + + public function testShouldBeFinal() + { + $this->assertClassFinal(BuildProcessorRegistryPass::class); + } + + public function testCouldBeConstructedWithName() + { + $pass = new BuildProcessorRegistryPass('aName'); + + $this->assertAttributeSame('aName', 'name', $pass); + } + + public function testThrowIfNameEmptyOnConstruct() + { + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The name could not be empty.'); + new BuildProcessorRegistryPass(''); + } + + public function testShouldDoNothingIfProcessorRegistryServiceIsNotRegistered() + { + $pass = new BuildProcessorRegistryPass('aName'); + $pass->process(new ContainerBuilder()); + } + + public function testShouldRegisterProcessorWithMatchedName() + { + $registry = new Definition(ProcessorRegistryInterface::class); + $registry->addArgument([]); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.foo.processor_registry', $registry); + $container->register('aFooProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'foo']) + ; + $container->register('aProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'bar']) + ; + + $pass = new BuildProcessorRegistryPass('foo'); + + $pass->process($container); + + $this->assertInstanceOf(Reference::class, $registry->getArgument(0)); + + $this->assertLocatorServices($container, $registry->getArgument(0), [ + 'aFooProcessor' => 'aFooProcessor', + ]); + } + + public function testShouldRegisterProcessorWithoutNameToDefaultTransport() + { + $registry = new Definition(ProcessorRegistryInterface::class); + $registry->addArgument(null); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.default.processor_registry', $registry); + $container->register('aFooProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', []) + ; + $container->register('aProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'bar']) + ; + + $pass = new BuildProcessorRegistryPass('default'); + + $pass->process($container); + + $this->assertInstanceOf(Reference::class, $registry->getArgument(0)); + + $this->assertLocatorServices($container, $registry->getArgument(0), [ + 'aFooProcessor' => 'aFooProcessor', + ]); + } + + public function testShouldRegisterProcessorIfTransportNameEqualsAll() + { + $registry = new Definition(ProcessorRegistryInterface::class); + $registry->addArgument(null); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.default.processor_registry', $registry); + $container->register('aFooProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'all']) + ; + $container->register('aProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['transport' => 'bar']) + ; + + $pass = new BuildProcessorRegistryPass('default'); + + $pass->process($container); + + $this->assertInstanceOf(Reference::class, $registry->getArgument(0)); + + $this->assertLocatorServices($container, $registry->getArgument(0), [ + 'aFooProcessor' => 'aFooProcessor', + ]); + } + + public function testShouldRegisterWithCustomProcessorName() + { + $registry = new Definition(ProcessorRegistryInterface::class); + $registry->addArgument(null); + + $container = new ContainerBuilder(); + $container->setDefinition('enqueue.transport.default.processor_registry', $registry); + $container->register('aFooProcessor', 'aProcessorClass') + ->addTag('enqueue.transport.processor', ['processor' => 'customProcessorName']) + ; + + $pass = new BuildProcessorRegistryPass('default'); + + $pass->process($container); + + $this->assertInstanceOf(Reference::class, $registry->getArgument(0)); + + $this->assertLocatorServices($container, $registry->getArgument(0), [ + 'customProcessorName' => 'aFooProcessor', + ]); + } + + private function assertLocatorServices(ContainerBuilder $container, $locatorId, array $locatorServices) + { + $this->assertInstanceOf(Reference::class, $locatorId); + $locatorId = (string) $locatorId; + + $this->assertTrue($container->hasDefinition($locatorId)); + $this->assertRegExp('/service_locator\..*?\.enqueue\./', $locatorId); + + $match = []; + if (false == preg_match('/(service_locator\..*?)\.enqueue\./', $locatorId, $match)) { + $this->fail('preg_match should not failed'); + } + + $this->assertTrue($container->hasDefinition($match[1])); + $locator = $container->getDefinition($match[1]); + + $this->assertContainsOnly(ServiceClosureArgument::class, $locator->getArgument(0)); + $actualServices = array_map(function (ServiceClosureArgument $value) { + return (string) $value->getValues()[0]; + }, $locator->getArgument(0)); + + $this->assertEquals($locatorServices, $actualServices); + } +} diff --git a/pkg/enqueue/Tests/Symfony/DependencyInjection/TransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DependencyInjection/TransportFactoryTest.php index 8c2cc3240..993c2b365 100644 --- a/pkg/enqueue/Tests/Symfony/DependencyInjection/TransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DependencyInjection/TransportFactoryTest.php @@ -2,8 +2,14 @@ namespace Enqueue\Tests\Symfony\DependencyInjection; +use Enqueue\Consumption\ChainExtension; +use Enqueue\Consumption\QueueConsumer; +use Enqueue\Rpc\RpcClient; +use Enqueue\Rpc\RpcFactory; use Enqueue\Symfony\DependencyInjection\TransportFactory; use Enqueue\Test\ClassExtensionTrait; +use Interop\Queue\ConnectionFactory; +use Interop\Queue\Context; use PHPUnit\Framework\TestCase; use Symfony\Component\Config\Definition\Builder\TreeBuilder; use Symfony\Component\Config\Definition\Exception\InvalidConfigurationException; @@ -41,7 +47,7 @@ public function testShouldAllowAddConfigurationAsStringDsn() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), ['dsn://']); @@ -59,7 +65,7 @@ public function testShouldAllowAddConfigurationAsDsnWithoutSlashes() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), ['dsn:']); @@ -72,7 +78,7 @@ public function testShouldSetNullTransportIfNullGiven() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [null]); @@ -85,7 +91,7 @@ public function testShouldSetNullTransportIfEmptyStringGiven() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), ['']); @@ -98,7 +104,7 @@ public function testShouldSetNullTransportIfEmptyArrayGiven() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [[]]); @@ -111,7 +117,7 @@ public function testThrowIfEmptyDsnGiven() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $this->expectException(InvalidConfigurationException::class); @@ -125,7 +131,7 @@ public function testThrowIfFactoryClassAndFactoryServiceSetAtTheSameTime() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $this->expectException(\LogicException::class); @@ -143,7 +149,7 @@ public function testThrowIfConnectionFactoryClassUsedWithFactoryClassAtTheSameTi $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $this->expectException(\LogicException::class); @@ -161,7 +167,7 @@ public function testThrowIfConnectionFactoryClassUsedWithFactoryServiceAtTheSame $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $this->expectException(\LogicException::class); @@ -179,7 +185,7 @@ public function testShouldAllowSetFactoryClass() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [[ @@ -197,7 +203,7 @@ public function testShouldAllowSetFactoryService() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [[ @@ -215,7 +221,7 @@ public function testShouldAllowSetConnectionFactoryClass() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [[ @@ -233,7 +239,7 @@ public function testThrowIfExtraOptionGiven() $tb = new TreeBuilder(); $rootNode = $tb->root('foo'); - $transport->addConfiguration($rootNode); + $transport->addTransportConfiguration($rootNode); $processor = new Processor(); $config = $processor->process($tb->buildTree(), [['dsn' => 'foo:', 'extraOption' => 'aVal']]); @@ -243,15 +249,13 @@ public function testThrowIfExtraOptionGiven() ); } - public function testShouldCreateConnectionFactoryFromDSN() + public function testShouldBuildConnectionFactoryFromDSN() { $container = new ContainerBuilder(); $transport = new TransportFactory('default'); - $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'foo://bar/baz']); - - $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + $transport->buildConnectionFactory($container, ['dsn' => 'foo://bar/baz']); $this->assertTrue($container->hasDefinition('enqueue.transport.default.connection_factory')); @@ -266,15 +270,13 @@ public function testShouldCreateConnectionFactoryFromDSN() ; } - public function testShouldCreateConnectionFactoryUsingCustomFactoryClass() + public function testShouldBuildConnectionFactoryUsingCustomFactoryClass() { $container = new ContainerBuilder(); $transport = new TransportFactory('default'); - $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'foo:', 'factory_class' => 'theFactoryClass']); - - $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + $transport->buildConnectionFactory($container, ['dsn' => 'foo:', 'factory_class' => 'theFactoryClass']); $this->assertTrue($container->hasDefinition('enqueue.transport.default.connection_factory_factory')); $this->assertSame( @@ -295,15 +297,13 @@ public function testShouldCreateConnectionFactoryUsingCustomFactoryClass() ; } - public function testShouldCreateConnectionFactoryUsingCustomFactoryService() + public function testShouldBuildConnectionFactoryUsingCustomFactoryService() { $container = new ContainerBuilder(); $transport = new TransportFactory('default'); - $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'foo:', 'factory_service' => 'theFactoryService']); - - $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + $transport->buildConnectionFactory($container, ['dsn' => 'foo:', 'factory_service' => 'theFactoryService']); $this->assertTrue($container->hasDefinition('enqueue.transport.default.connection_factory')); @@ -318,15 +318,13 @@ public function testShouldCreateConnectionFactoryUsingCustomFactoryService() ; } - public function testShouldCreateConnectionFactoryUsingConnectionFactoryClassWithoutFactory() + public function testShouldBuildConnectionFactoryUsingConnectionFactoryClassWithoutFactory() { $container = new ContainerBuilder(); $transport = new TransportFactory('default'); - $serviceId = $transport->createConnectionFactory($container, ['dsn' => 'foo:', 'connection_factory_class' => 'theFactoryClass']); - - $this->assertEquals('enqueue.transport.default.connection_factory', $serviceId); + $transport->buildConnectionFactory($container, ['dsn' => 'foo:', 'connection_factory_class' => 'theFactoryClass']); $this->assertTrue($container->hasDefinition('enqueue.transport.default.connection_factory')); @@ -338,15 +336,14 @@ public function testShouldCreateConnectionFactoryUsingConnectionFactoryClassWith ; } - public function testShouldCreateContextFromDsn() + public function testShouldBuildContext() { $container = new ContainerBuilder(); + $container->register('enqueue.transport.default.connection_factory', ConnectionFactory::class); $transport = new TransportFactory('default'); - $serviceId = $transport->createContext($container, ['dsn' => 'foo://bar/baz']); - - $this->assertEquals('enqueue.transport.default.context', $serviceId); + $transport->buildContext($container, []); $this->assertNotEmpty($container->getDefinition('enqueue.transport.default.context')->getFactory()); $this->assertEquals( @@ -358,4 +355,99 @@ public function testShouldCreateContextFromDsn() $container->getDefinition('enqueue.transport.default.context')->getArguments()) ; } + + public function testThrowIfBuildContextCalledButConnectionFactoryServiceDoesNotExist() + { + $container = new ContainerBuilder(); + + $transport = new TransportFactory('default'); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The service "enqueue.transport.default.connection_factory" does not exist.'); + $transport->buildContext($container, []); + } + + public function testShouldBuildQueueConsumerWithDefaultOptions() + { + $container = new ContainerBuilder(); + $container->register('enqueue.transport.default.context', Context::class); + + $transport = new TransportFactory('default'); + + $transport->buildQueueConsumer($container, []); + + $this->assertSame(0, $container->getParameter('enqueue.transport.default.idle_time')); + $this->assertSame(10000, $container->getParameter('enqueue.transport.default.receive_timeout')); + + $this->assertTrue($container->hasDefinition('enqueue.transport.default.consumption_extensions')); + $this->assertSame(ChainExtension::class, $container->getDefinition('enqueue.transport.default.consumption_extensions')->getClass()); + $this->assertSame([[]], $container->getDefinition('enqueue.transport.default.consumption_extensions')->getArguments()); + + $this->assertTrue($container->hasDefinition('enqueue.transport.default.queue_consumer')); + $this->assertSame(QueueConsumer::class, $container->getDefinition('enqueue.transport.default.queue_consumer')->getClass()); + $this->assertEquals([ + new Reference('enqueue.transport.default.context'), + new Reference('enqueue.transport.default.consumption_extensions'), + '%enqueue.transport.default.idle_time%', + '%enqueue.transport.default.receive_timeout%', + ], $container->getDefinition('enqueue.transport.default.queue_consumer')->getArguments()); + } + + public function testShouldBuildQueueConsumerWithCustomOptions() + { + $container = new ContainerBuilder(); + $container->register('enqueue.transport.default.context', Context::class); + + $transport = new TransportFactory('default'); + + $transport->buildQueueConsumer($container, [ + 'idle_time' => 123, + 'receive_timeout' => 567, + ]); + + $this->assertSame(123, $container->getParameter('enqueue.transport.default.idle_time')); + $this->assertSame(567, $container->getParameter('enqueue.transport.default.receive_timeout')); + } + + public function testThrowIfBuildQueueConsumerCalledButContextServiceDoesNotExist() + { + $container = new ContainerBuilder(); + + $transport = new TransportFactory('default'); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The service "enqueue.transport.default.context" does not exist.'); + $transport->buildQueueConsumer($container, []); + } + + public function testShouldBuildRpcClientWithDefaultOptions() + { + $container = new ContainerBuilder(); + $container->register('enqueue.transport.default.context', Context::class); + + $transport = new TransportFactory('default'); + + $transport->buildRpcClient($container, []); + + $this->assertTrue($container->hasDefinition('enqueue.transport.default.rpc_factory')); + $this->assertSame(RpcFactory::class, $container->getDefinition('enqueue.transport.default.rpc_factory')->getClass()); + + $this->assertTrue($container->hasDefinition('enqueue.transport.default.rpc_client')); + $this->assertSame(RpcClient::class, $container->getDefinition('enqueue.transport.default.rpc_client')->getClass()); + $this->assertEquals([ + new Reference('enqueue.transport.default.context'), + new Reference('enqueue.transport.default.rpc_factory'), + ], $container->getDefinition('enqueue.transport.default.rpc_client')->getArguments()); + } + + public function testThrowIfBuildRpcClientCalledButContextServiceDoesNotExist() + { + $container = new ContainerBuilder(); + + $transport = new TransportFactory('default'); + + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('The service "enqueue.transport.default.context" does not exist.'); + $transport->buildRpcClient($container, []); + } } diff --git a/pkg/enqueue/composer.json b/pkg/enqueue/composer.json index 08c14be77..d8d1a48d8 100644 --- a/pkg/enqueue/composer.json +++ b/pkg/enqueue/composer.json @@ -11,7 +11,8 @@ "queue-interop/queue-interop": "0.7.x-dev", "enqueue/null": "0.9.x-dev", "ramsey/uuid": "^2|^3.5", - "psr/log": "^1" + "psr/log": "^1", + "psr/container": "^1" }, "require-dev": { "phpunit/phpunit": "~5.5", diff --git a/pkg/simple-client/SimpleClient.php b/pkg/simple-client/SimpleClient.php index 1f0d1c12a..ea7f3c7ab 100644 --- a/pkg/simple-client/SimpleClient.php +++ b/pkg/simple-client/SimpleClient.php @@ -2,7 +2,7 @@ namespace Enqueue\SimpleClient; -use Enqueue\Client\ArrayProcessorRegistry; +use Enqueue\ArrayProcessorRegistry; use Enqueue\Client\ChainExtension as ClientChainExtensions; use Enqueue\Client\Config; use Enqueue\Client\ConsumptionExtension\DelayRedeliveredMessageExtension; @@ -289,7 +289,7 @@ private function createConfiguration(): NodeInterface }); $transportNode = $rootNode->children()->arrayNode('transport'); - (new TransportFactory('default'))->addConfiguration($transportNode); + (new TransportFactory('default'))->addTransportConfiguration($transportNode); $rootNode->children() ->arrayNode('client')