Skip to content

Kafka symfony transport #432

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
May 1, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/enqueue/Symfony/DefaultTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
use Enqueue\Gps\Symfony\GpsTransportFactory;
use Enqueue\Null\NullConnectionFactory;
use Enqueue\Null\Symfony\NullTransportFactory;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
use Enqueue\Redis\RedisConnectionFactory;
use Enqueue\Redis\Symfony\RedisTransportFactory;
use Enqueue\Sqs\SqsConnectionFactory;
Expand Down Expand Up @@ -209,6 +211,10 @@ private function findFactory($dsn)
return new StompTransportFactory('default_stomp');
}

if ($factory instanceof RdKafkaConnectionFactory) {
return new RdKafkaTransportFactory('default_kafka');
}

throw new \LogicException(sprintf(
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
get_class($factory),
Expand Down
2 changes: 2 additions & 0 deletions pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -289,5 +289,7 @@ public static function provideDSNs()
yield ['redis:', 'default_redis'];

yield ['stomp:', 'default_stomp'];

yield ['kafka:', 'default_kafka'];
}
}
154 changes: 154 additions & 0 deletions pkg/rdkafka/Client/RdKafkaDriver.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
<?php

namespace Enqueue\RdKafka\Client;

use Enqueue\Client\Config;
use Enqueue\Client\DriverInterface;
use Enqueue\Client\Message;
use Enqueue\Client\Meta\QueueMetaRegistry;
use Enqueue\RdKafka\RdKafkaContext;
use Interop\Queue\PsrMessage;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;

class RdKafkaDriver implements DriverInterface
{
/**
* @var RdKafkaContext
*/
private $context;

/**
* @var Config
*/
private $config;

/**
* @var QueueMetaRegistry
*/
private $queueMetaRegistry;

/**
* @param RdKafkaContext $context
* @param Config $config
* @param QueueMetaRegistry $queueMetaRegistry
*/
public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
{
$this->context = $context;
$this->config = $config;
$this->queueMetaRegistry = $queueMetaRegistry;
}

/**
* {@inheritdoc}
*/
public function createTransportMessage(Message $message)
{
$headers = $message->getHeaders();
$headers['content_type'] = $message->getContentType();

$transportMessage = $this->context->createMessage();
$transportMessage->setBody($message->getBody());
$transportMessage->setHeaders($headers);
$transportMessage->setProperties($message->getProperties());
$transportMessage->setMessageId($message->getMessageId());
$transportMessage->setTimestamp($message->getTimestamp());
$transportMessage->setReplyTo($message->getReplyTo());
$transportMessage->setCorrelationId($message->getCorrelationId());

return $transportMessage;
}

/**
* {@inheritdoc}
*/
public function createClientMessage(PsrMessage $message)
{
$clientMessage = new Message();
$clientMessage->setBody($message->getBody());
$clientMessage->setHeaders($message->getHeaders());
$clientMessage->setProperties($message->getProperties());

$clientMessage->setTimestamp($message->getTimestamp());
$clientMessage->setMessageId($message->getMessageId());
$clientMessage->setReplyTo($message->getReplyTo());
$clientMessage->setCorrelationId($message->getCorrelationId());

if ($contentType = $message->getHeader('content_type')) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does not have to be checked.
The default of the field $contentType is null and the default return value of $message->getHeader('content_type') is also null if the header content_type not set.

$clientMessage->setContentType($contentType);
}

return $clientMessage;
}

/**
* {@inheritdoc}
*/
public function sendToRouter(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
throw new \LogicException('Topic name parameter is required but is not set');
}

$topic = $this->createRouterTopic();
$transportMessage = $this->createTransportMessage($message);

$this->context->createProducer()->send($topic, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function sendToProcessor(Message $message)
{
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
throw new \LogicException('Processor name parameter is required but is not set');
}

if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
throw new \LogicException('Queue name parameter is required but is not set');
}

$transportMessage = $this->createTransportMessage($message);
$destination = $this->createQueue($queueName);

$this->context->createProducer()->send($destination, $transportMessage);
}

/**
* {@inheritdoc}
*/
public function createQueue($queueName)
{
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();

return $this->context->createQueue($transportName);
}

/**
* {@inheritdoc}
*/
public function setupBroker(LoggerInterface $logger = null)
{
$logger = $logger ?: new NullLogger();
$logger->debug('[RdKafkaDriver] setup broker');
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure if this is going to be needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It will, keep it.

}

/**
* {@inheritdoc}
*/
public function getConfig()
{
return $this->config;
}

private function createRouterTopic()
{
$topic = $this->context->createTopic(
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
);

return $topic;
}
}
139 changes: 139 additions & 0 deletions pkg/rdkafka/Symfony/RdKafkaTransportFactory.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
<?php

namespace Enqueue\RdKafka\Symfony;

