diff --git a/pkg/enqueue-bundle/EnqueueBundle.php b/pkg/enqueue-bundle/EnqueueBundle.php index e007b97dc..a62f2db2b 100644 --- a/pkg/enqueue-bundle/EnqueueBundle.php +++ b/pkg/enqueue-bundle/EnqueueBundle.php @@ -21,6 +21,8 @@ use Enqueue\Fs\Symfony\FsTransportFactory; use Enqueue\Gps\GpsConnectionFactory; use Enqueue\Gps\Symfony\GpsTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class) $extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps'])); } + if (class_exists(RdKafkaConnectionFactory::class)) { + $extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka')); + } else { + $extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka'])); + } + $container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); $container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100); } diff --git a/pkg/enqueue/Symfony/DefaultTransportFactory.php b/pkg/enqueue/Symfony/DefaultTransportFactory.php index 20fa1c91a..65b877a30 100644 --- a/pkg/enqueue/Symfony/DefaultTransportFactory.php +++ b/pkg/enqueue/Symfony/DefaultTransportFactory.php @@ -10,6 +10,8 @@ use Enqueue\Gps\Symfony\GpsTransportFactory; use Enqueue\Null\NullConnectionFactory; use Enqueue\Null\Symfony\NullTransportFactory; +use Enqueue\RdKafka\RdKafkaConnectionFactory; +use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory; use Enqueue\Redis\RedisConnectionFactory; use Enqueue\Redis\Symfony\RedisTransportFactory; use Enqueue\Sqs\SqsConnectionFactory; @@ -209,6 +211,10 @@ private function findFactory($dsn) return new StompTransportFactory('default_stomp'); } + if ($factory instanceof RdKafkaConnectionFactory) { + return new RdKafkaTransportFactory('default_kafka'); + } + throw new \LogicException(sprintf( 'There is no supported transport factory for the connection factory "%s" created from DSN "%s"', get_class($factory), diff --git a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php index 9a013fdb9..8f2df545b 100644 --- a/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php +++ b/pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php @@ -289,5 +289,7 @@ public static function provideDSNs() yield ['redis:', 'default_redis']; yield ['stomp:', 'default_stomp']; + + yield ['kafka:', 'default_kafka']; } } diff --git a/pkg/rdkafka/Client/RdKafkaDriver.php b/pkg/rdkafka/Client/RdKafkaDriver.php new file mode 100644 index 000000000..416725366 --- /dev/null +++ b/pkg/rdkafka/Client/RdKafkaDriver.php @@ -0,0 +1,167 @@ +context = $context; + $this->config = $config; + $this->queueMetaRegistry = $queueMetaRegistry; + } + + /** + * {@inheritdoc} + */ + public function createTransportMessage(Message $message) + { + $headers = $message->getHeaders(); + $headers['content_type'] = $message->getContentType(); + + $transportMessage = $this->context->createMessage(); + $transportMessage->setBody($message->getBody()); + $transportMessage->setHeaders($headers); + $transportMessage->setProperties($message->getProperties()); + $transportMessage->setMessageId($message->getMessageId()); + $transportMessage->setTimestamp($message->getTimestamp()); + $transportMessage->setReplyTo($message->getReplyTo()); + $transportMessage->setCorrelationId($message->getCorrelationId()); + + return $transportMessage; + } + + /** + * {@inheritdoc} + */ + public function createClientMessage(PsrMessage $message) + { + $clientMessage = new Message(); + $clientMessage->setBody($message->getBody()); + $clientMessage->setHeaders($message->getHeaders()); + $clientMessage->setProperties($message->getProperties()); + + $clientMessage->setContentType($message->getHeader('content_type')); + + $clientMessage->setTimestamp($message->getTimestamp()); + $clientMessage->setMessageId($message->getMessageId()); + $clientMessage->setReplyTo($message->getReplyTo()); + $clientMessage->setCorrelationId($message->getCorrelationId()); + + return $clientMessage; + } + + /** + * {@inheritdoc} + */ + public function sendToRouter(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { + throw new \LogicException('Topic name parameter is required but is not set'); + } + + $topic = $this->createRouterTopic(); + $transportMessage = $this->createTransportMessage($message); + + $this->context->createProducer()->send($topic, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function sendToProcessor(Message $message) + { + if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { + throw new \LogicException('Processor name parameter is required but is not set'); + } + + if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { + throw new \LogicException('Queue name parameter is required but is not set'); + } + + $transportMessage = $this->createTransportMessage($message); + $destination = $this->createQueue($queueName); + + $this->context->createProducer()->send($destination, $transportMessage); + } + + /** + * {@inheritdoc} + */ + public function createQueue($queueName) + { + $transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); + + return $this->context->createQueue($transportName); + } + + /** + * {@inheritdoc} + */ + public function setupBroker(LoggerInterface $logger = null) + { + $logger = $logger ?: new NullLogger(); + $logger->debug('[RdKafkaDriver] setup broker'); + $log = function ($text, ...$args) use ($logger) { + $logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args)); + }; + + // setup router + $routerQueue = $this->createQueue($this->config->getRouterQueueName()); + $log('Create router queue: %s', $routerQueue->getQueueName()); + $this->context->createConsumer($routerQueue); + + // setup queues + foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) { + $queue = $this->createQueue($meta->getClientName()); + $log('Create processor queue: %s', $queue->getQueueName()); + $this->context->createConsumer($queue); + } + } + + /** + * {@inheritdoc} + */ + public function getConfig() + { + return $this->config; + } + + private function createRouterTopic() + { + $topic = $this->context->createTopic( + $this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) + ); + + return $topic; + } +} diff --git a/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php new file mode 100644 index 000000000..f03971fba --- /dev/null +++ b/pkg/rdkafka/Symfony/RdKafkaTransportFactory.php @@ -0,0 +1,139 @@ +name = $name; + } + + /** + * {@inheritdoc} + */ + public function addConfiguration(ArrayNodeDefinition $builder) + { + $builder + ->beforeNormalization() + ->ifString() + ->then(function ($v) { + return ['dsn' => $v]; + }) + ->end() + ->children() + ->scalarNode('dsn') + ->info('The kafka DSN. Other parameters are ignored if set') + ->end() + ->variableNode('global') + ->defaultValue([]) + ->info('The kafka global configuration properties') + ->end() + ->variableNode('topic') + ->defaultValue([]) + ->info('The kafka topic configuration properties') + ->end() + ->scalarNode('dr_msg_cb') + ->info('Delivery report callback') + ->end() + ->scalarNode('error_cb') + ->info('Error callback') + ->end() + ->scalarNode('rebalance_cb') + ->info('Called after consumer group has been rebalanced') + ->end() + ->enumNode('partitioner') + ->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) + ->info('Which partitioner to use') + ->end() + ->integerNode('log_level') + ->info('Logging level (syslog(3) levels)') + ->min(0)->max(7) + ->end() + ->booleanNode('commit_async') + ->defaultFalse() + ->info('Commit asynchronous') + ->end() + ; + } + + /** + * {@inheritdoc} + */ + public function createConnectionFactory(ContainerBuilder $container, array $config) + { + if (false == empty($config['rdkafka'])) { + $config['rdkafka'] = new Reference($config['rdkafka']); + } + + $factory = new Definition(RdKafkaConnectionFactory::class); + $factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]); + + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + $container->setDefinition($factoryId, $factory); + + return $factoryId; + } + + /** + * {@inheritdoc} + */ + public function createContext(ContainerBuilder $container, array $config) + { + $factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); + + $context = new Definition(RdKafkaContext::class); + $context->setPublic(true); + $context->setFactory([new Reference($factoryId), 'createContext']); + + $contextId = sprintf('enqueue.transport.%s.context', $this->getName()); + $container->setDefinition($contextId, $context); + + return $contextId; + } + + /** + * {@inheritdoc} + */ + public function createDriver(ContainerBuilder $container, array $config) + { + $driver = new Definition(RdKafkaDriver::class); + $driver->setPublic(true); + $driver->setArguments([ + new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), + new Reference('enqueue.client.config'), + new Reference('enqueue.client.meta.queue_meta_registry'), + ]); + + $driverId = sprintf('enqueue.client.%s.driver', $this->getName()); + $container->setDefinition($driverId, $driver); + + return $driverId; + } + + /** + * @return string + */ + public function getName() + { + return $this->name; + } +} diff --git a/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php new file mode 100644 index 000000000..c0cfab616 --- /dev/null +++ b/pkg/rdkafka/Tests/Client/RdKafkaDriverTest.php @@ -0,0 +1,365 @@ +assertClassImplements(DriverInterface::class, RdKafkaDriver::class); + } + + public function testCouldBeConstructedWithRequiredArguments() + { + new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + } + + public function testShouldReturnConfigObject() + { + $config = $this->createDummyConfig(); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $config, + $this->createDummyQueueMetaRegistry() + ); + + $this->assertSame($config, $driver->getConfig()); + } + + public function testShouldCreateAndReturnQueueInstance() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aprefix.afooqueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aFooQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldCreateAndReturnQueueInstanceWithHardcodedTransportName() + { + $expectedQueue = new RdKafkaTopic('aName'); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->with('aBarQueue') + ->willReturn($expectedQueue) + ; + + $driver = new RdKafkaDriver($context, $this->createDummyConfig(), $this->createDummyQueueMetaRegistry()); + + $queue = $driver->createQueue('aBarQueue'); + + $this->assertSame($expectedQueue, $queue); + } + + public function testShouldConvertTransportMessageToClientMessage() + { + $transportMessage = new RdKafkaMessage(); + $transportMessage->setBody('body'); + $transportMessage->setHeaders(['hkey' => 'hval']); + $transportMessage->setProperties(['key' => 'val']); + $transportMessage->setHeader('content_type', 'ContentType'); + $transportMessage->setMessageId('MessageId'); + $transportMessage->setTimestamp(1000); + + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $clientMessage = $driver->createClientMessage($transportMessage); + + $this->assertInstanceOf(Message::class, $clientMessage); + $this->assertSame('body', $clientMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + ], $clientMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $clientMessage->getProperties()); + $this->assertSame('MessageId', $clientMessage->getMessageId()); + $this->assertSame('ContentType', $clientMessage->getContentType()); + $this->assertSame(1000, $clientMessage->getTimestamp()); + + $this->assertNull($clientMessage->getExpire()); + } + + public function testShouldConvertClientMessageToTransportMessage() + { + $clientMessage = new Message(); + $clientMessage->setBody('body'); + $clientMessage->setHeaders(['hkey' => 'hval']); + $clientMessage->setProperties(['key' => 'val']); + $clientMessage->setContentType('ContentType'); + $clientMessage->setExpire(123); + $clientMessage->setMessageId('MessageId'); + $clientMessage->setTimestamp(1000); + + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn(new RdKafkaMessage()) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $transportMessage = $driver->createTransportMessage($clientMessage); + + $this->assertInstanceOf(RdKafkaMessage::class, $transportMessage); + $this->assertSame('body', $transportMessage->getBody()); + $this->assertSame([ + 'hkey' => 'hval', + 'content_type' => 'ContentType', + 'message_id' => 'MessageId', + 'timestamp' => 1000, + 'reply_to' => null, + 'correlation_id' => '', + ], $transportMessage->getHeaders()); + $this->assertSame([ + 'key' => 'val', + ], $transportMessage->getProperties()); + $this->assertSame('MessageId', $transportMessage->getMessageId()); + $this->assertSame(1000, $transportMessage->getTimestamp()); + } + + public function testShouldSendMessageToRouter() + { + $topic = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($topic), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createTopic') + ->with('aprefix.router') + ->willReturn($topic) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_TOPIC_NAME, 'topic'); + + $driver->sendToRouter($message); + } + + public function testShouldThrowExceptionIfTopicParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Topic name parameter is required but is not set'); + + $driver->sendToRouter(new Message()); + } + + public function testShouldSendMessageToProcessor() + { + $queue = new RdKafkaTopic('queue-name'); + $transportMessage = new RdKafkaMessage(); + + $producer = $this->createPsrProducerMock(); + $producer + ->expects($this->once()) + ->method('send') + ->with($this->identicalTo($queue), $this->identicalTo($transportMessage)) + ; + $context = $this->createPsrContextMock(); + $context + ->expects($this->once()) + ->method('createQueue') + ->willReturn($queue) + ; + $context + ->expects($this->once()) + ->method('createProducer') + ->willReturn($producer) + ; + $context + ->expects($this->once()) + ->method('createMessage') + ->willReturn($transportMessage) + ; + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + $message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, 'aFooQueue'); + + $driver->sendToProcessor($message); + } + + public function testShouldThrowExceptionIfProcessorNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Processor name parameter is required but is not set'); + + $driver->sendToProcessor(new Message()); + } + + public function testShouldThrowExceptionIfProcessorQueueNameParameterIsNotSet() + { + $driver = new RdKafkaDriver( + $this->createPsrContextMock(), + $this->createDummyConfig(), + $this->createDummyQueueMetaRegistry() + ); + + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('Queue name parameter is required but is not set'); + + $message = new Message(); + $message->setProperty(Config::PARAMETER_PROCESSOR_NAME, 'processor'); + + $driver->sendToProcessor($message); + } + + public function testShouldSetupBroker() + { + $routerTopic = new RdKafkaTopic(''); + $routerQueue = new RdKafkaTopic(''); + + $processorTopic = new RdKafkaTopic(''); + + $context = $this->createPsrContextMock(); + + $context + ->expects($this->at(0)) + ->method('createQueue') + ->willReturn($routerTopic) + ; + $context + ->expects($this->at(1)) + ->method('createQueue') + ->willReturn($routerQueue) + ; + $context + ->expects($this->at(2)) + ->method('createQueue') + ->willReturn($processorTopic) + ; + + $meta = new QueueMetaRegistry($this->createDummyConfig(), [ + 'default' => [], + ]); + + $driver = new RdKafkaDriver( + $context, + $this->createDummyConfig(), + $meta + ); + + $driver->setupBroker(); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|RdKafkaContext + */ + private function createPsrContextMock() + { + return $this->createMock(RdKafkaContext::class); + } + + /** + * @return \PHPUnit_Framework_MockObject_MockObject|PsrProducer + */ + private function createPsrProducerMock() + { + return $this->createMock(PsrProducer::class); + } + + /** + * @return QueueMetaRegistry + */ + private function createDummyQueueMetaRegistry() + { + $registry = new QueueMetaRegistry($this->createDummyConfig(), []); + $registry->add('default'); + $registry->add('aFooQueue'); + $registry->add('aBarQueue', 'aBarQueue'); + + return $registry; + } + + /** + * @return Config + */ + private function createDummyConfig() + { + return Config::create('aPrefix'); + } +} diff --git a/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php new file mode 100644 index 000000000..d3a6a5dab --- /dev/null +++ b/pkg/rdkafka/Tests/RdKafkaConnectionFactoryConfigTest.php @@ -0,0 +1,92 @@ +expectException(\LogicException::class); + $this->expectExceptionMessage('The config must be either an array of options, a DSN string or null'); + + new RdKafkaConnectionFactory(new \stdClass()); + } + + public function testThrowIfSchemeIsNotSupported() + { + $this->expectException(\LogicException::class); + $this->expectExceptionMessage('The given DSN scheme "http" is not supported. Could be "kafka" only.'); + + new RdKafkaConnectionFactory('http://example.com'); + } + + /** + * @dataProvider provideConfigs + * + * @param mixed $config + * @param mixed $expectedConfig + */ + public function testShouldParseConfigurationAsExpected($config, $expectedConfig) + { + $factory = new RdKafkaConnectionFactory($config); + + $config = $this->getObjectAttribute($factory, 'config'); + + $this->assertNotEmpty($config['global']['group.id']); + + $config['global']['group.id'] = 'group-id'; + $this->assertSame($expectedConfig, $config); + } + + public static function provideConfigs() + { + yield [ + null, + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092', + ], + ], + ]; + + yield [ + 'kafka:', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092', + ], + ], + ]; + + yield [ + 'kafka://user:pass@host:10000/db', + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'host:10000', + ], + ], + ]; + + yield [ + [], + [ + 'global' => [ + 'group.id' => 'group-id', + 'metadata.broker.list' => 'localhost:9092', + ], + ], + ]; + } +} diff --git a/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php new file mode 100644 index 000000000..950034205 --- /dev/null +++ b/pkg/rdkafka/Tests/Symfony/RdKafkaTransportFactoryTest.php @@ -0,0 +1,148 @@ +assertClassImplements(TransportFactoryInterface::class, RdKafkaTransportFactory::class); + } + + public function testCouldBeConstructedWithDefaultName() + { + $transport = new RdKafkaTransportFactory(); + + $this->assertEquals('rdkafka', $transport->getName()); + } + + public function testCouldBeConstructedWithCustomName() + { + $transport = new RdKafkaTransportFactory('theCustomName'); + + $this->assertEquals('theCustomName', $transport->getName()); + } + + public function testShouldAllowAddConfiguration() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), [[ + ]]); + + $this->assertEquals([ + 'topic' => [], + 'commit_async' => false, + 'global' => [], + ], $config); + } + + public function testShouldAllowAddConfigurationAsString() + { + $transport = new RdKafkaTransportFactory(); + $tb = new TreeBuilder(); + $rootNode = $tb->root('foo'); + + $transport->addConfiguration($rootNode); + $processor = new Processor(); + $config = $processor->process($tb->buildTree(), ['kafkaDSN']); + + $this->assertEquals([ + 'dsn' => 'kafkaDSN', + 'topic' => [], + 'commit_async' => false, + 'global' => [], + ], $config); + } + + public function testShouldCreateConnectionFactory() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame([[ + ]], $factory->getArguments()); + } + + public function testShouldCreateConnectionFactoryFromDsnString() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createConnectionFactory($container, [ + 'dsn' => 'theKafkaDSN', + ]); + + $this->assertTrue($container->hasDefinition($serviceId)); + $factory = $container->getDefinition($serviceId); + $this->assertEquals(RdKafkaConnectionFactory::class, $factory->getClass()); + $this->assertSame(['theKafkaDSN'], $factory->getArguments()); + } + + public function testShouldCreateContext() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createContext($container, [ + ]); + + $this->assertEquals('enqueue.transport.rdkafka.context', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $context = $container->getDefinition('enqueue.transport.rdkafka.context'); + $this->assertInstanceOf(Reference::class, $context->getFactory()[0]); + $this->assertEquals('enqueue.transport.rdkafka.connection_factory', (string) $context->getFactory()[0]); + $this->assertEquals('createContext', $context->getFactory()[1]); + } + + public function testShouldCreateDriver() + { + $container = new ContainerBuilder(); + + $transport = new RdKafkaTransportFactory(); + + $serviceId = $transport->createDriver($container, []); + + $this->assertEquals('enqueue.client.rdkafka.driver', $serviceId); + $this->assertTrue($container->hasDefinition($serviceId)); + + $driver = $container->getDefinition($serviceId); + $this->assertSame(RdKafkaDriver::class, $driver->getClass()); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(0)); + $this->assertEquals('enqueue.transport.rdkafka.context', (string) $driver->getArgument(0)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(1)); + $this->assertEquals('enqueue.client.config', (string) $driver->getArgument(1)); + + $this->assertInstanceOf(Reference::class, $driver->getArgument(2)); + $this->assertEquals('enqueue.client.meta.queue_meta_registry', (string) $driver->getArgument(2)); + } +} diff --git a/pkg/rdkafka/composer.json b/pkg/rdkafka/composer.json index 5339cd980..09909b592 100644 --- a/pkg/rdkafka/composer.json +++ b/pkg/rdkafka/composer.json @@ -16,7 +16,9 @@ "enqueue/enqueue": "^0.8@dev", "enqueue/null": "^0.8@dev", "queue-interop/queue-spec": "^0.5.3@dev", - "kwn/php-rdkafka-stubs": "^1.0.2" + "kwn/php-rdkafka-stubs": "^1.0.2", + "symfony/dependency-injection": "^2.8|^3|^4", + "symfony/config": "^2.8|^3|^4" }, "support": { "email": "opensource@forma-pro.com",