Skip to content

Commit 8a8287c

Browse files
committed
[amqp-ext] fixes and impr for basic consume feature.
1 parent 398c52a commit 8a8287c

File tree

4 files changed

+36
-79
lines changed

4 files changed

+36
-79
lines changed

pkg/amqp-ext/AmqpConsumer.php

Lines changed: 25 additions & 78 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
use Interop\Amqp\AmqpMessage as InteropAmqpMessage;
77
use Interop\Amqp\AmqpQueue;
88
use Interop\Amqp\Impl\AmqpMessage;
9-
use Interop\Queue\Exception;
109
use Interop\Queue\InvalidMessageException;
1110
use Interop\Queue\PsrMessage;
1211

@@ -32,11 +31,6 @@ class AmqpConsumer implements InteropAmqpConsumer
3231
*/
3332
private $extQueue;
3433

35-
/**
36-
* @var bool
37-
*/
38-
private $isInit;
39-
4034
/**
4135
* @var string
4236
*/
@@ -65,19 +59,13 @@ public function __construct(AmqpContext $context, AmqpQueue $queue, Buffer $buff
6559
$this->buffer = $buffer;
6660
$this->receiveMethod = $receiveMethod;
6761
$this->flags = self::FLAG_NOPARAM;
68-
69-
$this->isInit = false;
7062
}
7163

7264
/**
7365
* {@inheritdoc}
7466
*/
7567
public function setConsumerTag($consumerTag)
7668
{
77-
if ($this->isInit) {
78-
throw new Exception('Consumer tag is not mutable after it has been subscribed to broker');
79-
}
80-
8169
$this->consumerTag = $consumerTag;
8270
}
8371

@@ -157,7 +145,7 @@ public function receive($timeout = 0)
157145
public function receiveNoWait()
158146
{
159147
if ($extMessage = $this->getExtQueue()->get(Flags::convertConsumerFlags($this->flags))) {
160-
return $this->convertMessage($extMessage);
148+
return $this->context->convertMessage($extMessage);
161149
}
162150
}
163151

