Skip to content

Commit eb6acb3

Browse files
committed
upd drivers
1 parent 5895d77 commit eb6acb3

20 files changed

+192
-154
lines changed

pkg/enqueue/Client/ConsumptionExtension/SetRouterPropertiesExtension.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,5 @@ public function onPreReceived(Context $context)
4343

4444
// RouterProcessor is our default message processor when that header is not set
4545
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $config->getRouterProcessorName());
46-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $config->getRouterQueueName());
4746
}
4847
}

pkg/enqueue/Client/Driver/AmqpDriver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,10 +96,10 @@ public function setupBroker(LoggerInterface $logger = null): void
9696
/**
9797
* @return AmqpQueue
9898
*/
99-
public function createQueue(string $clientQueuName): PsrQueue
99+
protected function doCreateQueue(string $transportQueueName): PsrQueue
100100
{
101101
/** @var AmqpQueue $queue */
102-
$queue = parent::createQueue($clientQueuName);
102+
$queue = parent::doCreateQueue($transportQueueName);
103103
$queue->addFlag(AmqpQueue::FLAG_DURABLE);
104104

105105
return $queue;

pkg/enqueue/Client/Driver/GenericDriver.php

Lines changed: 29 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,48 +62,48 @@ public function sendToRouter(Message $message): void
6262

6363
public function sendToProcessor(Message $message): void
6464
{
65-
$processor = $message->getProperty(Config::PARAMETER_PROCESSOR_NAME);
66-
if (false == $processor) {
67-
throw new \LogicException('Processor name parameter is required but is not set');
68-
}
69-
7065
$topic = $message->getProperty(Config::PARAMETER_TOPIC_NAME);
7166
$command = $message->getProperty(Config::PARAMETER_COMMAND_NAME);
7267

73-
/** @var Route $route */
74-
$route = null;
75-
if ($topic) {
68+
/** @var PsrQueue $queue */
69+
$queue = null;
70+
if ($topic && $processor = $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
7671
$route = $this->routeCollection->topicAndProcessor($topic, $processor);
7772
if (false == $route) {
7873
throw new \LogicException(sprintf('There is no route for topic "%s" and processor "%s"', $topic, $processor));
7974
}
75+
76+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->config->getRouterProcessorName());
77+
$queue = $this->createRouteQueue($route);
78+
} elseif ($topic && false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
79+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->config->getRouterProcessorName());
80+
81+
$queue = $this->createQueue($this->config->getRouterQueueName());
8082
} elseif ($command) {
8183
$route = $this->routeCollection->command($command);
8284
if (false == $route) {
83-
throw new \LogicException(sprintf('There is no route for command "%s" and processor "%s"', $command, $processor));
85+
throw new \LogicException(sprintf('There is no route for command "%s".', $command));
8486
}
8587

86-
if ($processor !== $route->getProcessor()) {
87-
throw new \LogicException(sprintf('The command "%s" route was found but processors do not match. Given "%s", route "%s"', $command, $processor, $route->getProcessor()));
88-
}
88+
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $route->getProcessor());
89+
$queue = $this->createRouteQueue($route);
8990
} else {
9091
throw new \LogicException('Either topic or command parameter must be set.');
9192
}
9293

9394
$transportMessage = $this->createTransportMessage($message);
94-
$queue = $this->createRouteQueue($route);
9595

9696
$producer = $this->context->createProducer();
9797

