Skip to content

Commit 398c52a

Browse files
committed
[amqp-lib] fixes and impr for basic consume feature.
1 parent e7baf3b commit 398c52a

File tree

5 files changed

+120
-149
lines changed

5 files changed

+120
-149
lines changed

pkg/amqp-lib/AmqpConsumer.php

Lines changed: 31 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@
55
use Interop\Amqp\AmqpConsumer as InteropAmqpConsumer;
66
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
77
use Interop\Amqp\AmqpQueue as InteropAmqpQueue;
8-
use Interop\Amqp\Impl\AmqpMessage;
9-
use Interop\Queue\Exception;
108
use Interop\Queue\InvalidMessageException;
119
use Interop\Queue\PsrMessage;
1210
use PhpAmqpLib\Channel\AMQPChannel;
13-
use PhpAmqpLib\Exception\AMQPTimeoutException;
14-
use PhpAmqpLib\Message\AMQPMessage as LibAMQPMessage;
15-
use PhpAmqpLib\Wire\AMQPTable;
1611

1712
class AmqpConsumer implements InteropAmqpConsumer
1813
{
14+
/**
15+
* @var AmqpContext
16+
*/
17+
private $context;
18+
1919
/**
2020
* @var AMQPChannel
2121
*/
@@ -31,11 +31,6 @@ class AmqpConsumer implements InteropAmqpConsumer
3131
*/
3232
private $buffer;
3333

34-
/**
35-
* @var bool
36-
*/
37-
private $isInit;
38-
3934
/**
4035
* @var string
4136
*/
@@ -52,31 +47,26 @@ class AmqpConsumer implements InteropAmqpConsumer
5247
private $consumerTag;
5348

5449
/**
55-
* @param AMQPChannel $channel
50+
* @param AmqpContext $context
5651
* @param InteropAmqpQueue $queue
5752
* @param Buffer $buffer
5853
* @param string $receiveMethod
5954
*/
60-
public function __construct(AMQPChannel $channel, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
55+
public function __construct(AmqpContext $context, InteropAmqpQueue $queue, Buffer $buffer, $receiveMethod)
6156
{
62-
$this->channel = $channel;
57+
$this->context = $context;
58+
$this->channel = $context->getLibChannel();
6359
$this->queue = $queue;
6460
$this->buffer = $buffer;
6561
$this->receiveMethod = $receiveMethod;
6662
$this->flags = self::FLAG_NOPARAM;
67-
68-
$this->isInit = false;
6963
}
7064

7165
/**
7266
* {@inheritdoc}
7367
*/
7468
public function setConsumerTag($consumerTag)
7569
{
76-
if ($this->isInit) {
77-
throw new Exception('Consumer tag is not mutable after it has been subscribed to broker');
78-
}
79-
8070
$this->consumerTag = $consumerTag;
8171
}
8272

@@ -152,7 +142,7 @@ public function receive($timeout = 0)
152142
public function receiveNoWait()
153143
{
154144
if ($message = $this->channel->basic_get($this->queue->getQueueName(), (bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK))) {
155-
return $this->convertMessage($message);
145+
return $this->context->convertMessage($message);
156146
}
157147
}
158148

@@ -177,30 +167,6 @@ public function reject(PsrMessage $message, $requeue = false)
177167
$this->channel->basic_reject($message->getDeliveryTag(), $requeue);
178168
}
179169

180-
/**
181-
* @param LibAMQPMessage $amqpMessage
182-
*
183-
* @return InteropAmqpMessage
184-
*/
185-
private function convertMessage(LibAMQPMessage $amqpMessage)
186-
{
187-
$headers = new AMQPTable($amqpMessage->get_properties());
188-
$headers = $headers->getNativeData();
189-
190-
$properties = [];
191-
if (isset($headers['application_headers'])) {
192-
$properties = $headers['application_headers'];
193-
}
194-
unset($headers['application_headers']);
195-
196-
$message = new AmqpMessage($amqpMessage->getBody(), $properties, $headers);
197-
$message->setDeliveryTag($amqpMessage->delivery_info['delivery_tag']);
198-
$message->setRedelivered($amqpMessage->delivery_info['redelivered']);
199-
$message->setRoutingKey($amqpMessage->delivery_info['routing_key']);
200-
201-
return $message;
202-
}
203-
204170
/**
205171
* @param int $timeout
206172
*
@@ -226,63 +192,41 @@ private function receiveBasicGet($timeout)
226192
*/
227193
private function receiveBasicConsume($timeout)
228194
{
229-
if (false === $this->isInit) {
230-
$callback = function (LibAMQPMessage $message) {
231-
$receivedMessage = $this->convertMessage($message);
232-
$receivedMessage->setConsumerTag($message->delivery_info['consumer_tag']);
233-
234-
$this->buffer->push($receivedMessage->getConsumerTag(), $receivedMessage);
235-
};
236-
237-
$consumerTag = $this->channel->basic_consume(
238-
$this->queue->getQueueName(),
239-
$this->getConsumerTag() ?: $this->getQueue()->getConsumerTag(),
240-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOLOCAL),
241-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOACK),
242-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_EXCLUSIVE),
243-
(bool) ($this->getFlags() & InteropAmqpConsumer::FLAG_NOWAIT),
244-
$callback
245-
);
246-
247-
$this->consumerTag = $consumerTag ?: $this->getQueue()->getConsumerTag();
248-
249-
if (empty($this->consumerTag)) {
250-
throw new Exception('Got empty consumer tag');
251-
}
195+
if (false == $this->consumerTag) {
196+
$this->context->subscribe($this, function (InteropAmqpMessage $message) {
197+
$this->buffer->push($message->getConsumerTag(), $message);
252198

253-
$this->isInit = true;
199+
return false;
200+
});
254201
}
255202