@@ -213,85 +201,44 @@ private function receiveBasicGet($timeout)
213201
*/
214202
private function receiveBasicConsume($timeout)
215203
{
216-
if ($this->isInit && $message = $this->buffer->pop($this->getExtQueue()->getConsumerTag())) {
217-
return $message;
204+
if (false == $this->consumerTag) {
205+
$this->context->subscribe($this, function (InteropAmqpMessage $message) {
206+
$this->buffer->push($message->getConsumerTag(), $message);
207+
208+
return false;
209+
});
218210
}
219211

220-
/** @var \AMQPQueue $extQueue */
221-
$extConnection = $this->getExtQueue()->getChannel()->getConnection();
212+
if ($message = $this->buffer->pop($this->consumerTag)) {
213+
return $message;
214+
}
222215

223-
$originalTimeout = $extConnection->getReadTimeout();
224-
try {
225-
$extConnection->setReadTimeout($timeout / 1000);
216+
while (true) {
217+
$start = microtime(true);
226218

227-
if (false == $this->isInit) {
228-
$this->getExtQueue()->consume(null, Flags::convertConsumerFlags($this->flags), $this->consumerTag);
219+
$this->context->consume($timeout);
229220

230-
$this->isInit = true;
221+
if ($message = $this->buffer->pop($this->consumerTag)) {
222+
return $message;
231223
}
232224

233-
/** @var AmqpMessage|null $message */
234-
$message = null;
235-
236-
$this->getExtQueue()->consume(function (\AMQPEnvelope $extEnvelope, \AMQPQueue $q) use (&$message) {
237-
$message = $this->convertMessage($extEnvelope);
238-
$message->setConsumerTag($q->getConsumerTag());
239-
240-
if ($this->getExtQueue()->getConsumerTag() == $q->getConsumerTag()) {
241-
return false;
242-
}
225+
// is here when consumed message is not for this consumer
243226

244-
// not our message, put it to buffer and continue.
245-
$this->buffer->push($q->getConsumerTag(), $message);
246-
247-
$message = null;
227+
// as timeout is infinite have to continue consumption, but it can overflow message buffer
228+
if ($timeout <= 0) {
229+
continue;
230+
}
248231

249-
return true;
250-
}, AMQP_JUST_CONSUME);
232+
// compute remaining timeout and continue until time is up
233+
$stop = microtime(true);
234+
$timeout -= ($stop - $start) * 1000;
251235

252-
return $message;
253-
} catch (\AMQPQueueException $e) {
254-
if ('Consumer timeout exceed' == $e->getMessage()) {
255-
return null;
236+
if ($timeout <= 0) {
237+
break;
256238
}
257-
258-
throw $e;
259-
} finally {
260-
$extConnection->setReadTimeout($originalTimeout);
261239
}
262240
}
263241

264-
/**
265-
* @param \AMQPEnvelope $extEnvelope
266-
*
267-
* @return AmqpMessage
268-
*/
269-
private function convertMessage(\AMQPEnvelope $extEnvelope)
270-
{
271-
$message = new AmqpMessage(
272-
$extEnvelope->getBody(),
273-
$extEnvelope->getHeaders(),
274-
[
275-
'message_id' => $extEnvelope->getMessageId(),
276-
'correlation_id' => $extEnvelope->getCorrelationId(),
277-
'app_id' => $extEnvelope->getAppId(),
278-
'type' => $extEnvelope->getType(),
279-
'content_encoding' => $extEnvelope->getContentEncoding(),
280-
'content_type' => $extEnvelope->getContentType(),
281-
'expiration' => $extEnvelope->getExpiration(),
282-
'priority' => $extEnvelope->getPriority(),
283-
'reply_to' => $extEnvelope->getReplyTo(),
284-
'timestamp' => $extEnvelope->getTimeStamp(),
285-
'user_id' => $extEnvelope->getUserId(),
286-
]
287-
);
288-
$message->setRedelivered($extEnvelope->isRedelivery());
289-
$message->setDeliveryTag($extEnvelope->getDeliveryTag());
290-
$message->setRoutingKey($extEnvelope->getRoutingKey());
291-
292-
return $message;
293-
}
294-
295242
/**
296243
* @return \AMQPQueue
297244
*/

pkg/amqp-ext/AmqpContext.php

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -384,11 +384,13 @@ public function consume($timeout = 0)
384384
}
385385

386386
/**
387+
* @internal It must be used here and in the consumer only
388+
*
387389
* @param \AMQPEnvelope $extEnvelope
388390
*
389391
* @return AmqpMessage
390392
*/
391-
private function convertMessage(\AMQPEnvelope $extEnvelope)
393+
public function convertMessage(\AMQPEnvelope $extEnvelope)
392394
{
393395
$message = new AmqpMessage(
394396
$extEnvelope->getBody(),

pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDelayPluginStrategyTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\AmqpExt\Tests\Spec;
44

55
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
67
use Enqueue\AmqpTools\RabbitMqDelayPluginDelayStrategy;
78
use Interop\Queue\PsrContext;
89
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;
@@ -24,13 +25,16 @@ protected function createContext()
2425
}
2526

2627
/**
28+
* @param AmqpContext $context
29+
*
2730
* {@inheritdoc}
2831
*/
2932
protected function createQueue(PsrContext $context, $queueName)
3033
{
3134
$queue = parent::createQueue($context, $queueName);
3235

3336
$context->declareQueue($queue);
37+
$context->purgeQueue($queue);
3438

3539
return $queue;
3640
}

pkg/amqp-ext/Tests/Spec/AmqpSendAndReceiveDelayedMessageWithDlxStrategyTest.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace Enqueue\AmqpExt\Tests\Spec;
44

55
use Enqueue\AmqpExt\AmqpConnectionFactory;
6+
use Enqueue\AmqpExt\AmqpContext;
67
use Enqueue\AmqpTools\RabbitMqDlxDelayStrategy;
78
use Interop\Queue\PsrContext;
89
use Interop\Queue\Spec\SendAndReceiveDelayedMessageFromQueueSpec;
@@ -24,13 +25,16 @@ protected function createContext()
2425
}
2526

2627
/**
28+
* @param AmqpContext $context
29+
*
2730
* {@inheritdoc}
2831
*/
2932
protected function createQueue(PsrContext $context, $queueName)
3033
{
3134
$queue = parent::createQueue($context, $queueName);
3235

3336
$context->declareQueue($queue);
37+
$context->purgeQueue($queue);
3438

3539
return $queue;
3640
}

0 commit comments

Comments
 (0)