Skip to content

Commit 9b6fbd4

Browse files
committed
[amqp-bunny] throw only queue interop exception on Producer::send();
1 parent b2287c3 commit 9b6fbd4

File tree

1 file changed

+53
-39
lines changed

1 file changed

+53
-39
lines changed

pkg/amqp-bunny/AmqpProducer.php

Lines changed: 53 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,13 @@
55
use Bunny\Channel;
66
use Enqueue\AmqpTools\DelayStrategyAware;
77
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
8+
use Interop\Amqp\AmqpDestination as InteropAmqpDestination;
89
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
910
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
1011
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
1112
use Interop\Amqp\AmqpTopic as InteropAmqpTopic;
1213
use Interop\Queue\DeliveryDelayNotSupportedException;
14+
use Interop\Queue\Exception;
1315
use Interop\Queue\InvalidDestinationException;
1416
use Interop\Queue\InvalidMessageException;
1517
use Interop\Queue\PsrDestination;
@@ -56,55 +58,24 @@ public function __construct(Channel $channel, AmqpContext $context)
5658
}
5759

5860
/**
61+
* {@inheritdoc}
62+
*
5963
* @param InteropAmqpTopic|InteropAmqpQueue $destination
6064
* @param InteropAmqpMessage $message
6165
*/
6266
public function send(PsrDestination $destination, PsrMessage $message)
6367
{
6468
$destination instanceof PsrTopic
6569
? InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpTopic::class)
66-
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class);
70+
: InvalidDestinationException::assertDestinationInstanceOf($destination, InteropAmqpQueue::class)
71+
;
6772

6873
InvalidMessageException::assertMessageInstanceOf($message, InteropAmqpMessage::class);
6974

70-
if (null !== $this->priority && null === $message->getPriority()) {
71-
$message->setPriority($this->priority);
72-
}
73-
74-
if (null !== $this->timeToLive && null === $message->getExpiration()) {
75-
$message->setExpiration($this->timeToLive);
76-
}
77-
78-
$amqpProperties = $message->getHeaders();
79-
80-
if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) {
81-
$amqpProperties['timestamp'] = \DateTime::createFromFormat('U', $amqpProperties['timestamp']);
82-
}
83-
84-
if ($appProperties = $message->getProperties()) {
85-
$amqpProperties['application_headers'] = $appProperties;
86-
}
87-
88-
if ($this->deliveryDelay) {
89-
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
90-
} elseif ($destination instanceof InteropAmqpTopic) {
91-
$this->channel->publish(
92-
$message->getBody(),
93-
$amqpProperties,
94-
$destination->getTopicName(),
95-
$message->getRoutingKey(),
96-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
97-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
98-
);
99-
} else {
100-
$this->channel->publish(
101-
$message->getBody(),
102-
$amqpProperties,
103-
'',
104-
$destination->getQueueName(),
105-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
106-
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
107-
);
75+
try {
76+
$this->doSend($destination, $message);
77+
} catch (\Exception $e) {
78+
throw new Exception($e->getMessage(), $e->getCode(), $e);
10879
}
10980
}
11081

@@ -163,4 +134,47 @@ public function getTimeToLive()
163134
{
164135
return $this->timeToLive;
165136
}
137+
138+
private function doSend(InteropAmqpDestination $destination, InteropAmqpMessage $message)
139+
{
140+
if (null !== $this->priority && null === $message->getPriority()) {
141+
$message->setPriority($this->priority);
142+
}
143+
144+
if (null !== $this->timeToLive && null === $message->getExpiration()) {
145+
$message->setExpiration($this->timeToLive);
146+
}
147+
148+
$amqpProperties = $message->getHeaders();
149+
150+
if (array_key_exists('timestamp', $amqpProperties) && null !== $amqpProperties['timestamp']) {
151+
$amqpProperties['timestamp'] = \DateTime::createFromFormat('U', $amqpProperties['timestamp']);
152+
}
153+
154+
if ($appProperties = $message->getProperties()) {
155+
$amqpProperties['application_headers'] = $appProperties;
156+
}
157+
158+
if ($this->deliveryDelay) {
159+
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
160+
} elseif ($destination instanceof InteropAmqpTopic) {
161+
$this->channel->publish(
162+
$message->getBody(),
163+
$amqpProperties,
164+
$destination->getTopicName(),
165+
$message->getRoutingKey(),
166+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
167+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
168+
);
169+
} else {
170+
$this->channel->publish(
171+
$message->getBody(),
172+
$amqpProperties,
173+
'',
174+
$destination->getQueueName(),
175+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_MANDATORY),
176+
(bool) ($message->getFlags() & InteropAmqpMessage::FLAG_IMMEDIATE)
177+
);
178+
}
179+
}
166180
}

0 commit comments

Comments
 (0)