98-
if ($delay = $transportMessage->getProperty('X-Enqueue-Delay')) {
98+
if (null !== $delay = $transportMessage->getProperty('X-Enqueue-Delay')) {
9999
$producer->setDeliveryDelay($delay * 1000);
100100
}
101101

102-
if ($expire = $transportMessage->getProperty('X-Enqueue-Expire')) {
102+
if (null !== $expire = $transportMessage->getProperty('X-Enqueue-Expire')) {
103103
$producer->setTimeToLive($expire * 1000);
104104
}
105105

106-
if ($priority = $transportMessage->getProperty('X-Enqueue-Priority')) {
106+
if (null !== $priority = $transportMessage->getProperty('X-Enqueue-Priority')) {
107107
$priorityMap = $this->getPriorityMap();
108108

109109
$producer->setPriority($priorityMap[$priority]);
@@ -120,7 +120,7 @@ public function createQueue(string $clientQueueName): PsrQueue
120120
{
121121
$transportName = $this->createTransportQueueName($clientQueueName, true);
122122

123-
return $this->context->createQueue($transportName);
123+
return $this->doCreateQueue($transportName);
124124
}
125125

126126
public function createTransportMessage(Message $clientMessage): PsrMessage
@@ -163,10 +163,8 @@ public function createClientMessage(PsrMessage $transportMessage): Message
163163
$clientMessage->setBody($transportMessage->getBody());
164164
$clientMessage->setHeaders($transportMessage->getHeaders());
165165
$clientMessage->setProperties($transportMessage->getProperties());
166-
167166
$clientMessage->setMessageId($transportMessage->getMessageId());
168167
$clientMessage->setTimestamp($transportMessage->getTimestamp());
169-
$clientMessage->setPriority(MessagePriority::NORMAL);
170168
$clientMessage->setReplyTo($transportMessage->getReplyTo());
171169
$clientMessage->setCorrelationId($transportMessage->getCorrelationId());
172170

@@ -216,7 +214,7 @@ protected function doSendToProcessor(PsrProducer $producer, PsrQueue $queue, Psr
216214

217215
protected function createRouterTopic(): PsrTopic
218216
{
219-
return $this->context->createTopic(
217+
return $this->doCreateTopic(
220218
$this->createTransportRouterTopicName($this->config->getRouterTopicName(), true)
221219
);
222220
}
@@ -228,7 +226,7 @@ protected function createRouteQueue(Route $route): PsrQueue
228226
$route->isPrefixQueue()
229227
);
230228

231-
return $this->context->createQueue($transportName);
229+
return $this->doCreateQueue($transportName);
232230
}
233231

234232
protected function createTransportRouterTopicName(string $name, bool $prefix): string
@@ -246,6 +244,16 @@ protected function createTransportQueueName(string $name, bool $prefix): string
246244
return strtolower(implode($this->config->getSeparator(), array_filter([$clientPrefix, $clientAppName, $name])));
247245
}
248246

247+
protected function doCreateQueue(string $transportQueueName): PsrQueue
248+
{
249+
return $this->context->createQueue($transportQueueName);
250+
}
251+
252+
protected function doCreateTopic(string $transportTopicName): PsrTopic
253+
{
254+
return $this->context->createTopic($transportTopicName);
255+
}
256+
249257
/**
250258
* [client message priority => transport message priority].
251259
*

pkg/enqueue/Client/Driver/RabbitMqDriver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ class RabbitMqDriver extends AmqpDriver
1010
/**
1111
* @return AmqpQueue
1212
*/
13-
public function createQueue(string $queueName): PsrQueue
13+
protected function doCreateQueue(string $transportQueueName): PsrQueue
1414
{
15-
$queue = parent::createQueue($queueName);
15+
$queue = parent::doCreateQueue($transportQueueName);
1616
$queue->setArguments(['x-max-priority' => 4]);
1717

1818
return $queue;

pkg/enqueue/Client/Driver/RabbitMqStompDriver.php

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,6 @@ public function createTransportMessage(Message $message): PsrMessage
6262
return $transportMessage;
6363
}
6464

65-
/**
66-
* @return StompDestination
67-
*/
68-
public function createQueue(string $queueName): PsrQueue
69-
{
70-
$queue = parent::createQueue($queueName);
71-
$queue->setHeader('x-max-priority', 4);
72-
73-
return $queue;
74-
}
75-
7665
public function setupBroker(LoggerInterface $logger = null): void
7766
{
7867
$logger = $logger ?: new NullLogger();
@@ -146,6 +135,17 @@ public function setupBroker(LoggerInterface $logger = null): void
146135
}
147136
}
148137

138+
/**
139+
* @return StompDestination
140+
*/
141+
protected function doCreateQueue(string $transportQueueName): PsrQueue
142+
{
143+
$queue = parent::doCreateQueue($transportQueueName);
144+
$queue->setHeader('x-max-priority', 4);
145+
146+
return $queue;
147+
}
148+
149149
/**
150150
* @param StompProducer $producer
151151
* @param StompDestination $topic

pkg/enqueue/Client/Driver/StompDriver.php

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,10 @@ public function createTransportMessage(Message $message): PsrMessage
4343
/**
4444
* @return StompDestination
4545
*/
46-
public function createQueue(string $queueName): PsrQueue
46+
protected function doCreateQueue(string $transportQueueName): PsrQueue
4747
{
4848
/** @var StompDestination $queue */
49-
$queue = parent::createQueue($queueName);
49+
$queue = parent::doCreateQueue($transportQueueName);
5050
$queue->setDurable(true);
5151
$queue->setAutoDelete(false);
5252
$queue->setExclusive(false);

pkg/enqueue/Client/DriverPreSend.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ public function getDriver(): DriverInterface
3434

3535
public function isEvent(): bool
3636
{
37-
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
37+
return (bool) $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function isCommand(): bool
41+
{
42+
return (bool) $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
3843
}
3944

4045
public function getCommand(): string

pkg/enqueue/Client/Message.php

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -205,18 +205,12 @@ public function setDelay($delay)
205205
$this->delay = $delay;
206206
}
207207

208-
/**
209-
* @param string $scope
210-
*/
211-
public function setScope($scope)
208+
public function setScope(string $scope): void
212209
{
213210
$this->scope = $scope;
214211
}
215212

216-
/**
217-
* @return string
218-
*/
219-
public function getScope()
213+
public function getScope(): string
220214
{
221215
return $this->scope;
222216
}

pkg/enqueue/Client/PostSend.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,12 @@ public function getDriver(): DriverInterface
3434

3535
public function isEvent(): bool
3636
{
37-
return Config::COMMAND_TOPIC !== $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
37+
return (bool) $this->message->getProperty(Config::PARAMETER_TOPIC_NAME);
38+
}
39+
40+
public function isCommand(): bool
41+
{
42+
return (bool) $this->message->getProperty(Config::PARAMETER_COMMAND_NAME);
3843
}
3944

4045
public function getCommand(): string

pkg/enqueue/Client/Producer.php

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -47,10 +47,8 @@ public function sendEvent(string $topic, $message): void
4747
$preSend = new PreSend($topic, $message, $this, $this->driver);
4848
$this->extension->onPreSendEvent($preSend);
4949

50-
$topic = $preSend->getTopic();
5150
$message = $preSend->getMessage();
52-
53-
$message->setProperty(Config::PARAMETER_TOPIC_NAME, $topic);
51+
$message->setProperty(Config::PARAMETER_TOPIC_NAME, $preSend->getTopic());
5452

5553
$this->doSend($message);
5654
}
@@ -81,7 +79,6 @@ public function sendCommand(string $command, $message, bool $needReply = false):
8179
}
8280
}
8381

