Skip to content

Commit eca49cf

Browse files
committed
[amqp-ext] AmqpProducer::send must throw only interop exception.
1 parent 06771a8 commit eca49cf

File tree

1 file changed

+53
-42
lines changed

1 file changed

+53
-42
lines changed

pkg/amqp-ext/AmqpProducer.php

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
use Enqueue\AmqpTools\DelayStrategyAware;
66
use Enqueue\AmqpTools\DelayStrategyAwareTrait;
7+
use Interop\Amqp\AmqpDestination;
78
use Interop\Amqp\AmqpMessage;
89
use Interop\Amqp\AmqpProducer as InteropAmqpProducer;
910
use Interop\Amqp\AmqpQueue;
1011
use Interop\Amqp\AmqpTopic;
1112
use Interop\Queue\DeliveryDelayNotSupportedException;
13+
use Interop\Queue\Exception;
1214
use Interop\Queue\InvalidDestinationException;
1315
use Interop\Queue\InvalidMessageException;
1416
use Interop\Queue\PsrDestination;
@@ -64,51 +66,14 @@ public function send(PsrDestination $destination, PsrMessage $message)
6466
{
6567
$destination instanceof PsrTopic
6668
? InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpTopic::class)
67-
: InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class)
68-
;
69+
: InvalidDestinationException::assertDestinationInstanceOf($destination, AmqpQueue::class);
6970

7071
InvalidMessageException::assertMessageInstanceOf($message, AmqpMessage::class);
7172

72-
if (null !== $this->priority && null === $message->getPriority()) {
73-
$message->setPriority($this->priority);
74-
}
75-
76-
if (null !== $this->timeToLive && null === $message->getExpiration()) {
77-
$message->setExpiration($this->timeToLive);
78-
}
79-
80-
$amqpAttributes = $message->getHeaders();
81-
82-
if ($message->getProperties()) {
83-
$amqpAttributes['headers'] = $message->getProperties();
84-
}
85-
86-
if ($this->deliveryDelay) {
87-
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
88-
} elseif ($destination instanceof AmqpTopic) {
89-
$amqpExchange = new \AMQPExchange($this->amqpChannel);
90-
$amqpExchange->setType($destination->getType());
91-
$amqpExchange->setName($destination->getTopicName());
92-
$amqpExchange->setFlags(Flags::convertTopicFlags($destination->getFlags()));
93-
$amqpExchange->setArguments($destination->getArguments());
94-
95-
$amqpExchange->publish(
96-
$message->getBody(),
97-
$message->getRoutingKey(),
98-
Flags::convertMessageFlags($message->getFlags()),
99-
$amqpAttributes
100-
);
101-
} else {
102-
$amqpExchange = new \AMQPExchange($this->amqpChannel);
103-
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
104-
$amqpExchange->setName('');
105-
106-
$amqpExchange->publish(
107-
$message->getBody(),
108-
$destination->getQueueName(),
109-
Flags::convertMessageFlags($message->getFlags()),
110-
$amqpAttributes
111-
);
73+
try {
74+
$this->doSend($destination, $message);
75+
} catch (\Exception $e) {
76+
throw new Exception($e->getMessage(), $e->getCode(), $e);
11277
}
11378
}
11479

@@ -167,4 +132,50 @@ public function getTimeToLive()
167132
{
168133
return $this->timeToLive;
169134
}
135+
136+
private function doSend(AmqpDestination $destination, AmqpMessage $message)
137+
{
138+
if (null !== $this->priority && null === $message->getPriority()) {
139+
$message->setPriority($this->priority);
140+
}
141+
142+
if (null !== $this->timeToLive && null === $message->getExpiration()) {
143+
$message->setExpiration($this->timeToLive);
144+
}
145+
146+
$amqpAttributes = $message->getHeaders();
147+
148+
if ($message->getProperties()) {
149+
$amqpAttributes['headers'] = $message->getProperties();
150+
}
151+
152+
if ($this->deliveryDelay) {
153+
$this->delayStrategy->delayMessage($this->context, $destination, $message, $this->deliveryDelay);
154+
} elseif ($destination instanceof AmqpTopic) {
155+
$amqpExchange = new \AMQPExchange($this->amqpChannel);
156+
$amqpExchange->setType($destination->getType());
157+
$amqpExchange->setName($destination->getTopicName());
158+
$amqpExchange->setFlags(Flags::convertTopicFlags($destination->getFlags()));
159+
$amqpExchange->setArguments($destination->getArguments());
160+
161+
$amqpExchange->publish(
162+
$message->getBody(),
163+
$message->getRoutingKey(),
164+
Flags::convertMessageFlags($message->getFlags()),
165+
$amqpAttributes
166+
);
167+
} else {
168+
/** @var AmqpQueue $destination */
169+
$amqpExchange = new \AMQPExchange($this->amqpChannel);
170+
$amqpExchange->setType(AMQP_EX_TYPE_DIRECT);
171+
$amqpExchange->setName('');
172+
173+
$amqpExchange->publish(
174+
$message->getBody(),
175+
$destination->getQueueName(),
176+
Flags::convertMessageFlags($message->getFlags()),
177+
$amqpAttributes
178+
);
179+
}
180+
}
170181
}

0 commit comments

Comments
 (0)