Skip to content

Commit f35d65d

Browse files
committed
[amqp-lib] throw only queue interop exception on Producer::send();
1 parent 9b6fbd4 commit f35d65d

File tree

1 file changed

+47
-34
lines changed

1 file changed

+47
-34
lines changed

pkg/amqp-lib/AmqpProducer.php

Lines changed: 47 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
7+
use Interop\Amqp\AmqpDestination as InteropAmqpDestination;
78
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
89
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
910
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
1011
use Interop\Amqp\AmqpTopic as InteropAmqpTopic;
1112
use Interop\Queue\DeliveryDelayNotSupportedException;
13+
use Interop\Queue\Exception;
1214
use Interop\Queue\InvalidDestinationException;
1315
use Interop\Queue\InvalidMessageException;
1416
use Interop\Queue\PsrDestination;
@@ -58,6 +60,8 @@ public function __construct(AMQPChannel $channel, AmqpContext $context)
5860
}
5961

6062
/**
63+
* {@inheritdoc}
64+
*
6165
* @param InteropAmqpTopic|InteropAmqpQueue $destination
6266
* @param InteropAmqpMessage $message
6367
*/
@@ -70,40 +74,10 @@ public function send(PsrDestination $destination, PsrMessage $message)
7074

7175
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
7276

73-
if (null !== $this->priority && null === $message->getPriority()) {
74-
$message->setPriority($this->priority);
75-
}
76-
77-
if (null !== $this->timeToLive && null === $message->getExpiration()) {
78-
$message->setExpiration($this->timeToLive);
79-
}
80-
81-
$amqpProperties = $message->getHeaders();
82-
83-
if ($appProperties = $message->getProperties()) {
84-
$amqpProperties['application_headers'] = new AMQPTable($appProperties);
85-
}
86-
87-
$amqpMessage = new LibAMQPMessage($message->getBody(), $amqpProperties);
88-
89-
if ($this->deliveryDelay) {
90-
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
91-
} elseif ($destination instanceof InteropAmqpTopic) {
92-
$this->channel->basic_publish(
93-
$amqpMessage,
94-
$destination->getTopicName(),
95-
$message->getRoutingKey(),
96-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
97-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
98-
);
99-
} else {
100-
$this->channel->basic_publish(
101-
$amqpMessage,
102-
'',
103-
$destination->getQueueName(),
104-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
105-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
106-
);
77+
try {
78+
$this->doSend($destination, $message);
79+
} catch (\Exception $e) {
80+
throw new Exception($e->getMessage(), $e->getCode(), $e);
10781
}
10882
}
10983

@@ -162,4 +136,43 @@ public function getTimeToLive()
162136
{
163137
return $this->timeToLive;
164138
}
139+
140+
private function doSend(InteropAmqpDestination $destination, InteropAmqpMessage $message)
141+
{
142+
if (null !== $this->priority && null === $message->getPriority()) {
143+
$message->setPriority($this->priority);
144+
}
145+
146+
if (null !== $this->timeToLive && null === $message->getExpiration()) {
147+
$message->setExpiration($this->timeToLive);
148+
}
149+
150+
$amqpProperties = $message->getHeaders();
151+
152+
if ($appProperties = $message->getProperties()) {
153+
$amqpProperties['application_headers'] = new AMQPTable($appProperties);
154+
}
155+
156+
$amqpMessage = new LibAMQPMessage($message->getBody(), $amqpProperties);
157+
158+
if ($this->deliveryDelay) {
159+
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
160+
} elseif ($destination instanceof InteropAmqpTopic) {
161+
$this->channel->basic_publish(
162+
$amqpMessage,
163+
$destination->getTopicName(),
164+
$message->getRoutingKey(),
165+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
166+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
167+
);
168+
} else {
169+
$this->channel->basic_publish(
170+
$amqpMessage,
171+
'',
172+
$destination->getQueueName(),
173+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
174+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
175+
);
176+
}
177+
}
165178
}

0 commit comments

Comments
 (0)