84-
$message->setProperty(Config::PARAMETER_TOPIC_NAME, Config::COMMAND_TOPIC);
8582
$message->setProperty(Config::PARAMETER_COMMAND_NAME, $command);
8683
$message->setScope(Message::SCOPE_APP);
8784

@@ -106,6 +103,10 @@ private function doSend(Message $message): void
106103
));
107104
}
108105

106+
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
107+
throw new \LogicException(sprintf('The %s property must not be set.', Config::PARAMETER_PROCESSOR_NAME));
108+
}
109+
109110
if (!$message->getMessageId()) {
110111
$message->setMessageId(UUID::generate());
111112
}
@@ -118,25 +119,11 @@ private function doSend(Message $message): void
118119
$message->setPriority(MessagePriority::NORMAL);
119120
}
120121

121-
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
122-
if ($message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
123-
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_QUEUE_NAME));
124-
}
125-
if ($message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
126-
throw new \LogicException(sprintf('The %s property must not be set for messages that are sent to message bus.', Config::PARAMETER_PROCESSOR_NAME));
127-
}
122+
$this->extension->onDriverPreSend(new DriverPreSend($message, $this, $this->driver));
128123

129-
$this->extension->onDriverPreSend(new DriverPreSend($message, $this, $this->driver));
124+
if (Message::SCOPE_MESSAGE_BUS == $message->getScope()) {
130125
$this->driver->sendToRouter($message);
131126
} elseif (Message::SCOPE_APP == $message->getScope()) {
132-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_NAME)) {
133-
$message->setProperty(Config::PARAMETER_PROCESSOR_NAME, $this->driver->getConfig()->getRouterProcessorName());
134-
}
135-
if (false == $message->getProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME)) {
136-
$message->setProperty(Config::PARAMETER_PROCESSOR_QUEUE_NAME, $this->driver->getConfig()->getRouterQueueName());
137-
}
138-
139-
$this->extension->onDriverPreSend(new DriverPreSend($message, $this, $this->driver));
140127
$this->driver->sendToProcessor($message);
141128
} else {
142129
throw new \LogicException(sprintf('The message scope "%s" is not supported.', $message->getScope()));

pkg/enqueue/Client/Resources.php

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,18 @@ public static function getKnownDrivers(): array
135135
'requiredSchemeExtensions' => [],
136136
'package' => ['enqueue/enqueue', 'enqueue/dbal'],
137137
];
138+
$map[] = [
139+
'schemes' => ['gearman'],
140+
'factoryClass' => GenericDriver::class,
141+
'requiredSchemeExtensions' => [],
142+
'package' => ['enqueue/enqueue', 'enqueue/gearman'],
143+
];
144+
$map[] = [
145+
'schemes' => ['beanstalk'],
146+
'factoryClass' => GenericDriver::class,
147+
'requiredSchemeExtensions' => [],
148+
'package' => ['enqueue/enqueue', 'enqueue/pheanstalk'],
149+
];
138150

139151
self::$knownDrivers = $map;
140152
}

pkg/enqueue/Client/TraceableProducer.php

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public function sendCommand(string $command, $message, bool $needReply = false):
3232
{
3333
$result = $this->producer->sendCommand($command, $message, $needReply);
3434

35-
$this->collectTrace(Config::COMMAND_TOPIC, $command, $message);
35+
$this->collectTrace(null, $command, $message);
3636

3737
return $result;
3838
}
@@ -86,6 +86,7 @@ private function collectTrace(string $topic = null, string $command = null, $mes
8686
'contentType' => null,
8787
'messageId' => null,
8888
];
89+
8990
if ($message instanceof Message) {
9091
$trace['body'] = $message->getBody();
9192
$trace['headers'] = $message->getHeaders();

pkg/enqueue/Tests/Client/ConsumptionExtension/SetRouterPropertiesExtensionTest.php

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ public function testShouldSetRouterProcessorPropertyIfNotSetAndOnRouterQueue()
5656

5757
$this->assertEquals([
5858
'enqueue.processor_name' => 'router-processor-name',
59-
'enqueue.processor_queue_name' => 'router-queue',
6059
], $message->getProperties());
6160
}
6261

0 commit comments

Comments
 (0)