Skip to content

Commit 202c6ed

Browse files
committed
[client] Extend RabbitMqStompDriver from GenericDriver.
1 parent 1ccf7d0 commit 202c6ed

File tree

6 files changed

+363
-491
lines changed

6 files changed

+363
-491
lines changed

pkg/enqueue/Client/Driver/RabbitMqStompDriver.php

Lines changed: 56 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -4,59 +4,30 @@
44

55
use Enqueue\Client\Config;
66
use Enqueue\Client\Message;
7-
use Enqueue\Client\MessagePriority;
8-
use Enqueue\Client\Meta\QueueMetaRegistry;
7+
use Enqueue\Client\RouteCollection;
98
use Enqueue\Stomp\StompContext;
109
use Enqueue\Stomp\StompDestination;
1110
use Enqueue\Stomp\StompMessage;
11+
use Enqueue\Stomp\StompProducer;
1212
use Interop\Queue\PsrMessage;
13+
use Interop\Queue\PsrProducer;
1314
use Interop\Queue\PsrQueue;
15+
use Interop\Queue\PsrTopic;
1416
use Psr\Log\LoggerInterface;
1517
use Psr\Log\NullLogger;
1618

1719
class RabbitMqStompDriver extends StompDriver
1820
{
19-
/**
20-
* @var StompContext
21-
*/
22-
private $context;
23-
24-
/**
25-
* @var Config
26-
*/
27-
private $config;
28-
29-
/**
30-
* @var array
31-
*/
32-
private $priorityMap;
33-
3421
/**
3522
* @var StompManagementClient
3623
*/
3724
private $management;
3825

39-
/**
40-
* @var QueueMetaRegistry
41-
*/
42-
private $queueMetaRegistry;
43-
44-
public function __construct(StompContext $context, Config $config, QueueMetaRegistry $queueMetaRegistry, StompManagementClient $management)
26+
public function __construct(StompContext $context, Config $config, RouteCollection $routeCollection, StompManagementClient $management)
4527
{
46-
parent::__construct($context, $config, $queueMetaRegistry);
28+
parent::__construct($context, $config, $routeCollection);
4729

48-
$this->context = $context;
49-
$this->config = $config;
50-
$this->queueMetaRegistry = $queueMetaRegistry;
5130
$this->management = $management;
52-
53-
$this->priorityMap = [
54-
MessagePriority::VERY_LOW => 0,
55-
MessagePriority::LOW => 1,
56-
MessagePriority::NORMAL => 2,
57-
MessagePriority::HIGH => 3,
58-
MessagePriority::VERY_HIGH => 4,
59-
];
6031
}
6132

6233
/**
@@ -71,15 +42,17 @@ public function createTransportMessage(Message $message): PsrMessage
7142
}
7243

7344
if ($priority = $message->getPriority()) {
74-
if (false == array_key_exists($priority, $this->priorityMap)) {
45+
$priorityMap = $this->getPriorityMap();
46+
47+
if (false == array_key_exists($priority, $priorityMap)) {
7548
throw new \LogicException(sprintf('Cant convert client priority to transport: "%s"', $priority));
7649
}
7750

78-
$transportMessage->setHeader('priority', $this->priorityMap[$priority]);
51+
$transportMessage->setHeader('priority', $priorityMap[$priority]);
7952
}
8053

8154
if ($message->getDelay()) {
82-
if (false == $this->config->getTransportOption('delay_plugin_installed', false)) {
55+
if (false == $this->getConfig()->getTransportOption('delay_plugin_installed', false)) {
8356
throw new \LogicException('The message delaying is not supported. In order to use delay feature install RabbitMQ delay plugin.');
8457
}
8558

@@ -89,68 +62,6 @@ public function createTransportMessage(Message $message): PsrMessage
8962
return $transportMessage;
9063
}
9164

92-
/**
93-
* @param StompMessage $message
94-
*/
95-
public function createClientMessage(PsrMessage $message): Message
96-
{
97-
$clientMessage = parent::createClientMessage($message);
98-
99-
$headers = $clientMessage->getHeaders();
100-
unset(
101-
$headers['x-delay'],
102-
$headers['expiration'],
103-
$headers['priority']
104-
);
105-
$clientMessage->setHeaders($headers);
106-
107-
if ($delay = $message->getHeader('x-delay')) {
108-
if (false == is_numeric($delay)) {
109-
throw new \LogicException(sprintf('x-delay header is not numeric. "%s"', $delay));
110-
}
111-
112-
$clientMessage->setDelay((int) ((int) $delay) / 1000);
113-
}
114-
115-
if ($expiration = $message->getHeader('expiration')) {
116-
if (false == is_numeric($expiration)) {
117-
throw new \LogicException(sprintf('expiration header is not numeric. "%s"', $expiration));
118-
}
119-
120-
$clientMessage->setExpire((int) ((int) $expiration) / 1000);
121-
}
122-
123-
if ($priority = $message->getHeader('priority')) {
124-
if (false === $clientPriority = array_search($priority, $this->priorityMap, true)) {
125-
throw new \LogicException(sprintf('Cant convert transport priority to client: "%s"', $priority));
126-
}
127-
128-
$clientMessage->setPriority($clientPriority);
129-
}
130-
131-
return $clientMessage;
132-
}
133-
134-
public function sendToProcessor(Message $message): void
135-
{
136-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
137-
throw new \LogicException('Processor name parameter is required but is not set');
138-
}
139-
140-
if (false == $queueName = $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
141-
throw new \LogicException('Queue name parameter is required but is not set');
142-
}
143-
144-
$transportMessage = $this->createTransportMessage($message);
145-
$destination = $this->createQueue($queueName);
146-
147-
if ($message->getDelay()) {
148-
$destination = $this->createDelayedTopic($destination);
149-
}
150-
151-
$this->context->createProducer()->send($destination, $transportMessage);
152-
}
153-
15465
/**
15566
* @return StompDestination
15667
*/
@@ -169,22 +80,22 @@ public function setupBroker(LoggerInterface $logger = null): void
16980
$logger->debug(sprintf('[RabbitMqStompDriver] '.$text, ...$args));
17081
};
17182