256203
if ($message = $this->buffer->pop($this->consumerTag)) {
257204
return $message;
258205
}
259206

260-
try {
261-
while (true) {
262-
$start = microtime(true);
207+
while (true) {
208+
$start = microtime(true);
263209

264-
$this->channel->wait(null, false, $timeout / 1000);
210+
$this->context->consume($timeout);
265211

266-
if ($message = $this->buffer->pop($this->consumerTag)) {
267-
return $message;
268-
}
212+
if ($message = $this->buffer->pop($this->consumerTag)) {
213+
return $message;
214+
}
269215

270-
// is here when consumed message is not for this consumer
216+
// is here when consumed message is not for this consumer
271217

272-
// as timeout is infinite have to continue consumption, but it can overflow message buffer
273-
if ($timeout <= 0) {
274-
continue;
275-
}
218+
// as timeout is infinite have to continue consumption, but it can overflow message buffer
219+
if ($timeout <= 0) {
220+
continue;
221+
}
276222

277-
// compute remaining timeout and continue until time is up
278-
$stop = microtime(true);
279-
$timeout -= ($stop - $start) * 1000;
223+
// compute remaining timeout and continue until time is up
224+
$stop = microtime(true);
225+
$timeout -= ($stop - $start) * 1000;
280226

281-
if ($timeout <= 0) {
282-
break;
283-
}
227+
if ($timeout <= 0) {
228+
break;
284229
}
285-
} catch (AMQPTimeoutException $e) {
286230
}
287231
}
288232
}

pkg/amqp-lib/AmqpContext.php

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,10 +120,10 @@ public function createConsumer(PsrDestination $destination)
120120
$queue = $this->createTemporaryQueue();
121121
$this->bind(new AmqpBind($destination, $queue, $queue->getQueueName()));
122122

123-
return new AmqpConsumer($this->getLibChannel(), $queue, $this->buffer, $this->config['receive_method']);
123+
return new AmqpConsumer($this, $queue, $this->buffer, $this->config['receive_method']);
124124
}
125125

126-
return new AmqpConsumer($this->getLibChannel(), $destination, $this->buffer, $this->config['receive_method']);
126+
return new AmqpConsumer($this, $destination, $this->buffer, $this->config['receive_method']);
127127
}
128128

129129
/**
@@ -422,11 +422,13 @@ public function getLibChannel()
422422
}
423423

424424
/**
425+
* @internal It must be used here and in the consumer only
426+
*
425427
* @param LibAMQPMessage $amqpMessage
426428
*
427429
* @return InteropAmqpMessage
428430
*/
429-
private function convertMessage(LibAMQPMessage $amqpMessage)
431+
public function convertMessage(LibAMQPMessage $amqpMessage)
430432
{
431433
$headers = new AMQPTable($amqpMessage->get_properties());
432434
$headers = $headers->getNativeData();

0 commit comments

Comments
 (0)