use Enqueue\RdKafka\Client\RdKafkaDriver;
use Enqueue\RdKafka\RdKafkaConnectionFactory;
use Enqueue\RdKafka\RdKafkaContext;
use Enqueue\Symfony\DriverFactoryInterface;
use Enqueue\Symfony\TransportFactoryInterface;
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Definition;
use Symfony\Component\DependencyInjection\Reference;

class RdKafkaTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
{
/**
* @var string
*/
private $name;

/**
* @param string $name
*/
public function __construct($name = 'rdkafka')
{
$this->name = $name;
}

/**
* {@inheritdoc}
*/
public function addConfiguration(ArrayNodeDefinition $builder)
{
$builder
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The entire configuration needs more work/checking

->beforeNormalization()
->ifString()
->then(function ($v) {
return ['dsn' => $v];
})
->end()
->fixXmlConfig('topic')
->children()
->scalarNode('dsn')
->info('The kafka DSN. Other parameters are ignored if set')
->end()
->arrayNode('global')
->children()
->scalarNode('metadata.broker.list')->end()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Symfony does not allow dots in configuration nodes but the librdkafka configuration keys do contain them, need to figure out how to maybe normalize them?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can define a variable node (which allows everything to be put in it).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why just do you not use underscores?

Copy link
Contributor Author

@dheineman dheineman Apr 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As far as i can tell even a variable node requires a name and will throw an exception when it contains any dots.

I can just remove the metadata.broker.list scalarNode and just leave the global ArrayNode. Any suggestions?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By default you can configure the broker over the config in this way:

enqueue:
    transport:
        default: 'kafka://localhost:9092'

So I think it is a good solution to provide a alternative config way like this:

enqueue:
    transport:
        kafka:
            metadata_broker_list: localhost:9092
            ...

The easier way is to use ->scalarNode('metadata_broker_list')->end()

Copy link
Member

@makasim makasim Apr 27, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest moving all the broker options to a dedicated configuration option. The option should be a variable node (no validation) like this one for example

Developers are free to put there whatever the broker supports.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another solution I found is to add a parser in the RdKafkaConnectionFactory class like the parseDsn method and translate the config properties where the dot is needed.

    public function __construct($config = 'kafka:')
    {
        if (empty($config) || 'kafka:' === $config) {
            $config = [];
        } elseif (is_string($config)) {
            $config = $this->parseDsn($config);
        } elseif (is_array($config)) {
            $config = $this->parseDots($config); // or some other naming ;)
        } else {
            throw new \LogicException('The config must be either an array of options, a DSN string or null');
        }

        $this->config = array_replace($this->defaultConfig(), $config);
    }

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another solution I found is to add a parser in the RdKafkaConnectionFactory class like the parseDsn method and translate the config properties where the dot is needed.

I am not in favor of it. If you use factory in plain php you can pass doted options. Symfony does not allow it so it has to be fixed there.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But is not the configuration to validate the better option?
I think it is a valid choice for a library like this that supports many different libraries to use a self defined config format and map it to the specific library formats.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went with the variable node for now as parsing or normalizing the configuration nodes would require maintaining a map of all rdkafka configuration properties (as it consists of keys with both dots and underscores). And i am not particularly in favour of this.

->end()
->end()
->arrayNode('topics')
->prototype('scalar')->end()
->end()
->scalarNode('dr_msq_cb')
->info('todo')
->end()
->scalarNode('error_cb')
->info('todo')
->end()
->scalarNode('rebalance_cb')
->info('todo')
->end()
->enumNode('partitioner')
->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT'])
->info('todo')
->end()
->scalarNode('log_level')
->info('todo')
->end()
->booleanNode('commit_async')
->defaultFalse()
->info('todo')
->end()
;
}

/**
* {@inheritdoc}
*/
public function createConnectionFactory(ContainerBuilder $container, array $config)
{
if (false == empty($config['rdkafka'])) {
$config['rdkafka'] = new Reference($config['rdkafka']);
}

$factory = new Definition(RdKafkaConnectionFactory::class);
$factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]);

$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
$container->setDefinition($factoryId, $factory);

return $factoryId;
}

/**
* {@inheritdoc}
*/
public function createContext(ContainerBuilder $container, array $config)
{
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());

$context = new Definition(RdKafkaContext::class);
$context->setPublic(true);
$context->setFactory([new Reference($factoryId), 'createContext']);

$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
$container->setDefinition($contextId, $context);

return $contextId;
}

/**
* {@inheritdoc}
*/
public function createDriver(ContainerBuilder $container, array $config)
{
$driver = new Definition(RdKafkaDriver::class);
$driver->setPublic(true);
$driver->setArguments([
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
new Reference('enqueue.client.config'),
new Reference('enqueue.client.meta.queue_meta_registry'),
]);

$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
$container->setDefinition($driverId, $driver);

return $driverId;
}

/**
* @return string
*/
public function getName()
{
return $this->name;
}
}
Loading