Skip to content

Commit b9464ec

Browse files
committed
Add the broker setup for the rdkafka driver
1 parent 574f96b commit b9464ec

File tree

1 file changed

+15
-0
lines changed

1 file changed

+15
-0
lines changed

pkg/rdkafka/Client/RdKafkaDriver.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,21 @@ public function setupBroker(LoggerInterface $logger = null)
131131
{
132132
$logger = $logger ?: new NullLogger();
133133
$logger->debug('[RdKafkaDriver] setup broker');
134+
$log = function ($text, ...$args) use ($logger) {
135+
$logger->debug(sprintf('[RdKafkaDriver] '.$text, ...$args));
136+
};
137+
138+
// setup router
139+
$routerQueue = $this->createQueue($this->config->getRouterQueueName());
140+
$log('Create router queue: %s', $routerQueue->getQueueName());
141+
$this->context->createConsumer($routerQueue);
142+
143+
// setup queues
144+
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
145+
$queue = $this->createQueue($meta->getClientName());
146+
$log('Create processor queue: %s', $queue->getQueueName());
147+
$this->context->createConsumer($queue);
148+
}
134149
}
135150

136151
/**

0 commit comments

Comments
 (0)