diff --git a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php index 2ec98e2b3..a3817b3e5 100644 --- a/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php +++ b/pkg/enqueue/Symfony/Client/ConsumeMessagesCommand.php @@ -4,6 +4,7 @@ use Enqueue\Client\DelegateProcessor; use Enqueue\Client\DriverInterface; +use Enqueue\Client\Meta\QueueMeta; use Enqueue\Client\Meta\QueueMetaRegistry; use Enqueue\Consumption\ChainExtension; use Enqueue\Consumption\Extension\LoggerExtension; @@ -13,6 +14,7 @@ use Symfony\Component\Console\Command\Command; use Symfony\Component\Console\Input\InputArgument; use Symfony\Component\Console\Input\InputInterface; +use Symfony\Component\Console\Input\InputOption; use Symfony\Component\Console\Logger\ConsoleLogger; use Symfony\Component\Console\Output\OutputInterface; @@ -78,6 +80,7 @@ protected function configure() 'By default it connects to default queue. '. 'It select an appropriate message processor based on a message headers') ->addArgument('client-queue-names', InputArgument::IS_ARRAY, 'Queues to consume messages from') + ->addOption('skip', null, InputOption::VALUE_IS_ARRAY | InputOption::VALUE_OPTIONAL, 'Queues to skip consumption of messages from', []) ; } @@ -94,7 +97,14 @@ protected function execute(InputInterface $input, OutputInterface $output) $queueMetas[] = $this->queueMetaRegistry->getQueueMeta($clientQueueName); } } else { - $queueMetas = $this->queueMetaRegistry->getQueuesMeta(); + /** @var QueueMeta[] $queueMetas */ + $queueMetas = iterator_to_array($this->queueMetaRegistry->getQueuesMeta()); + + foreach ($queueMetas as $index => $queueMeta) { + if (in_array($queueMeta->getClientName(), $input->getOption('skip'), true)) { + unset($queueMetas[$index]); + } + } } foreach ($queueMetas as $queueMeta) { diff --git a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php index b6017abdc..e87f4df90 100644 --- a/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php +++ b/pkg/enqueue/Tests/Symfony/Client/ConsumeMessagesCommandTest.php @@ -61,16 +61,17 @@ public function testShouldHaveExpectedOptions() $options = $command->getDefinition()->getOptions(); - $this->assertCount(6, $options); + $this->assertCount(7, $options); $this->assertArrayHasKey('memory-limit', $options); $this->assertArrayHasKey('message-limit', $options); $this->assertArrayHasKey('time-limit', $options); $this->assertArrayHasKey('setup-broker', $options); $this->assertArrayHasKey('idle-timeout', $options); $this->assertArrayHasKey('receive-timeout', $options); + $this->assertArrayHasKey('skip', $options); } - public function testShouldHaveExpectedAttributes() + public function testShouldHaveExpectedArguments() { $command = new ConsumeMessagesCommand( $this->createQueueConsumerMock(), @@ -171,6 +172,63 @@ public function testShouldExecuteConsumptionAndUseCustomClientDestinationName() ]); } + public function testShouldSkipQueueConsumptionAndUseCustomClientDestinationName() + { + $queue = new NullQueue(''); + + $processor = $this->createDelegateProcessorMock(); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->never()) + ->method('close') + ; + + $consumer = $this->createQueueConsumerMock(); + $consumer + ->expects($this->exactly(2)) + ->method('bind') + ; + $consumer + ->expects($this->once()) + ->method('consume') + ->with($this->isInstanceOf(ChainExtension::class)) + ; + + $queueMetaRegistry = $this->createQueueMetaRegistry([ + 'fooQueue' => [ + 'transportName' => 'fooTransportQueue', + ], + 'barQueue' => [ + 'transportName' => 'barTransportQueue', + ], + 'ololoQueue' => [ + 'transportName' => 'ololoTransportQueue', + ], + ]); + + $driver = $this->createDriverMock(); + $driver + ->expects($this->at(0)) + ->method('createQueue') + ->with('fooQueue') + ->willReturn($queue) + ; + $driver + ->expects($this->at(1)) + ->method('createQueue') + ->with('ololoQueue') + ->willReturn($queue) + ; + + $command = new ConsumeMessagesCommand($consumer, $processor, $queueMetaRegistry, $driver); + + $tester = new CommandTester($command); + $tester->execute([ + '--skip' => ['barQueue'], + ]); + } + /** * @param array $destinationNames *