-
Notifications
You must be signed in to change notification settings - Fork 442
Kafka symfony transport #432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 7 commits
4912bbe
d8d51e6
728b3d5
c0be066
86b56d8
161418a
311abf9
81c23e0
23f2038
ea2a998
a4becbe
d636cab
2eaed0f
574f96b
b9464ec
b97bc53
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
<?php | ||
|
||
namespace Enqueue\RdKafka\Client; | ||
|
||
use Enqueue\Client\Config; | ||
use Enqueue\Client\DriverInterface; | ||
use Enqueue\Client\Message; | ||
use Enqueue\Client\Meta\QueueMetaRegistry; | ||
use Enqueue\RdKafka\RdKafkaContext; | ||
use Interop\Queue\PsrMessage; | ||
use Psr\Log\LoggerInterface; | ||
use Psr\Log\NullLogger; | ||
|
||
class RdKafkaDriver implements DriverInterface | ||
{ | ||
/** | ||
* @var RdKafkaContext | ||
*/ | ||
private $context; | ||
|
||
/** | ||
* @var Config | ||
*/ | ||
private $config; | ||
|
||
/** | ||
* @var QueueMetaRegistry | ||
*/ | ||
private $queueMetaRegistry; | ||
|
||
/** | ||
* @param RdKafkaContext $context | ||
* @param Config $config | ||
* @param QueueMetaRegistry $queueMetaRegistry | ||
*/ | ||
public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry) | ||
{ | ||
$this->context = $context; | ||
$this->config = $config; | ||
$this->queueMetaRegistry = $queueMetaRegistry; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createTransportMessage(Message $message) | ||
{ | ||
$headers = $message->getHeaders(); | ||
$headers['content_type'] = $message->getContentType(); | ||
|
||
$transportMessage = $this->context->createMessage(); | ||
$transportMessage->setBody($message->getBody()); | ||
$transportMessage->setHeaders($headers); | ||
$transportMessage->setProperties($message->getProperties()); | ||
$transportMessage->setMessageId($message->getMessageId()); | ||
$transportMessage->setTimestamp($message->getTimestamp()); | ||
$transportMessage->setReplyTo($message->getReplyTo()); | ||
$transportMessage->setCorrelationId($message->getCorrelationId()); | ||
|
||
return $transportMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createClientMessage(PsrMessage $message) | ||
{ | ||
$clientMessage = new Message(); | ||
$clientMessage->setBody($message->getBody()); | ||
$clientMessage->setHeaders($message->getHeaders()); | ||
$clientMessage->setProperties($message->getProperties()); | ||
|
||
$clientMessage->setTimestamp($message->getTimestamp()); | ||
$clientMessage->setMessageId($message->getMessageId()); | ||
$clientMessage->setReplyTo($message->getReplyTo()); | ||
$clientMessage->setCorrelationId($message->getCorrelationId()); | ||
|
||
if ($contentType = $message->getHeader('content_type')) { | ||
$clientMessage->setContentType($contentType); | ||
} | ||
|
||
return $clientMessage; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToRouter(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) { | ||
throw new \LogicException('Topic name parameter is required but is not set'); | ||
} | ||
|
||
$topic = $this->createRouterTopic(); | ||
$transportMessage = $this->createTransportMessage($message); | ||
|
||
$this->context->createProducer()->send($topic, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function sendToProcessor(Message $message) | ||
{ | ||
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) { | ||
throw new \LogicException('Processor name parameter is required but is not set'); | ||
} | ||
|
||
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) { | ||
throw new \LogicException('Queue name parameter is required but is not set'); | ||
} | ||
|
||
$transportMessage = $this->createTransportMessage($message); | ||
$destination = $this->createQueue($queueName); | ||
|
||
$this->context->createProducer()->send($destination, $transportMessage); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createQueue($queueName) | ||
{ | ||
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName(); | ||
|
||
return $this->context->createQueue($transportName); | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function setupBroker(LoggerInterface $logger = null) | ||
{ | ||
$logger = $logger ?: new NullLogger(); | ||
$logger->debug('[RdKafkaDriver] setup broker'); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. not sure if this is going to be needed. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will, keep it. |
||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function getConfig() | ||
{ | ||
return $this->config; | ||
} | ||
|
||
private function createRouterTopic() | ||
{ | ||
$topic = $this->context->createTopic( | ||
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName()) | ||
); | ||
|
||
return $topic; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,139 @@ | ||
<?php | ||
|
||
namespace Enqueue\RdKafka\Symfony; | ||
|
||
use Enqueue\RdKafka\Client\RdKafkaDriver; | ||
use Enqueue\RdKafka\RdKafkaConnectionFactory; | ||
use Enqueue\RdKafka\RdKafkaContext; | ||
use Enqueue\Symfony\DriverFactoryInterface; | ||
use Enqueue\Symfony\TransportFactoryInterface; | ||
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition; | ||
use Symfony\Component\DependencyInjection\ContainerBuilder; | ||
use Symfony\Component\DependencyInjection\Definition; | ||
use Symfony\Component\DependencyInjection\Reference; | ||
|
||
class RdKafkaTransportFactory implements TransportFactoryInterface, DriverFactoryInterface | ||
{ | ||
/** | ||
* @var string | ||
*/ | ||
private $name; | ||
|
||
/** | ||
* @param string $name | ||
*/ | ||
public function __construct($name = 'rdkafka') | ||
{ | ||
$this->name = $name; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function addConfiguration(ArrayNodeDefinition $builder) | ||
{ | ||
$builder | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The entire configuration needs more work/checking |
||
->beforeNormalization() | ||
->ifString() | ||
->then(function ($v) { | ||
return ['dsn' => $v]; | ||
}) | ||
->end() | ||
->fixXmlConfig('topic') | ||
->children() | ||
->scalarNode('dsn') | ||
->info('The kafka DSN. Other parameters are ignored if set') | ||
->end() | ||
->arrayNode('global') | ||
->children() | ||
->scalarNode('metadata.broker.list')->end() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Symfony does not allow dots in configuration nodes but the librdkafka configuration keys do contain them, need to figure out how to maybe normalize them? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. you can define a variable node (which allows everything to be put in it). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why just do you not use underscores? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. As far as i can tell even a variable node requires a name and will throw an exception when it contains any dots. I can just remove the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By default you can configure the broker over the config in this way:
So I think it is a good solution to provide a alternative config way like this:
The easier way is to use There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd suggest moving all the broker options to a dedicated configuration option. The option should be a variable node (no validation) like this one for example Developers are free to put there whatever the broker supports. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another solution I found is to add a parser in the RdKafkaConnectionFactory class like the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I am not in favor of it. If you use factory in plain php you can pass doted options. Symfony does not allow it so it has to be fixed there. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But is not the configuration to validate the better option? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went with the variable node for now as parsing or normalizing the configuration nodes would require maintaining a map of all rdkafka configuration properties (as it consists of keys with both dots and underscores). And i am not particularly in favour of this. |
||
->end() | ||
->end() | ||
->arrayNode('topics') | ||
->prototype('scalar')->end() | ||
->end() | ||
->scalarNode('dr_msq_cb') | ||
->info('todo') | ||
->end() | ||
->scalarNode('error_cb') | ||
->info('todo') | ||
->end() | ||
->scalarNode('rebalance_cb') | ||
->info('todo') | ||
->end() | ||
->enumNode('partitioner') | ||
->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT']) | ||
->info('todo') | ||
->end() | ||
->scalarNode('log_level') | ||
->info('todo') | ||
->end() | ||
->booleanNode('commit_async') | ||
->defaultFalse() | ||
->info('todo') | ||
->end() | ||
; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createConnectionFactory(ContainerBuilder $container, array $config) | ||
{ | ||
if (false == empty($config['rdkafka'])) { | ||
$config['rdkafka'] = new Reference($config['rdkafka']); | ||
} | ||
|
||
$factory = new Definition(RdKafkaConnectionFactory::class); | ||
$factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]); | ||
|
||
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); | ||
$container->setDefinition($factoryId, $factory); | ||
|
||
return $factoryId; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createContext(ContainerBuilder $container, array $config) | ||
{ | ||
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName()); | ||
|
||
$context = new Definition(RdKafkaContext::class); | ||
$context->setPublic(true); | ||
$context->setFactory([new Reference($factoryId), 'createContext']); | ||
|
||
$contextId = sprintf('enqueue.transport.%s.context', $this->getName()); | ||
$container->setDefinition($contextId, $context); | ||
|
||
return $contextId; | ||
} | ||
|
||
/** | ||
* {@inheritdoc} | ||
*/ | ||
public function createDriver(ContainerBuilder $container, array $config) | ||
{ | ||
$driver = new Definition(RdKafkaDriver::class); | ||
$driver->setPublic(true); | ||
$driver->setArguments([ | ||
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())), | ||
new Reference('enqueue.client.config'), | ||
new Reference('enqueue.client.meta.queue_meta_registry'), | ||
]); | ||
|
||
$driverId = sprintf('enqueue.client.%s.driver', $this->getName()); | ||
$container->setDefinition($driverId, $driver); | ||
|
||
return $driverId; | ||
} | ||
|
||
/** | ||
* @return string | ||
*/ | ||
public function getName() | ||
{ | ||
return $this->name; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It does not have to be checked.
The default of the field
$contentType
is null and the default return value of$message->getHeader('content_type')
is also null if the headercontent_type
not set.