Skip to content

Commit 6b4bbb2

Browse files
authored
Merge pull request #432 from dheineman/kafka-symfony-transport
Kafka symfony transport
2 parents a8ccaaf + b10c626 commit 6b4bbb2

9 files changed

+930
-1
lines changed

pkg/enqueue-bundle/EnqueueBundle.php

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
use Enqueue\Fs\Symfony\FsTransportFactory;
2222
use Enqueue\Gps\GpsConnectionFactory;
2323
use Enqueue\Gps\Symfony\GpsTransportFactory;
24+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
25+
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
2426
use Enqueue\Redis\RedisConnectionFactory;
2527
use Enqueue\Redis\Symfony\RedisTransportFactory;
2628
use Enqueue\Sqs\SqsConnectionFactory;
@@ -104,6 +106,12 @@ class_exists(AmqpLibConnectionFactory::class)
104106
$extension->setTransportFactory(new MissingTransportFactory('gps', ['enqueue/gps']));
105107
}
106108

109+
if (class_exists(RdKafkaConnectionFactory::class)) {
110+
$extension->setTransportFactory(new RdKafkaTransportFactory('rdkafka'));
111+
} else {
112+
$extension->setTransportFactory(new MissingTransportFactory('rdkafka', ['enqueue/rdkafka']));
113+
}
114+
107115
$container->addCompilerPass(new AsyncEventsPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
108116
$container->addCompilerPass(new AsyncTransformersPass(), PassConfig::TYPE_BEFORE_OPTIMIZATION, 100);
109117
}

pkg/enqueue/Symfony/DefaultTransportFactory.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
use Enqueue\Gps\Symfony\GpsTransportFactory;
1111
use Enqueue\Null\NullConnectionFactory;
1212
use Enqueue\Null\Symfony\NullTransportFactory;
13+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
14+
use Enqueue\RdKafka\Symfony\RdKafkaTransportFactory;
1315
use Enqueue\Redis\RedisConnectionFactory;
1416
use Enqueue\Redis\Symfony\RedisTransportFactory;
1517
use Enqueue\Sqs\SqsConnectionFactory;
@@ -209,6 +211,10 @@ private function findFactory($dsn)
209211
return new StompTransportFactory('default_stomp');
210212
}
211213

214+
if ($factory instanceof RdKafkaConnectionFactory) {
215+
return new RdKafkaTransportFactory('default_kafka');
216+
}
217+
212218
throw new \LogicException(sprintf(
213219
'There is no supported transport factory for the connection factory "%s" created from DSN "%s"',
214220
get_class($factory),

pkg/enqueue/Tests/Symfony/DefaultTransportFactoryTest.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -289,5 +289,7 @@ public static function provideDSNs()
289289
yield ['redis:', 'default_redis'];
290290

291291
yield ['stomp:', 'default_stomp'];
292+
293+
yield ['kafka:', 'default_kafka'];
292294
}
293295
}