172-
if (false == $this->config->getTransportOption('management_plugin_installed', false)) {
83+
if (false == $this->getConfig()->getTransportOption('management_plugin_installed', false)) {
17384
$log('Could not setup broker. The option `management_plugin_installed` is not enabled. Please enable that option and install rabbit management plugin');
17485

17586
return;
17687
}
17788

17889
// setup router
179-
$routerExchange = $this->config->createTransportRouterTopicName($this->config->getRouterTopicName());
90+
$routerExchange = $this->getConfig()->createTransportRouterTopicName($this->getConfig()->getRouterTopicName());
18091
$log('Declare router exchange: %s', $routerExchange);
18192
$this->management->declareExchange($routerExchange, [
18293
'type' => 'fanout',
18394
'durable' => true,
18495
'auto_delete' => false,
18596
]);
18697

187-
$routerQueue = $this->config->createTransportQueueName($this->config->getRouterQueueName());
98+
$routerQueue = $this->getConfig()->createTransportQueueName($this->getConfig()->getRouterQueueName());
18899
$log('Declare router queue: %s', $routerQueue);
189100
$this->management->declareQueue($routerQueue, [
190101
'auto_delete' => false,
@@ -198,11 +109,11 @@ public function setupBroker(LoggerInterface $logger = null): void
198109
$this->management->bind($routerExchange, $routerQueue, $routerQueue);
199110

200111
// setup queues
201-
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
202-
$queue = $this->config->createTransportQueueName($meta->getClientName());
112+
foreach ($this->getRouteCollection()->all() as $route) {
113+
$queue = $this->createRouteQueue($route);
203114

204-
$log('Declare processor queue: %s', $queue);
205-
$this->management->declareQueue($queue, [
115+
$log('Declare processor queue: %s', $queue->getStompName());
116+
$this->management->declareQueue($queue->getStompName(), [
206117
'auto_delete' => false,
207118
'durable' => true,
208119
'arguments' => [
@@ -212,10 +123,10 @@ public function setupBroker(LoggerInterface $logger = null): void
212123
}
213124

214125
// setup delay exchanges
215-
if ($this->config->getTransportOption('delay_plugin_installed', false)) {
216-
foreach ($this->queueMetaRegistry->getQueuesMeta() as $meta) {
217-
$queue = $this->config->createTransportQueueName($meta->getClientName());
218-
$delayExchange = $queue.'.delayed';
126+
if ($this->getConfig()->getTransportOption('delay_plugin_installed', false)) {
127+
foreach ($this->getRouteCollection()->all() as $route) {
128+
$queue = $this->createRouteQueue($route);
129+
$delayExchange = $queue->getStompName().'.delayed';
219130

220131
$log('Declare delay exchange: %s', $delayExchange);
221132
$this->management->declareExchange($delayExchange, [
@@ -227,18 +138,49 @@ public function setupBroker(LoggerInterface $logger = null): void
227138
],
228139
]);
229140

230-
$log('Bind processor queue to delay exchange: %s -> %s', $queue, $delayExchange);
231-
$this->management->bind($delayExchange, $queue, $queue);
141+
$log('Bind processor queue to delay exchange: %s -> %s', $queue->getStompName(), $delayExchange);
142+
$this->management->bind($delayExchange, $queue->getStompName(), $queue->getStompName());
232143
}
233144
} else {
234145
$log('Delay exchange and bindings are not setup. if you\'d like to use delays please install delay rabbitmq plugin and set delay_plugin_installed option to true');
235146
}
236147
}
237148

149+
/**
150+
* @param StompProducer $producer
151+
* @param StompDestination $topic
152+
* @param StompMessage $transportMessage
153+
*/
154+
protected function doSendToRouter(PsrProducer $producer, PsrTopic $topic, PsrMessage $transportMessage): void
155+
{
156+
// We should not handle priority, expiration, and delay at this stage.
157+
// The router will take care of it while re-sending the message to the final destinations.
158+
$transportMessage->setHeader('expiration', null);
159+
$transportMessage->setHeader('priority', null);
160+
$transportMessage->setHeader('x-delay', null);
161+
162+
$producer->send($topic, $transportMessage);
163+
}
164+
165+
/**
166+
* @param StompProducer $producer
167+
* @param StompDestination $destination
168+
* @param StompMessage $transportMessage
169+
*/
170+
protected function doSendToProcessor(PsrProducer $producer, PsrQueue $destination, PsrMessage $transportMessage): void
171+
{
172+
if ($delay = $transportMessage->getProperty('X-Enqueue-Delay')) {
173+
$producer->setDeliveryDelay(null);
174+
$destination = $this->createDelayedTopic($destination);
175+
}
176+
177+
$producer->send($destination, $transportMessage);
178+
}
179+
238180
private function createDelayedTopic(StompDestination $queue): StompDestination
239181
{
240182
// in order to use delay feature make sure the rabbitmq_delayed_message_exchange plugin is installed.
241-
$destination = $this->context->createTopic($queue->getStompName().'.delayed');
183+
$destination = $this->getContext()->createTopic($queue->getStompName().'.delayed');
242184
$destination->setType(StompDestination::TYPE_EXCHANGE);
243185
$destination->setDurable(true);
244186
$destination->setAutoDelete(false);

pkg/enqueue/Tests/Client/Driver/DbalDriverTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,15 @@ protected function createProducerMock(): PsrProducer
7979
*/
8080
protected function createQueue(string $name): PsrQueue
8181
{
82-
return new DbalDestination(new \SplFileInfo($name));
82+
return new DbalDestination($name);
8383
}
8484

8585
/**
8686
* @return DbalDestination
8787
*/
8888
protected function createTopic(string $name): PsrTopic
8989
{
90-
return new DbalDestination(new \SplFileInfo($name));
90+
return new DbalDestination($name);
9191
}
9292

9393
/**

pkg/enqueue/Tests/Client/Driver/MongodbDriverTest.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,15 @@ protected function createProducerMock(): PsrProducer
8383
*/
8484
protected function createQueue(string $name): PsrQueue
8585
{
86-
return new MongodbDestination(new \SplFileInfo($name));
86+
return new MongodbDestination($name);
8787
}
8888

8989
/**
9090
* @return MongodbDestination
9191
*/
9292
protected function createTopic(string $name): PsrTopic
9393
{
94-
return new MongodbDestination(new \SplFileInfo($name));
94+
return new MongodbDestination($name);
9595
}
9696

9797
/**

0 commit comments

Comments
 (0)