From 4912bbe3175cb2d6f4c907fdc8bd6b36a4d4f5be Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:03:47 +0200 Subject: [PATCH 01/16] Add symfony intergation for kafka transport --- .../Symfony/DefaultTransportFactory.php | 6 + .../Symfony/DefaultTransportFactoryTest.php | 2 + pkg/rdkafka/Client/RdKafkaDriver.php | 166 +++++++++ .../Symfony/RdKafkaTransportFactory.php | 148 ++++++++ .../Tests/Client/RdKafkaDriverTest.php | 340 ++++++++++++++++++ .../RdKafkaConnectionFactoryConfigTest.php | 92 +++++ .../Symfony/RdKafkaTransportFactoryTest.php | 146 ++++++++ pkg/rdkafka/composer.json | 4 +- 8 files changed, 903 insertions(+), 1 deletion(-) create mode 100644 pkg/rdkafka/Client/RdKafkaDriver.php create mode 100644 pkg/rdkafka/Symfony/RdKafkaTransportFactory.php create mode 100644 pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php create mode 100644 pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php create mode 100644 pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index 20fa1c91a..65b877a30 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -10,6 +10,8 @@ use Enqueue\Gps\Symfony\GpsTransportFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -209,6 +211,10 @@ private function findFactory($dsn) return new StompTransportFactory('default_stomp'); } + if ($factory instanceof RdKafkaConnectionFactory) { + return new RdKafkaTransportFactory('default_kafka'); + } + throw new \LogicException(sprintf( 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', get_class($factory), diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 9a013fdb9..8f2df545b 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -289,5 +289,7 @@ public static function provideDSNs() yield ['redis:', 'default_redis']; yield ['stomp:', 'default_stomp']; + + yield ['kafka:', 'default_kafka']; } } diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php new file mode 100644 index 000000000..e3450b368 --- /dev/null +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -0,0 +1,166 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function createTransportMessage(Message $message) + { + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($message->getProperties()); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + + return $transportMessage; + } + + /** + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + if ($contentType = $message->getHeader('content_type')) { + $clientMessage->setContentType($contentType); + } + + if ($expiration = $message->getHeader('expiration')) { + $clientMessage->setExpire($expiration); + } + + if ($delay = $message->getHeader('delay')) { + $clientMessage->setDelay($delay); + } + + if ($priority = $message->getHeader('priority')) { + $clientMessage->setPriority($priority); + } + + return $clientMessage; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $topic = $this->createRouterTopic(); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($topic, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); + + return $this->context->createQueue($transportName); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $logger->debug('[RdKasfkaDriver] setup broker'); + } + + /** + * {@inheritdoc} + */ + public function getConfig() + { + return $this->config; + } + + private function createRouterTopic() + { + $topic = $this->context->createTopic( + $this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) + ); + + return $topic; + } +} \ No newline at end of file diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php new file mode 100644 index 000000000..26e904631 --- /dev/null +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -0,0 +1,148 @@ +name = $name; + } + + /** + * @param ArrayNodeDefinition $builder + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->fixXmlConfig('topic') + ->children() + ->scalarNode('dsn') + ->info('The kafka DSN. Other parameters are ignored if set') + ->end() + ->arrayNode('global') + ->children() + ->scalarNode('metadata.broker.list')->end() + ->end() + ->end() + ->arrayNode('topics') + ->prototype('scalar')->end() + ->end() + ->scalarNode('dr_msq_cb') + ->info('todo') + ->end() + ->scalarNode('error_cb') + ->info('todo') + ->end() + ->scalarNode('rebalance_cb') + ->info('todo') + ->end() + ->enumNode('partitioner') + ->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) + ->info('todo') + ->end() + ->scalarNode('log_level') + ->info('todo') + ->end() + ->booleanNode('commit_async') + ->defaultFalse() + ->info('todo') + ->end() + ; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a factory service id + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + if (false == empty($config['rdkafka'])) { + $config['rdkafka'] = new Reference($config['rdkafka']); + } + + $factory = new Definition(RdKafkaConnectionFactory::class); + $factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a context service id + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(RdKafkaContext::class); + $context->setPublic(true); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * @param ContainerBuilder $container + * @param array $config + * + * @return string The method must return a driver service id + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(RdKafkaDriver::class); + $driver->setPublic(true); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } +} \ No newline at end of file diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php new file mode 100644 index 000000000..9c0b9f1ee --- /dev/null +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -0,0 +1,340 @@ +assertClassImplements(DriverInterface::class, RdKafkaDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + } + + public function testShouldReturnConfigObject() + { + $config = $this->createDummyConfig(); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $config, + $this->createDummyQueueMetaRegistry() + ); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.afooqueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aFooQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aBarQueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aBarQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new RdKafkaMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + + $this->assertNull($clientMessage->getExpire()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new RdKafkaMessage()) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(RdKafkaMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => null, + 'correlation_id' => '', + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->with('aprefix.router') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $context = $this->createPsrContextMock(); + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RdKafkaContext + */ + private function createPsrContextMock() + { + return $this->createMock(RdKafkaContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer + */ + private function createPsrProducerMock() + { + return $this->createMock(PsrProducer::class); + } + + /** + * @return QueueMetaRegistry + */ + private function createDummyQueueMetaRegistry() + { + $registry = new QueueMetaRegistry($this->createDummyConfig(), []); + $registry->add('default'); + $registry->add('aFooQueue'); + $registry->add('aBarQueue', 'aBarQueue'); + + return $registry; + } + + /** + * @return Config + */ + private function createDummyConfig() + { + return Config::create('aPrefix'); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php new file mode 100644 index 000000000..179599455 --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php @@ -0,0 +1,92 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new RdKafkaConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotSupported() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "kafka" only.'); + + new RdKafkaConnectionFactory('http://example.com'); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new RdKafkaConnectionFactory($config); + + $config = $this->getObjectAttribute($factory, 'config'); + + $this->assertNotEmpty($config['global']['group.id']); + + $config['global']['group.id'] = 'group-id'; + $this->assertSame($expectedConfig, $config); + } + + public static function provideConfigs() + { + yield [ + null, + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + + yield [ + 'kafka:', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + + yield [ + 'kafka://user:pass@host:10000/db', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'host:10000' + ] + ], + ]; + + yield [ + [], + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092' + ] + ], + ]; + } +} diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php new file mode 100644 index 000000000..e81a125e5 --- /dev/null +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -0,0 +1,146 @@ +assertClassImplements(TransportFactoryInterface::class, RdKafkaTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new RdKafkaTransportFactory(); + + $this->assertEquals('rdkafka', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new RdKafkaTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + ]]); + + $this->assertEquals([ + 'topics' => [], + 'commit_async' => false + ], $config); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['fileDSN']); + + $this->assertEquals([ + 'dsn' => 'fileDSN', + 'topics' => [], + 'commit_async' => false + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + ]], $factory->getArguments()); + } + + public function testShouldCreateConnectionFactoryFromDsnString() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'theFileDSN', + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame(['theFileDSN'], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createContext($container, [ + ]); + + $this->assertEquals('enqueue.transport.rdkafka.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.rdkafka.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.rdkafka.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.rdkafka.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(RdKafkaDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.rdkafka.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index 5339cd980..09909b592 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -16,7 +16,9 @@ "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", "queue-interop/queue-spec": "^0.5.3@dev", - "kwn/php-rdkafka-stubs": "^1.0.2" + "kwn/php-rdkafka-stubs": "^1.0.2", + "symfony/dependency-injection": "^2.8|^3|^4", + "symfony/config": "^2.8|^3|^4" }, "support": { "email": "opensource@forma-pro.com", From d8d51e6b3d0c47a69788ba2d7aede1f3812468fc Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:07:35 +0200 Subject: [PATCH 02/16] Remove expiration, delay and priority from the RdKafka driver --- pkg/rdkafka/Client/RdKafkaDriver.php | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index e3450b368..fcb72051a 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -79,18 +79,6 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setContentType($contentType); } - if ($expiration = $message->getHeader('expiration')) { - $clientMessage->setExpire($expiration); - } - - if ($delay = $message->getHeader('delay')) { - $clientMessage->setDelay($delay); - } - - if ($priority = $message->getHeader('priority')) { - $clientMessage->setPriority($priority); - } - return $clientMessage; } From 728b3d577d4ce1ca52ea97d229faefc5b6f0b693 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:21:15 +0200 Subject: [PATCH 03/16] Fixed a typo --- pkg/rdkafka/Client/RdKafkaDriver.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index fcb72051a..ffd19fbfd 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -132,7 +132,7 @@ public function createQueue($queueName) public function setupBroker(LoggerInterface $logger = null) { $logger = $logger ?: new NullLogger(); - $logger->debug('[RdKasfkaDriver] setup broker'); + $logger->debug('[RdKafkaDriver] setup broker'); } /** From c0be06668c5c4f9e1ed18099816cf0166c1dbf79 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:24:19 +0200 Subject: [PATCH 04/16] Renamed the dsn in the kafka transport factory test --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index e81a125e5..802a966a3 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -61,10 +61,10 @@ public function testShouldAllowAddConfigurationAsString() $transport->addConfiguration($rootNode); $processor = new Processor(); - $config = $processor->process($tb->buildTree(), ['fileDSN']); + $config = $processor->process($tb->buildTree(), ['kafkaDSN']); $this->assertEquals([ - 'dsn' => 'fileDSN', + 'dsn' => 'kafkaDSN', 'topics' => [], 'commit_async' => false ], $config); From 86b56d891fc55111a0979b6305c3bc74826b7852 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:24:52 +0200 Subject: [PATCH 05/16] Added a new line to the end of the kafka driver and transport factory --- pkg/rdkafka/Client/RdKafkaDriver.php | 2 +- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index ffd19fbfd..4530dc52d 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -151,4 +151,4 @@ private function createRouterTopic() return $topic; } -} \ No newline at end of file +} diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 26e904631..6a903aedb 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -145,4 +145,4 @@ public function getName() { return $this->name; } -} \ No newline at end of file +} From 161418ab5a4331cebfb41452c01bc01d0da0a82f Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:26:14 +0200 Subject: [PATCH 06/16] Renamed another dsn in the kafka transport factory test --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index 802a966a3..a98a514e7 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -93,13 +93,13 @@ public function testShouldCreateConnectionFactoryFromDsnString() $transport = new RdKafkaTransportFactory(); $serviceId = $transport->createConnectionFactory($container, [ - 'dsn' => 'theFileDSN', + 'dsn' => 'theKafkaDSN', ]); $this->assertTrue($container->hasDefinition($serviceId)); $factory = $container->getDefinition($serviceId); $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); - $this->assertSame(['theFileDSN'], $factory->getArguments()); + $this->assertSame(['theKafkaDSN'], $factory->getArguments()); } public function testShouldCreateContext() From 311abf9eb2040fa6b8e76a370b3dc5e45da34467 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 15:52:09 +0200 Subject: [PATCH 07/16] Use inheritdoc in the kafka transport factory --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 6a903aedb..88088c2ff 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -28,7 +28,7 @@ public function __construct($name = 'rdkafka') } /** - * @param ArrayNodeDefinition $builder + * {@inheritdoc} */ public function addConfiguration(ArrayNodeDefinition $builder) { @@ -76,10 +76,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a factory service id + * {@inheritdoc} */ public function createConnectionFactory(ContainerBuilder $container, array $config) { @@ -97,10 +94,7 @@ public function createConnectionFactory(ContainerBuilder $container, array $conf } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a context service id + * {@inheritdoc} */ public function createContext(ContainerBuilder $container, array $config) { @@ -117,10 +111,7 @@ public function createContext(ContainerBuilder $container, array $config) } /** - * @param ContainerBuilder $container - * @param array $config - * - * @return string The method must return a driver service id + * {@inheritdoc} */ public function createDriver(ContainerBuilder $container, array $config) { From 81c23e087ebd982b056c77fbf14359cbfbe35293 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Thu, 26 Apr 2018 16:01:48 +0200 Subject: [PATCH 08/16] Fixed some code styling --- pkg/rdkafka/Client/RdKafkaDriver.php | 4 ++-- .../Tests/RdKafkaConnectionFactoryConfigTest.php | 16 ++++++++-------- .../Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index 4530dc52d..7a5fd4a9a 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -29,8 +29,8 @@ class RdKafkaDriver implements DriverInterface private $queueMetaRegistry; /** - * @param RdKafkaContext $context - * @param Config $config + * @param RdKafkaContext $context + * @param Config $config * @param QueueMetaRegistry $queueMetaRegistry */ public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php index 179599455..d3a6a5dab 100644 --- a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php @@ -54,8 +54,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; @@ -64,8 +64,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; @@ -74,8 +74,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'host:10000' - ] + 'metadata.broker.list' => 'host:10000', + ], ], ]; @@ -84,8 +84,8 @@ public static function provideConfigs() [ 'global' => [ 'group.id' => 'group-id', - 'metadata.broker.list' => 'localhost:9092' - ] + 'metadata.broker.list' => 'localhost:9092', + ], ], ]; } diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index a98a514e7..b49f643e9 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -49,7 +49,7 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([ 'topics' => [], - 'commit_async' => false + 'commit_async' => false, ], $config); } @@ -66,7 +66,7 @@ public function testShouldAllowAddConfigurationAsString() $this->assertEquals([ 'dsn' => 'kafkaDSN', 'topics' => [], - 'commit_async' => false + 'commit_async' => false, ], $config); } From 23f203807afbe398ee7728b6b93ccab1a386f1b6 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Fri, 27 Apr 2018 16:45:53 +0200 Subject: [PATCH 09/16] Updated the description on some of the kafka symfony configuration nodes --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 88088c2ff..260cac337 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -52,25 +52,26 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->arrayNode('topics') ->prototype('scalar')->end() ->end() - ->scalarNode('dr_msq_cb') - ->info('todo') + ->scalarNode('dr_msg_cb') + ->info('Delivery report callback') ->end() ->scalarNode('error_cb') - ->info('todo') + ->info('Error callback') ->end() ->scalarNode('rebalance_cb') - ->info('todo') + ->info('Called after consumer group has been rebalanced') ->end() ->enumNode('partitioner') ->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) - ->info('todo') + ->info('Which partitioner to use') ->end() - ->scalarNode('log_level') - ->info('todo') + ->integerNode('log_level') + ->info('Logging level (syslog(3) levels)') + ->min(0)->max(7) ->end() ->booleanNode('commit_async') ->defaultFalse() - ->info('todo') + ->info('Commit asynchronous') ->end() ; } From ea2a998cc7e55a464165efe7333e9ffee83e2d85 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Fri, 27 Apr 2018 17:21:00 +0200 Subject: [PATCH 10/16] No need to check as the default return value is null --- pkg/rdkafka/Client/RdKafkaDriver.php | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index 7a5fd4a9a..a881d6d70 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -70,15 +70,13 @@ public function createClientMessage(PsrMessage $message) $clientMessage->setHeaders($message->getHeaders()); $clientMessage->setProperties($message->getProperties()); + $clientMessage->setContentType($message->getHeader('content_type')); + $clientMessage->setTimestamp($message->getTimestamp()); $clientMessage->setMessageId($message->getMessageId()); $clientMessage->setReplyTo($message->getReplyTo()); $clientMessage->setCorrelationId($message->getCorrelationId()); - if ($contentType = $message->getHeader('content_type')) { - $clientMessage->setContentType($contentType); - } - return $clientMessage; } From a4becbef003a5c37614169c375f1d57084a0dbbd Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 09:04:02 +0200 Subject: [PATCH 11/16] Add the rdkafka transport factory to the EnqueueBundle class --- pkg/enqueue-bundle/EnqueueBundle.php | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index e007b97dc..a62f2db2b 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -21,6 +21,8 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class) $extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); } + if (class_exists(RdKafkaConnectionFactory::class)) { + $extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka')); + } else { + $extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); + } + $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); } From d636cab60df254dbcf82f1bf513f60b75dc47426 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 09:08:50 +0200 Subject: [PATCH 12/16] Made the rdkafka global config node a variableNode with an empty array as default --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 6 +----- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 2 ++ 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 260cac337..042dfe4a6 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -44,11 +44,7 @@ public function addConfiguration(ArrayNodeDefinition $builder) ->scalarNode('dsn') ->info('The kafka DSN. Other parameters are ignored if set') ->end() - ->arrayNode('global') - ->children() - ->scalarNode('metadata.broker.list')->end() - ->end() - ->end() + ->variableNode('global')->defaultValue([])->end() ->arrayNode('topics') ->prototype('scalar')->end() ->end() diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index b49f643e9..42e697a58 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -50,6 +50,7 @@ public function testShouldAllowAddConfiguration() $this->assertEquals([ 'topics' => [], 'commit_async' => false, + 'global' => [], ], $config); } @@ -67,6 +68,7 @@ public function testShouldAllowAddConfigurationAsString() 'dsn' => 'kafkaDSN', 'topics' => [], 'commit_async' => false, + 'global' => [], ], $config); } From 2eaed0f19e1966af590032c262df7a4a8aabd714 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Mon, 30 Apr 2018 18:00:15 +0200 Subject: [PATCH 13/16] Updated the kafka symfony configuration --- pkg/rdkafka/Symfony/RdKafkaTransportFactory.php | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php index 042dfe4a6..f03971fba 100644 --- a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -39,14 +39,17 @@ public function addConfiguration(ArrayNodeDefinition $builder) return ['dsn' => $v]; }) ->end() - ->fixXmlConfig('topic') ->children() ->scalarNode('dsn') ->info('The kafka DSN. Other parameters are ignored if set') ->end() - ->variableNode('global')->defaultValue([])->end() - ->arrayNode('topics') - ->prototype('scalar')->end() + ->variableNode('global') + ->defaultValue([]) + ->info('The kafka global configuration properties') + ->end() + ->variableNode('topic') + ->defaultValue([]) + ->info('The kafka topic configuration properties') ->end() ->scalarNode('dr_msg_cb') ->info('Delivery report callback') From 574f96bfffaf78d0b6d3b824e4a61cb9c8dc9979 Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 10:21:20 +0200 Subject: [PATCH 14/16] Expect the correct response in rdkafka tests --- pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php index 42e697a58..950034205 100644 --- a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -48,7 +48,7 @@ public function testShouldAllowAddConfiguration() ]]); $this->assertEquals([ - 'topics' => [], + 'topic' => [], 'commit_async' => false, 'global' => [], ], $config); @@ -66,7 +66,7 @@ public function testShouldAllowAddConfigurationAsString() $this->assertEquals([ 'dsn' => 'kafkaDSN', - 'topics' => [], + 'topic' => [], 'commit_async' => false, 'global' => [], ], $config); From b9464ec3960795955cbe70bc44d771f054feff3b Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 13:36:35 +0200 Subject: [PATCH 15/16] Add the broker setup for the rdkafka driver --- pkg/rdkafka/Client/RdKafkaDriver.php | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php index a881d6d70..416725366 100644 --- a/pkg/rdkafka/Client/RdKafkaDriver.php +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -131,6 +131,21 @@ public function setupBroker(LoggerInterface $logger = null) { $logger = $logger ?: new NullLogger(); $logger->debug('[RdKafkaDriver] setup broker'); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args)); + }; + + // setup router + $routerQueue = $this->createQueue($this->config->getRouterQueueName()); + $log('Create router queue: %s', $routerQueue->getQueueName()); + $this->context->createConsumer($routerQueue); + + // setup queues + foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { + $queue = $this->createQueue($meta->getClientName()); + $log('Create processor queue: %s', $queue->getQueueName()); + $this->context->createConsumer($queue); + } } /** From b97bc5343fec39df5c27d98a6932a3e14ade482f Mon Sep 17 00:00:00 2001 From: Dave Heineman Date: Tue, 1 May 2018 13:55:06 +0200 Subject: [PATCH 16/16] Fix the rdkafka should setupd driver test --- .../Tests/Client/RdKafkaDriverTest.php | 27 ++++++++++++++++++- 1 file changed, 26 insertions(+), 1 deletion(-) diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php index 9c0b9f1ee..c0cfab616 100644 --- a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -290,12 +290,37 @@ public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() public function testShouldSetupBroker() { + $routerTopic = new RdKafkaTopic(''); + $routerQueue = new RdKafkaTopic(''); + + $processorTopic = new RdKafkaTopic(''); + $context = $this->createPsrContextMock(); + $context + ->expects($this->at(0)) + ->method('createQueue') + ->willReturn($routerTopic) + ; + $context + ->expects($this->at(1)) + ->method('createQueue') + ->willReturn($routerQueue) + ; + $context + ->expects($this->at(2)) + ->method('createQueue') + ->willReturn($processorTopic) + ; + + $meta = new QueueMetaRegistry($this->createDummyConfig(), [ + 'default' => [], + ]); + $driver = new RdKafkaDriver( $context, $this->createDummyConfig(), - $this->createDummyQueueMetaRegistry() + $meta ); $driver->setupBroker();