pkg/rdkafka/Client/RdKafkaDriver.php

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka\Client;
4+
5+
use Enqueue\Client\Config;
6+
use Enqueue\Client\DriverInterface;
7+
use Enqueue\Client\Message;
8+
use Enqueue\Client\Meta\QueueMetaRegistry;
9+
use Enqueue\RdKafka\RdKafkaContext;
10+
use Interop\Queue\PsrMessage;
11+
use Psr\Log\LoggerInterface;
12+
use Psr\Log\NullLogger;
13+
14+
class RdKafkaDriver implements DriverInterface
15+
{
16+
/**
17+
* @var RdKafkaContext
18+
*/
19+
private $context;
20+
21+
/**
22+
* @var Config
23+
*/
24+
private $config;
25+
26+
/**
27+
* @var QueueMetaRegistry
28+
*/
29+
private $queueMetaRegistry;
30+
31+
/**
32+
* @param RdKafkaContext $context
33+
* @param Config $config
34+
* @param QueueMetaRegistry $queueMetaRegistry
35+
*/
36+
public function __construct(RdKafkaContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry)
37+
{
38+
$this->context = $context;
39+
$this->config = $config;
40+
$this->queueMetaRegistry = $queueMetaRegistry;
41+
}
42+
43+
/**
44+
* {@inheritdoc}
45+
*/
46+
public function createTransportMessage(Message $message)
47+
{
48+
$headers = $message->getHeaders();
49+
$headers['content_type'] = $message->getContentType();
50+
51+
$transportMessage = $this->context->createMessage();
52+
$transportMessage->setBody($message->getBody());
53+
$transportMessage->setHeaders($headers);
54+
$transportMessage->setProperties($message->getProperties());
55+
$transportMessage->setMessageId($message->getMessageId());
56+
$transportMessage->setTimestamp($message->getTimestamp());
57+
$transportMessage->setReplyTo($message->getReplyTo());
58+
$transportMessage->setCorrelationId($message->getCorrelationId());
59+
60+
return $transportMessage;
61+
}
62+
63+
/**
64+
* {@inheritdoc}
65+
*/
66+
public function createClientMessage(PsrMessage $message)
67+
{
68+
$clientMessage = new Message();
69+
$clientMessage->setBody($message->getBody());
70+
$clientMessage->setHeaders($message->getHeaders());
71+
$clientMessage->setProperties($message->getProperties());
72+
73+
$clientMessage->setContentType($message->getHeader('content_type'));
74+
75+
$clientMessage->setTimestamp($message->getTimestamp());
76+
$clientMessage->setMessageId($message->getMessageId());
77+
$clientMessage->setReplyTo($message->getReplyTo());
78+
$clientMessage->setCorrelationId($message->getCorrelationId());
79+
80+
return $clientMessage;
81+
}
82+
83+
/**
84+
* {@inheritdoc}
85+
*/
86+
public function sendToRouter(Message $message)
87+
{
88+
if (false == $message->getProperty(Config::PARAMETER_TOPIC_NAME)) {
89+
throw new \LogicException('Topic name parameter is required but is not set');
90+
}
91+
92+
$topic = $this->createRouterTopic();
93+
$transportMessage = $this->createTransportMessage($message);
94+
95+
$this->context->createProducer()->send($topic, $transportMessage);
96+
}
97+
98+
/**
99+
* {@inheritdoc}
100+
*/
101+
public function sendToProcessor(Message $message)
102+
{
103+
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
104+
throw new \LogicException('Processor name parameter is required but is not set');
105+
}
106+
107+
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
108+
throw new \LogicException('Queue name parameter is required but is not set');
109+
}
110+
111+
$transportMessage = $this->createTransportMessage($message);
112+
$destination = $this->createQueue($queueName);
113+
114+
$this->context->createProducer()->send($destination, $transportMessage);
115+
}
116+
117+
/**
118+
* {@inheritdoc}
119+
*/
120+
public function createQueue($queueName)
121+
{
122+
$transportName = $this->queueMetaRegistry->getQueueMeta($queueName)->getTransportName();
123+
124+
return $this->context->createQueue($transportName);
125+
}
126+
127+
/**
128+
* {@inheritdoc}
129+
*/
130+
public function setupBroker(LoggerInterface $logger = null)
131+
{
132+
$logger = $logger ?: new NullLogger();
133+
$logger->debug('[RdKafkaDriver] setup broker');
134+
$log = function ($text, ...$args) use ($logger) {
135+
$logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args));
136+
};
137+
138+
// setup router
139+
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
140+
$log('Create router queue: %s', $routerQueue->getQueueName());
141+
$this->context->createConsumer($routerQueue);
142+
143+
// setup queues
144+
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
145+
$queue = $this->createQueue($meta->getClientName());
146+
$log('Create processor queue: %s', $queue->getQueueName());
147+
$this->context->createConsumer($queue);
148+
}
149+
}
150+
151+
/**
152+
* {@inheritdoc}
153+
*/
154+
public function getConfig()
155+
{
156+
return $this->config;
157+
}
158+
159+
private function createRouterTopic()
160+
{
161+
$topic = $this->context->createTopic(
162+
$this->config->createTransportRouterTopicName($this->config->getRouterTopicName())
163+
);
164+
165+
return $topic;
166+
}
167+
}
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
<?php
2+
3+
namespace Enqueue\RdKafka\Symfony;
4+
5+
use Enqueue\RdKafka\Client\RdKafkaDriver;
6+
use Enqueue\RdKafka\RdKafkaConnectionFactory;
7+
use Enqueue\RdKafka\RdKafkaContext;
8+
use Enqueue\Symfony\DriverFactoryInterface;
9+
use Enqueue\Symfony\TransportFactoryInterface;
10+
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
11+
use Symfony\Component\DependencyInjection\ContainerBuilder;
12+
use Symfony\Component\DependencyInjection\Definition;
13+
use Symfony\Component\DependencyInjection\Reference;
14+
15+
class RdKafkaTransportFactory implements TransportFactoryInterface, DriverFactoryInterface
16+
{
17+
/**
18+
* @var string
19+
*/
20+
private $name;
21+
22+
/**
23+
* @param string $name
24+
*/
25+
public function __construct($name = 'rdkafka')
26+
{
27+
$this->name = $name;
28+
}
29+
30+
/**
31+
* {@inheritdoc}
32+
*/
33+
public function addConfiguration(ArrayNodeDefinition $builder)
34+
{
35+
$builder
36+
->beforeNormalization()
37+
->ifString()
38+
->then(function ($v) {
39+
return ['dsn' => $v];
40+
})
41+
->end()
42+
->children()
43+
->scalarNode('dsn')
44+
->info('The kafka DSN. Other parameters are ignored if set')
45+
->end()
46+
->variableNode('global')
47+
->defaultValue([])
48+
->info('The kafka global configuration properties')
49+
->end()
50+
->variableNode('topic')
51+
->defaultValue([])
52+
->info('The kafka topic configuration properties')
53+
->end()
54+
->scalarNode('dr_msg_cb')
55+
->info('Delivery report callback')
56+
->end()
57+
->scalarNode('error_cb')
58+
->info('Error callback')
59+
->end()
60+
->scalarNode('rebalance_cb')
61+
->info('Called after consumer group has been rebalanced')
62+
->end()
63+
->enumNode('partitioner')
64+
->values(['RD_KAFKA_MSG_PARTITIONER_RANDOM', 'RD_KAFKA_MSG_PARTITIONER_CONSISTENT'])
65+
->info('Which partitioner to use')
66+
->end()
67+
->integerNode('log_level')
68+
->info('Logging level (syslog(3) levels)')
69+
->min(0)->max(7)
70+
->end()
71+
->booleanNode('commit_async')
72+
->defaultFalse()
73+
->info('Commit asynchronous')
74+
->end()
75+
;
76+
}
77+
78+
/**
79+
* {@inheritdoc}
80+
*/
81+
public function createConnectionFactory(ContainerBuilder $container, array $config)
82+
{
83+
if (false == empty($config['rdkafka'])) {
84+
$config['rdkafka'] = new Reference($config['rdkafka']);
85+
}
86+
87+
$factory = new Definition(RdKafkaConnectionFactory::class);
88+
$factory->setArguments([isset($config['dsn']) ? $config['dsn'] : $config]);
89+
90+
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
91+
$container->setDefinition($factoryId, $factory);
92+
93+
return $factoryId;
94+
}
95+
96+
/**
97+
* {@inheritdoc}
98+
*/
99+
public function createContext(ContainerBuilder $container, array $config)
100+
{
101+
$factoryId = sprintf('enqueue.transport.%s.connection_factory', $this->getName());
102+
103+
$context = new Definition(RdKafkaContext::class);
104+
$context->setPublic(true);
105+
$context->setFactory([new Reference($factoryId), 'createContext']);
106+
107+
$contextId = sprintf('enqueue.transport.%s.context', $this->getName());
108+
$container->setDefinition($contextId, $context);
109+
110+
return $contextId;
111+
}
112+
113+
/**
114+
* {@inheritdoc}
115+
*/
116+
public function createDriver(ContainerBuilder $container, array $config)
117+
{
118+
$driver = new Definition(RdKafkaDriver::class);
119+
$driver->setPublic(true);
120+
$driver->setArguments([
121+
new Reference(sprintf('enqueue.transport.%s.context', $this->getName())),
122+
new Reference('enqueue.client.config'),
123+
new Reference('enqueue.client.meta.queue_meta_registry'),
124+
]);
125+
126+
$driverId = sprintf('enqueue.client.%s.driver', $this->getName());
127+
$container->setDefinition($driverId, $driver);
128+
129+
return $driverId;
130+
}
131+
132+
/**
133+
* @return string
134+
*/
135+
public function getName()
136+
{
137+
return $this->name;
138+
}
139+
}

0 commit comments

Comments
 (0)